This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d54c762118 [core] Introduce withBlobConsumer in TableWrite (#7074)
d54c762118 is described below
commit d54c762118394412dadcb047502c7dea1a65b11b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 19 21:48:37 2026 +0800
[core] Introduce withBlobConsumer in TableWrite (#7074)
---
.../java/org/apache/paimon/data/BlobConsumer.java | 30 ++++++
.../paimon/format/FileAwareFormatWriter.java | 31 ++++++
.../org/apache/paimon/append/AppendOnlyWriter.java | 9 +-
.../paimon/append/RollingBlobFileWriter.java | 19 ++--
.../apache/paimon/io/RollingFileWriterImpl.java | 2 +-
.../org/apache/paimon/io/SingleFileWriter.java | 29 +++--
.../paimon/operation/AbstractFileStoreWrite.java | 6 ++
.../paimon/operation/BaseAppendFileStoreWrite.java | 11 +-
.../apache/paimon/operation/FileStoreWrite.java | 3 +
.../paimon/table/format/FormatTableWrite.java | 11 +-
.../org/apache/paimon/table/sink/TableWrite.java | 15 +--
.../apache/paimon/table/sink/TableWriteImpl.java | 7 ++
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 +-
.../org/apache/paimon/append/BlobTableTest.java | 28 +++++
.../paimon/append/RollingBlobFileWriterTest.java | 20 ++--
.../apache/paimon/format/FileFormatSuffixTest.java | 117 ---------------------
.../paimon/io/KeyValueFileReadWriteTest.java | 91 +++++++++++++---
.../apache/paimon/format/blob/BlobFileFormat.java | 11 +-
.../paimon/format/blob/BlobFormatReader.java | 4 +-
.../paimon/format/blob/BlobFormatWriter.java | 35 +++++-
20 files changed, 302 insertions(+), 180 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java
new file mode 100644
index 0000000000..13ed1d3b37
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+/** A consumer for blobs. */
+public interface BlobConsumer {
+
+ /**
+ * Accept a blob descriptor.
+ *
+     * @return Whether to flush to the output stream.
+ */
+ boolean accept(BlobDescriptor blobDescriptor);
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java
new file mode 100644
index 0000000000..0259fb72e2
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FileAwareFormatWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.Path;
+
+/** A {@link FormatWriter} that can be aware of the file. */
+public interface FileAwareFormatWriter extends FormatWriter {
+
+ /** Set the path of the file. */
+ void setFile(Path file);
+
+ /** Whether to delete the file upon abort. */
+ boolean deleteFileUponAbort();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 14e21e2265..61abbb0cb0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
@@ -92,6 +93,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
@Nullable private final IOManager ioManager;
private final FileIndexOptions fileIndexOptions;
private final MemorySize maxDiskSize;
+ @Nullable private final BlobConsumer blobConsumer;
@Nullable private CompactDeletionFile compactDeletionFile;
private SinkWriter<InternalRow> sinkWriter;
@@ -120,7 +122,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
MemorySize maxDiskSize,
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite,
- boolean statsDenseStore) {
+ boolean statsDenseStore,
+ @Nullable BlobConsumer blobConsumer) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -134,6 +137,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
this.forceCompact = forceCompact;
this.asyncFileWrite = asyncFileWrite;
this.statsDenseStore = statsDenseStore;
+ this.blobConsumer = blobConsumer;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
@@ -311,7 +315,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
fileIndexOptions,
FileSource.APPEND,
asyncFileWrite,
- statsDenseStore);
+ statsDenseStore,
+ blobConsumer);
}
return new RowDataRollingFileWriter(
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 8d511c6b91..583c0ee44b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.append;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
@@ -44,6 +45,8 @@ import org.apache.paimon.utils.StatsCollectorFactories;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -91,7 +94,6 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
RollingFileWriterImpl<InternalRow, DataFileMeta>,
List<DataFileMeta>>>
blobWriterFactory;
private final long targetFileSize;
- private final long blobTargetFileSize;
// State management
private final List<FileWriterAbortExecutor> closedWriters;
@@ -118,11 +120,10 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
FileIndexOptions fileIndexOptions,
FileSource fileSource,
boolean asyncFileWrite,
- boolean statsDenseStore) {
-
+ boolean statsDenseStore,
+ @Nullable BlobConsumer blobConsumer) {
// Initialize basic fields
this.targetFileSize = targetFileSize;
- this.blobTargetFileSize = blobTargetFileSize;
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
@@ -161,7 +162,8 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
fileSource,
asyncFileWrite,
statsDenseStore,
- blobTargetFileSize);
+ blobTargetFileSize,
+ blobConsumer);
}
/** Creates a factory for normal data writers. */
@@ -221,9 +223,10 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore,
- long targetFileSize) {
-
+ long targetFileSize,
+ @Nullable BlobConsumer blobConsumer) {
BlobFileFormat blobFileFormat = new BlobFileFormat();
+ blobFileFormat.setWriteConsumer(blobConsumer);
List<String> blobNames = blobType.getFieldNames();
// Validate blob field count
@@ -375,7 +378,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
/** Closes the main writer and returns its metadata. */
private DataFileMeta closeMainWriter() throws IOException {
currentWriter.close();
- closedWriters.add(currentWriter.writer().abortExecutor());
+ currentWriter.writer().abortExecutor().ifPresent(closedWriters::add);
return currentWriter.result();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
index 7353ea1971..299a4f9742 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -129,7 +129,7 @@ public class RollingFileWriterImpl<T, R> implements
RollingFileWriter<T, R> {
// only store abort executor in memory
// cannot store whole writer, it includes lots of memory for example
column vectors to read
// and write
- closedWriters.add(currentWriter.abortExecutor());
+ currentWriter.abortExecutor().ifPresent(closedWriters::add);
results.add(currentWriter.result());
currentWriter = null;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 5a810d716f..aea24be3ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BundleFormatWriter;
+import org.apache.paimon.format.FileAwareFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SupportsDirectWrite;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Optional;
import java.util.function.Function;
/**
@@ -52,8 +54,9 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
protected final Path path;
private final Function<T, InternalRow> converter;
+ private boolean deleteFileUponAbort;
private FormatWriter writer;
- private PositionOutputStream out;
+ @Nullable private PositionOutputStream out;
@Nullable private Long outputBytes;
private long recordCount;
@@ -69,6 +72,8 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
this.fileIO = fileIO;
this.path = path;
this.converter = converter;
+ // true first to clean file in exception
+ this.deleteFileUponAbort = true;
try {
if (factory instanceof SupportsDirectWrite) {
@@ -80,6 +85,12 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
}
writer = factory.create(out, compression);
}
+
+ if (writer instanceof FileAwareFormatWriter) {
+ FileAwareFormatWriter fileAwareFormatWriter =
(FileAwareFormatWriter) writer;
+ fileAwareFormatWriter.setFile(path);
+ deleteFileUponAbort =
fileAwareFormatWriter.deleteFileUponAbort();
+ }
} catch (IOException e) {
LOG.warn(
"Failed to open the bulk writer, closing the output stream
and throw the error.",
@@ -118,7 +129,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
}
recordCount += bundle.rowCount();
} catch (Throwable e) {
- LOG.warn("Exception occurs when writing file " + path + ".
Cleaning up.", e);
+ LOG.warn("Exception occurs when writing file {}. Cleaning up.",
path, e);
abort();
throw e;
}
@@ -135,7 +146,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
recordCount++;
return rowData;
} catch (Throwable e) {
- LOG.warn("Exception occurs when writing file " + path + ".
Cleaning up.", e);
+ LOG.warn("Exception occurs when writing file {}. Cleaning up.",
path, e);
abort();
throw e;
}
@@ -160,15 +171,13 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
IOUtils.closeQuietly(out);
out = null;
}
- fileIO.deleteQuietly(path);
+ abortExecutor().ifPresent(FileWriterAbortExecutor::abort);
}
- public FileWriterAbortExecutor abortExecutor() {
- if (!closed) {
- throw new RuntimeException("Writer should be closed!");
- }
-
- return new FileWriterAbortExecutor(fileIO, path);
+ public Optional<FileWriterAbortExecutor> abortExecutor() {
+ return deleteFileUponAbort
+ ? Optional.of(new FileWriterAbortExecutor(fileIO, path))
+ : Optional.empty();
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index ddf2addf53..f0e836a9ac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -133,6 +134,11 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
return this;
}
+ @Override
+ public FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer) {
+ return this;
+ }
+
@Override
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 1c63f41b02..9eb49fee83 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.append.cluster.Sorter;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
@@ -82,6 +83,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
private @Nullable List<String> writeCols;
private boolean forceBufferSpill = false;
private boolean withBlob;
+ private @Nullable BlobConsumer blobConsumer;
public BaseAppendFileStoreWrite(
FileIO fileIO,
@@ -109,6 +111,12 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
this.fileIndexOptions = options.indexColumnsOptions();
}
+ @Override
+ public BaseAppendFileStoreWrite withBlobConsumer(BlobConsumer
blobConsumer) {
+ this.blobConsumer = blobConsumer;
+ return this;
+ }
+
@Override
protected RecordWriter<InternalRow> createWriter(
BinaryRow partition,
@@ -142,7 +150,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
options.writeBufferSpillDiskSize(),
fileIndexOptions,
options.asyncFileWrite(),
- options.statsDenseStore());
+ options.statsDenseStore(),
+ blobConsumer);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index ce71b8cd2a..24cb9c1287 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.FileStore;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -64,6 +65,8 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
*/
FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory
memoryPoolFactory);
+ FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer);
+
/**
* Set whether the write operation should ignore previously stored files.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index c74bf95e1a..2f49fa1585 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.format;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.casting.DefaultValueRow;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
@@ -43,14 +44,13 @@ import java.util.stream.Collectors;
/** {@link TableWrite} implementation for format table. */
public class FormatTableWrite implements BatchTableWrite {
- private RowType rowType;
+ private final RowType rowType;
private final FormatTableFileWriter write;
private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
private final int[] notNullFieldIndex;
private final @Nullable DefaultValueRow defaultValueRow;
private final ProjectedRow projectedRow;
- private final RowType writeRowType;
public FormatTableWrite(
FileIO fileIO,
@@ -68,7 +68,7 @@ public class FormatTableWrite implements BatchTableWrite {
.collect(Collectors.toList());
this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
this.defaultValueRow = DefaultValueRow.create(rowType);
- this.writeRowType =
+ RowType writeRowType =
rowType.project(
rowType.getFieldNames().stream()
.filter(name ->
!partitionType.getFieldNames().contains(name))
@@ -117,6 +117,11 @@ public class FormatTableWrite implements BatchTableWrite {
return this;
}
+ @Override
+ public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public BatchTableWrite withIOManager(IOManager ioManager) {
return this;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index a36f314cd6..536ebf856a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -20,11 +20,11 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
@@ -43,13 +43,16 @@ public interface TableWrite extends AutoCloseable {
/** Specified the write rowType. */
TableWrite withWriteType(RowType writeType);
- /** With {@link MemorySegmentPool} for the current table write. */
- default TableWrite withMemoryPool(MemorySegmentPool memoryPool) {
- return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
- }
-
+ /** With a shared {@link MemoryPoolFactory} for the current table write. */
TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
+ /**
+     * With a blob consumer for the current table write; every newly written
blob will be reported back
+     * through this interface. Note that when this method is invoked,
temporary files will not be
+     * deleted in case of exceptions; callers are responsible for deleting
them.
+ */
+ TableWrite withBlobConsumer(BlobConsumer blobConsumer);
+
/** Calculate which partition {@code row} belongs to. */
BinaryRow getPartition(InternalRow row);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index b5e9cebbb5..101a51e8a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.FileStore;
import org.apache.paimon.casting.DefaultValueRow;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BundleRecords;
@@ -128,6 +129,12 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
return this;
}
+ @Override
+ public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
+ write.withBlobConsumer(blobConsumer);
+ return this;
+ }
+
public TableWriteImpl<T> withCompactExecutor(ExecutorService
compactExecutor) {
write.withCompactExecutor(compactExecutor);
return this;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 56426fce2e..bd31672d6a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -710,7 +710,8 @@ public class AppendOnlyWriterTest {
MemorySize.MAX_VALUE,
new FileIndexOptions(),
true,
- false);
+ false,
+ null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 5bc9105e77..c393ed9017 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -22,25 +22,32 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.UriReader;
import org.junit.jupiter.api.Test;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -54,6 +61,27 @@ public class BlobTableTest extends TableTestBase {
private final byte[] blobBytes = randomBytes();
+ @Test
+ public void testBlobConsumer() throws Exception {
+ createTableDefault();
+ FileStoreTable table = getTableDefault();
+ List<BlobDescriptor> blobs = new ArrayList<>();
+ try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) {
+ write.withBlobConsumer(blobs::add);
+ write.write(dataDefault(0, 0));
+ write.write(dataDefault(0, 0));
+ }
+ assertThat(blobs.size()).isEqualTo(2);
+ FileIO fileIO = table.fileIO();
+ UriReader uriReader = UriReader.fromFile(fileIO);
+ for (BlobDescriptor blob : blobs) {
+ assertThat(Blob.fromDescriptor(uriReader,
blob).toData()).isEqualTo(blobBytes);
+ }
+ for (BlobDescriptor blob : blobs) {
+ fileIO.deleteQuietly(new Path(blob.uri()));
+ }
+ }
+
@Test
public void testBasic() throws Exception {
createTableDefault();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index e2e68905c8..8fe9f193dc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -105,8 +105,8 @@ public class RollingBlobFileWriterTest {
new FileIndexOptions(),
FileSource.APPEND,
false, // asyncFileWrite
- false // statsDenseStore
- );
+ false, // statsDenseStore
+ null);
}
@Test
@@ -205,8 +205,8 @@ public class RollingBlobFileWriterTest {
new FileIndexOptions(),
FileSource.APPEND,
false, // asyncFileWrite
- false // statsDenseStore
- );
+ false, // statsDenseStore
+ null);
// Create large blob data that will exceed the blob target file size
byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -282,8 +282,8 @@ public class RollingBlobFileWriterTest {
new FileIndexOptions(),
FileSource.APPEND,
false, // asyncFileWrite
- false // statsDenseStore
- );
+ false, // statsDenseStore
+ null);
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -361,8 +361,8 @@ public class RollingBlobFileWriterTest {
new FileIndexOptions(),
FileSource.APPEND,
false, // asyncFileWrite
- false // statsDenseStore
- );
+ false, // statsDenseStore
+ null);
// Create blob data that will trigger rolling (non-descriptor mode:
direct blob data)
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -579,8 +579,8 @@ public class RollingBlobFileWriterTest {
new FileIndexOptions(),
FileSource.APPEND,
false, // asyncFileWrite
- false // statsDenseStore
- );
+ false, // statsDenseStore
+ null);
// Write data
for (int i = 0; i < 3; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
deleted file mode 100644
index 7d1b4f502f..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.append.AppendOnlyWriter;
-import org.apache.paimon.append.BucketedAppendCompactManager;
-import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.KeyValueFileReadWriteTest;
-import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.StatsCollectorFactories;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Collections;
-import java.util.LinkedList;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** test file format suffix. */
-public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
-
- private static final RowType SCHEMA =
- RowType.of(
- new DataType[] {new IntType(), new VarCharType(), new
VarCharType()},
- new String[] {"id", "name", "dt"});
-
- @Test
- public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws
Exception {
- String format = "avro";
- KeyValueFileWriterFactory writerFactory =
createWriterFactory(tempDir.toString(), format);
- Path path = writerFactory.pathFactory(0).newPath();
- assertThat(path.toString().endsWith(format)).isTrue();
-
- DataFilePathFactory dataFilePathFactory =
- new DataFilePathFactory(
- new Path(tempDir + "/dt=1/bucket-1"),
- format,
- CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
- CoreOptions.FILE_COMPRESSION.defaultValue(),
- null);
- FileFormat fileFormat = FileFormat.fromIdentifier(format, new
Options());
- LinkedList<DataFileMeta> toCompact = new LinkedList<>();
- CoreOptions options =
- new
CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
- AppendOnlyWriter appendOnlyWriter =
- new AppendOnlyWriter(
- LocalFileIO.create(),
- IOManager.create(tempDir.toString()),
- 0,
- fileFormat,
- 10,
- 10,
- SCHEMA,
- null,
- 0,
- new BucketedAppendCompactManager(
- null, toCompact, null, 4, 10, false, null,
null), // not used
- null,
- false,
- dataFilePathFactory,
- null,
- false,
- false,
- CoreOptions.FILE_COMPRESSION.defaultValue(),
- CompressOptions.defaultOptions(),
- new StatsCollectorFactories(options),
- MemorySize.MAX_VALUE,
- new FileIndexOptions(),
- true,
- false);
- appendOnlyWriter.setMemoryPool(
- new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
- appendOnlyWriter.write(
- GenericRow.of(1, BinaryString.fromString("aaa"),
BinaryString.fromString("1")));
- CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
- appendOnlyWriter.close();
-
- DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
- assertThat(meta.fileName().endsWith(format)).isTrue();
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 025f5849c5..f5c26445bf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -22,10 +22,17 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializerTest;
import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.append.AppendOnlyWriter;
+import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.fs.FileIO;
@@ -34,14 +41,21 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
@@ -50,8 +64,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -224,38 +240,83 @@ public class KeyValueFileReadWriteTest {
kv.value().getInt(1))));
}
- protected KeyValueFileWriterFactory createWriterFactory(String pathStr,
String format) {
- Path path = new Path(pathStr);
- FileStorePathFactory pathFactory =
- new FileStorePathFactory(
- path,
- RowType.of(),
- CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+ @Test
+ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws
Exception {
+ RowType schema =
+ RowType.of(
+ new DataType[] {new IntType(), new VarCharType(), new
VarCharType()},
+ new String[] {"id", "name", "dt"});
+
+ String format = "avro";
+ KeyValueFileWriterFactory writerFactory =
createWriterFactory(tempDir.toString(), format);
+ Path path = writerFactory.pathFactory(0).newPath();
+ assertThat(path.toString().endsWith(format)).isTrue();
+
+ DataFilePathFactory dataFilePathFactory =
+ new DataFilePathFactory(
+ new Path(tempDir + "/dt=1/bucket-1"),
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null);
+ FileFormat fileFormat = FileFormat.fromIdentifier(format, new
Options());
+ LinkedList<DataFileMeta> toCompact = new LinkedList<>();
+ CoreOptions options =
+ new
CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
+ AppendOnlyWriter appendOnlyWriter =
+ new AppendOnlyWriter(
+ LocalFileIO.create(),
+ IOManager.create(tempDir.toString()),
+ 0,
+ fileFormat,
+ 10,
+ 10,
+ schema,
+ null,
+ 0,
+ new BucketedAppendCompactManager(
+ null, toCompact, null, 4, 10, false, null,
null), // not used
null,
+ false,
+ dataFilePathFactory,
null,
- CoreOptions.ExternalPathStrategy.NONE,
+ false,
+ false,
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ CompressOptions.defaultOptions(),
+ new StatsCollectorFactories(options),
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions(),
+ true,
false,
null);
+ appendOnlyWriter.setMemoryPool(
+ new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
+ appendOnlyWriter.write(
+ GenericRow.of(1, BinaryString.fromString("aaa"),
BinaryString.fromString("1")));
+ CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
+ appendOnlyWriter.close();
+
+ DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0);
+ assertThat(meta.fileName().endsWith(format)).isTrue();
+ }
+
+ protected KeyValueFileWriterFactory createWriterFactory(String pathStr,
String format) {
+ Path path = new Path(pathStr);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
Function<String, FileStorePathFactory> pathFactoryMap =
- new Function<String, FileStorePathFactory>() {
- @Override
- public FileStorePathFactory apply(String format) {
- return new FileStorePathFactory(
+ format1 ->
+ new FileStorePathFactory(
path,
RowType.of(),
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
- format,
+ format1,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
@@ -266,8 +327,6 @@ public class KeyValueFileReadWriteTest {
CoreOptions.ExternalPathStrategy.NONE,
false,
null);
- }
- };
return KeyValueFileWriterFactory.builder(
fileIO,
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
index 78f06b486b..06ebc18b6f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.blob;
+import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.EmptyStatsExtractor;
import org.apache.paimon.format.FileFormat;
@@ -49,6 +50,8 @@ public class BlobFileFormat extends FileFormat {
private final boolean blobAsDescriptor;
+ @Nullable public BlobConsumer writeConsumer;
+
public BlobFileFormat() {
this(false);
}
@@ -62,6 +65,10 @@ public class BlobFileFormat extends FileFormat {
return fileName.endsWith("." + BlobFileFormatFactory.IDENTIFIER);
}
+ public void setWriteConsumer(@Nullable BlobConsumer writeConsumer) {
+ this.writeConsumer = writeConsumer;
+ }
+
@Override
public FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType,
@@ -90,11 +97,11 @@ public class BlobFileFormat extends FileFormat {
return Optional.of(new EmptyStatsExtractor());
}
- private static class BlobFormatWriterFactory implements
FormatWriterFactory {
+ private class BlobFormatWriterFactory implements FormatWriterFactory {
@Override
public FormatWriter create(PositionOutputStream out, String
compression) {
- return new BlobFormatWriter(out);
+ return new BlobFormatWriter(out, writeConsumer);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index bed455fda7..b06fb05d96 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -37,6 +37,7 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
private final FileIO fileIO;
private final Path filePath;
+ private final String filePathString;
private final BlobFileMeta fileMeta;
private final @Nullable SeekableInputStream in;
@@ -46,6 +47,7 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
FileIO fileIO, Path filePath, BlobFileMeta fileMeta, @Nullable
SeekableInputStream in) {
this.fileIO = fileIO;
this.filePath = filePath;
+ this.filePathString = filePath.toString();
this.fileMeta = fileMeta;
this.in = in;
this.returned = false;
@@ -86,7 +88,7 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
if (in != null) {
blob = Blob.fromData(readInlineBlob(in, offset, length));
} else {
- blob = Blob.fromFile(fileIO, filePath.toString(), offset,
length);
+ blob = Blob.fromFile(fileIO, filePathString, offset,
length);
}
currentPosition++;
return GenericRow.of(blob);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index a1b1c113d5..4198edc27e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -19,13 +19,19 @@
package org.apache.paimon.format.blob;
import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobConsumer;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileAwareFormatWriter;
import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.utils.DeltaVarintCompressor;
import org.apache.paimon.utils.LongArrayList;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.zip.CRC32;
@@ -34,24 +40,38 @@ import static
org.apache.paimon.utils.StreamUtils.intToLittleEndian;
import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
/** {@link FormatWriter} for blob file. */
-public class BlobFormatWriter implements FormatWriter {
+public class BlobFormatWriter implements FileAwareFormatWriter {
public static final byte VERSION = 1;
public static final int MAGIC_NUMBER = 1481511375;
public static final byte[] MAGIC_NUMBER_BYTES =
intToLittleEndian(MAGIC_NUMBER);
private final PositionOutputStream out;
+ @Nullable private final BlobConsumer writeConsumer;
private final CRC32 crc32;
private final byte[] tmpBuffer;
private final LongArrayList lengths;
- public BlobFormatWriter(PositionOutputStream out) {
+ private String pathString;
+
+ public BlobFormatWriter(PositionOutputStream out, @Nullable BlobConsumer
writeConsumer) {
this.out = out;
+ this.writeConsumer = writeConsumer;
this.crc32 = new CRC32();
this.tmpBuffer = new byte[4096];
this.lengths = new LongArrayList(16);
}
+ @Override
+ public void setFile(Path file) {
+ this.pathString = file.toString();
+ }
+
+ @Override
+ public boolean deleteFileUponAbort() {
+ return writeConsumer == null;
+ }
+
@Override
public void addElement(InternalRow element) throws IOException {
checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only
support one field.");
@@ -62,6 +82,8 @@ public class BlobFormatWriter implements FormatWriter {
crc32.reset();
write(MAGIC_NUMBER_BYTES);
+
+ long blobPos = out.getPos();
try (SeekableInputStream in = blob.newInputStream()) {
int bytesRead = in.read(tmpBuffer);
while (bytesRead >= 0) {
@@ -70,12 +92,21 @@ public class BlobFormatWriter implements FormatWriter {
}
}
+ long blobLength = out.getPos() - blobPos;
long binLength = out.getPos() - previousPos + 12;
lengths.add(binLength);
byte[] lenBytes = longToLittleEndian(binLength);
write(lenBytes);
int crcValue = (int) crc32.getValue();
out.write(intToLittleEndian(crcValue));
+
+ if (writeConsumer != null) {
+ BlobDescriptor descriptor = new BlobDescriptor(pathString,
blobPos, blobLength);
+ boolean flush = writeConsumer.accept(descriptor);
+ if (flush) {
+ out.flush();
+ }
+ }
}
private void write(byte[] bytes) throws IOException {