This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2e7febe4c3 [core] Support blob set target size (#6424)
2e7febe4c3 is described below
commit 2e7febe4c3f48bf99740fba8a683939215afb9f0
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 21 19:41:24 2025 +0800
[core] Support blob set target size (#6424)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 16 +++++
.../org/apache/paimon/append/AppendOnlyWriter.java | 4 ++
.../paimon/append/RollingBlobFileWriter.java | 5 +-
.../paimon/operation/BaseAppendFileStoreWrite.java | 1 +
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../paimon/append/RollingBlobFileWriterTest.java | 73 ++++++++++++++++++++++
.../apache/paimon/format/FileFormatSuffixTest.java | 1 +
8 files changed, 106 insertions(+), 1 deletion(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 5d95f775aa..d058669f65 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -62,6 +62,12 @@ under the License.
<td>String</td>
<td>Specify the blob field.</td>
</tr>
+ <tr>
+ <td><h5>blob.target-file-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+        <td>Target size of a blob file. Default is the value of TARGET_FILE_SIZE.</td>
+ </tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
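For context on how the new key is consumed: a minimal, hypothetical sketch of setting it programmatically. Only the option key and its types come from this commit; the snippet itself is illustrative, and assumes the usual Paimon Options/MemorySize setters.

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.MemorySize;
    import org.apache.paimon.options.Options;

    Options options = new Options();
    // Blob files will roll at 256 MB; regular data files keep target-file-size.
    options.set(CoreOptions.BLOB_TARGET_FILE_SIZE, MemorySize.ofMebiBytes(256));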
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 00118784af..d64f2e40e9 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -623,6 +623,16 @@ public class CoreOptions implements Serializable {
text("append table: the default
value is 256 MB."))
.build());
+ public static final ConfigOption<MemorySize> BLOB_TARGET_FILE_SIZE =
+ key("blob.target-file-size")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+                                            "Target size of a blob file. Default is the value of TARGET_FILE_SIZE.")
+ .build());
+
    public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
key("num-sorted-run.compaction-trigger")
.intType()
@@ -2445,6 +2455,12 @@ public class CoreOptions implements Serializable {
.getBytes();
}
+ public long blobTargetFileSize() {
+ return options.getOptional(BLOB_TARGET_FILE_SIZE)
+ .map(MemorySize::getBytes)
+ .orElse(targetFileSize(false));
+ }
+
public long compactionFileSize(boolean hasPrimaryKey) {
        // file size to join the compaction, we don't process on middle file size to avoid
        // compact a same file twice (the compression is not calculate so accurately. the output
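The fallback semantics of the new accessor, restated as a usage sketch (illustrative, not part of the patch; assumes the append-table default of 256 MB noted above):

    // Unset: blobTargetFileSize() falls back to targetFileSize(false).
    long fallback = new CoreOptions(new Options()).blobTargetFileSize(); // 256 MB in bytes for append tables

    // Set explicitly: the option overrides the fallback.
    Options options = new Options();
    options.set(CoreOptions.BLOB_TARGET_FILE_SIZE, MemorySize.ofMebiBytes(512));
    long explicit = new CoreOptions(options).blobTargetFileSize(); // 512 MB in bytes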
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4cfceefecc..14e21e2265 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -72,6 +72,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final long schemaId;
private final FileFormat fileFormat;
private final long targetFileSize;
+ private final long blobTargetFileSize;
private final RowType writeSchema;
@Nullable private final List<String> writeCols;
private final DataFilePathFactory pathFactory;
@@ -102,6 +103,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
long schemaId,
FileFormat fileFormat,
long targetFileSize,
+ long blobTargetFileSize,
RowType writeSchema,
@Nullable List<String> writeCols,
long maxSequenceNumber,
@@ -123,6 +125,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
this.schemaId = schemaId;
this.fileFormat = fileFormat;
this.targetFileSize = targetFileSize;
+ this.blobTargetFileSize = blobTargetFileSize;
this.writeSchema = writeSchema;
this.writeCols = writeCols;
this.pathFactory = pathFactory;
@@ -299,6 +302,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
schemaId,
fileFormat,
targetFileSize,
+ blobTargetFileSize,
writeSchema,
pathFactory,
seqNumCounter,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 17d4be7c76..ba51f8d3bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -89,6 +89,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
RollingFileWriterImpl<InternalRow, DataFileMeta>,
List<DataFileMeta>>
blobWriter;
private final long targetFileSize;
+ private final long blobTargetFileSize;
// State management
private final List<FileWriterAbortExecutor> closedWriters;
@@ -103,6 +104,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
long schemaId,
FileFormat fileFormat,
long targetFileSize,
+ long blobTargetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
LongCounter seqNumCounter,
@@ -115,6 +117,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
// Initialize basic fields
this.targetFileSize = targetFileSize;
+ this.blobTargetFileSize = blobTargetFileSize;
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
@@ -152,7 +155,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
fileSource,
asyncFileWrite,
statsDenseStore,
- targetFileSize);
+ blobTargetFileSize);
}
/** Creates a factory for normal data writers. */
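Net effect of the change above: the embedded blob writer now rolls on blobTargetFileSize while the main data writer still rolls on targetFileSize. A hypothetical sketch of the two now-independent checks (method names here are illustrative, not Paimon API):

    // Each rolling writer closes its current file once it reaches its own target size.
    boolean shouldRollDataFile(long writtenBytes) { return writtenBytes >= targetFileSize; }
    boolean shouldRollBlobFile(long writtenBytes) { return writtenBytes >= blobTargetFileSize; }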
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index ff04b2d1ed..08f9dd120f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -122,6 +122,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
schemaId,
fileFormat,
options.targetFileSize(false),
+ options.blobTargetFileSize(),
writeType,
writeCols,
restoredMaxSeqNumber,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 62a50699b3..b593cf0ef7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -693,6 +693,7 @@ public class AppendOnlyWriterTest {
SCHEMA_ID,
fileFormat,
targetFileSize,
+ targetFileSize,
AppendOnlyWriterTest.SCHEMA,
null,
getMaxSequenceNumber(toCompact),
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index fef2e59136..fb74b5f203 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -95,6 +95,7 @@ public class RollingBlobFileWriterTest {
SCHEMA_ID,
FileFormat.fromIdentifier("parquet", new Options()),
TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
SCHEMA,
pathFactory,
seqNumCounter,
@@ -174,6 +175,78 @@ public class RollingBlobFileWriterTest {
assertThat(results).isNotEmpty();
}
+ @Test
+ public void testBlobTargetFileSize() throws IOException {
+        // Set a specific blob target file size (different from the regular target file size)
+        long blobTargetFileSize = 500 * 1024 * 1024L; // 500 MB for blob files
+
+ // Create a new writer with different blob target file size
+ RollingBlobFileWriter blobSizeTestWriter =
+ new RollingBlobFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ 128 * 1024 * 1024,
+ blobTargetFileSize, // Different blob target size
+ SCHEMA,
+ new DataFilePathFactory(
+ new Path(tempDir + "/blob-size-test"),
+ "parquet",
+ "data",
+ "changelog",
+ false,
+ null,
+ null),
+ new LongCounter(),
+ COMPRESSION,
+                        new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false, // asyncFileWrite
+ false // statsDenseStore
+ );
+
+        // Create blob data large enough that repeated writes exceed the blob target file size
+ byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
+ new Random(123).nextBytes(largeBlobData);
+
+ // Write multiple rows with large blob data to trigger rolling
+ for (int i = 0; i < 400; i++) {
+ InternalRow row =
+ GenericRow.of(
+ i,
+ BinaryString.fromString("large-blob-test-" + i),
+ new BlobData(largeBlobData));
+ blobSizeTestWriter.write(row);
+ }
+
+ blobSizeTestWriter.close();
+ List<DataFileMeta> results = blobSizeTestWriter.result();
+
+ // Verify that we have multiple files due to rolling
+ assertThat(results.size()).isGreaterThan(1);
+
+        // Check that blob files (format = "blob") meet the target size requirement
+ List<DataFileMeta> blobFiles =
+ results.stream()
+ .filter(file -> "blob".equals(file.fileFormat()))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles).isNotEmpty();
+
+        // Verify that blob files are close to the target size (within reasonable tolerance)
+        for (DataFileMeta blobFile : blobFiles.subList(0, blobFiles.size() - 1)) {
+ long fileSize = blobFile.fileSize();
+ assertThat(fileSize)
+ .as("Blob file size should be close to target size")
+ .isGreaterThanOrEqualTo(blobTargetFileSize)
+                    .isLessThanOrEqualTo(blobTargetFileSize + largeBlobData.length);
+ }
+
+ // Verify total record count
+ assertThat(blobSizeTestWriter.recordCount()).isEqualTo(400);
+ }
+
@Test
public void testSchemaValidation() throws IOException {
// Test that the writer correctly handles the schema with blob field
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index a4dfdfd733..7d1b4f502f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,6 +85,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
0,
fileFormat,
10,
+ 10,
SCHEMA,
null,
0,