This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c5ba6e4e5 [core][flink] supports reading blob descriptors from
external storage (#7231)
5c5ba6e4e5 is described below
commit 5c5ba6e4e5ce6f0cc5f9d28ac5fbb27affd7cc05
Author: Faiz <[email protected]>
AuthorDate: Mon Feb 9 17:03:04 2026 +0800
[core][flink] supports reading blob descriptors from external storage
(#7231)
---
docs/content/append-table/blob.md | 18 +++
.../main/java/org/apache/paimon/CoreOptions.java | 2 +
.../apache/paimon/utils/BlobDescriptorUtils.java | 56 +++++++++
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +-
.../paimon/flink/ExternalBlobDescriptorITCase.java | 137 +++++++++++++++++++++
.../paimon/flink/IsolatedDirectoryFileIO.java | 130 +++++++++++++++++++
.../services/org.apache.paimon.fs.FileIOLoader | 1 +
7 files changed, 351 insertions(+), 4 deletions(-)
diff --git a/docs/content/append-table/blob.md
b/docs/content/append-table/blob.md
index 23cea8aa51..cfae5d0605 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -122,6 +122,24 @@ For details about the blob file format structure, see
[File Format - BLOB]({{< r
*Required for blob functionality to work correctly.
+Specifically, if the storage system of the input BlobDescriptor differs from
that used by Paimon,
+you can specify the storage configuration for the input blob descriptor using
the prefix
+`blob-descriptor.`. For example, if the source data is stored in a different
OSS endpoint,
+you can configure it as below (using Flink SQL as an example):
+```sql
+CREATE TABLE image_table (
+ id INT,
+ name STRING,
+ image BYTES
+) WITH (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true',
+ 'blob-field' = 'image',
+ 'fs.oss.endpoint' = 'aaa', -- This is for Paimon's own
config
+ 'blob-descriptor.fs.oss.endpoint' = 'bbb' -- This is for input blob
descriptors' config
+);
+```
+
## SQL Usage
### Creating a Table
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e9de99e59d..ee3f8dd7e2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -90,6 +90,8 @@ public class CoreOptions implements Serializable {
public static final String COLUMNS = "columns";
+ public static final String BLOB_DESCRIPTOR_PREFIX = "blob-descriptor.";
+
public static final ConfigOption<TableType> TYPE =
key("type")
.enumType(TableType.class)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java
new file mode 100644
index 0000000000..3c48f37fa3
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.options.Options;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BLOB_DESCRIPTOR_PREFIX;
+
+/** Utils for {@link BlobDescriptor}. */
+public class BlobDescriptorUtils {
+
+ /**
+ * Try to create a {@link CatalogContext} for input {@link
BlobDescriptor}. This enables reading
+ * descriptors from external storage which can be different from Paimon's
own.
+ */
+ public static CatalogContext getCatalogContext(
+ @Nullable CatalogContext currentContext, Options tableOptions) {
+ Map<String, String> descriptorSpecified = new HashMap<>();
+ for (Map.Entry<String, String> entry :
tableOptions.toMap().entrySet()) {
+ String key = entry.getKey();
+ if (key != null && key.startsWith(BLOB_DESCRIPTOR_PREFIX)) {
+ descriptorSpecified.put(
+ key.substring(BLOB_DESCRIPTOR_PREFIX.length()),
entry.getValue());
+ }
+ }
+
+ if (descriptorSpecified.isEmpty()) {
+ return currentContext == null ?
CatalogContext.create(tableOptions) : currentContext;
+ } else {
+ return CatalogContext.create(Options.fromMap(descriptorSpecified));
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 46c1f040ec..da8af4ed78 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PostponeUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.utils.BlobDescriptorUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -207,12 +208,14 @@ public class FlinkSinkBuilder {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ CatalogContext contextForDescriptor =
+ BlobDescriptorUtils.getCatalogContext(
+ table.catalogEnvironment().catalogContext(),
+ table.coreOptions().toConfiguration());
+
DataStream<InternalRow> input =
mapToInternalRow(
- this.input,
- table.rowType(),
- blobAsDescriptor,
- table.catalogEnvironment().catalogContext());
+ this.input, table.rowType(), blobAsDescriptor,
contextForDescriptor);
if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
SingleOutputStreamOperator<InternalRow> newInput =
input.forward()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
new file mode 100644
index 0000000000..a928cc1492
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.UriReaderFactory;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for external blob descriptors. */
+public class ExternalBlobDescriptorITCase extends CatalogITCaseBase {
+
+ private static final Random RANDOM = new Random();
+
+ @TempDir private Path warehouse;
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT,
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture',
'blob-as-descriptor'='true')",
+ "CREATE TABLE external_blob_table_descriptor (id INT, data
STRING, picture BYTES) WITH ('row-tracking.enabled'='true',
'data-evolution.enabled'='true', 'blob-field'='picture',
'blob-as-descriptor'='true', 'blob-descriptor.root-dir'='"
+ + "isolated://"
+ + warehouse
+ + "')");
+ }
+
+ @Override
+ protected Map<String, String> catalogOptions() {
+ return Collections.singletonMap("root-dir", path);
+ }
+
+ @Override
+ protected String getTempDirPath() {
+ // make paimon use isolated file io.
+ return "isolated://" + super.getTempDirPath();
+ }
+
+ @Test
+ public void testWriteBlobDescriptorFromExternalStorage() throws Exception {
+ byte[] blobData = new byte[1024 * 1024];
+ RANDOM.nextBytes(blobData);
+ FileIO fileIO = new LocalFileIO();
+ String blobPath = warehouse + "/external_blob";
+ try (OutputStream outputStream =
+ fileIO.newOutputStream(new org.apache.paimon.fs.Path("file://"
+ blobPath), true)) {
+ outputStream.write(blobData);
+ }
+
+ String isolatedPath = "isolated://" + blobPath;
+
+ // directly write should raise an error
+ Assertions.assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO blob_table_descriptor
VALUES (1, 'paimon', sys.path_to_descriptor('"
+ + isolatedPath
+ + "'))"))
+ .isInstanceOf(Exception.class)
+ .hasRootCauseInstanceOf(UnsupportedOperationException.class);
+
+ // however, we can specify io config for input descriptors
+ // this mocks using paimon through connector mode (not catalog mode)
+ batchSql(
+ "INSERT INTO external_blob_table_descriptor VALUES (1,
'paimon', sys.path_to_descriptor('"
+ + isolatedPath
+ + "'))");
+ byte[] newDescriptorBytes =
+ (byte[])
+ batchSql("SELECT picture FROM
external_blob_table_descriptor")
+ .get(0)
+ .getField(0);
+
+ BlobDescriptor newBlobDescriptor =
BlobDescriptor.deserialize(newDescriptorBytes);
+ assertBlobEquals(blobData, newBlobDescriptor);
+
+ // alternatively, we could also set options through dynamic options
+ tEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ "paimon.*.*.blob_table_descriptor."
+ + CoreOptions.BLOB_DESCRIPTOR_PREFIX
+ + IsolatedDirectoryFileIO.ROOT_DIR,
+ "isolated://" + warehouse);
+ batchSql(
+ "INSERT INTO blob_table_descriptor VALUES (1, 'paimon',
sys.path_to_descriptor('"
+ + isolatedPath
+ + "'))");
+ newDescriptorBytes =
+ (byte[]) batchSql("SELECT picture FROM
blob_table_descriptor").get(0).getField(0);
+ newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes);
+ assertBlobEquals(blobData, newBlobDescriptor);
+ }
+
+ private void assertBlobEquals(byte[] expected, BlobDescriptor
readDescriptor) {
+ Options options = new Options();
+ options.set("root-dir", path);
+ CatalogContext catalogContext = CatalogContext.create(options);
+ UriReaderFactory uriReaderFactory = new
UriReaderFactory(catalogContext);
+ Blob blob =
+
Blob.fromDescriptor(uriReaderFactory.create(readDescriptor.uri()),
readDescriptor);
+ assertThat(blob.toData()).isEqualTo(expected);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
new file mode 100644
index 0000000000..b473b26fc1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import java.io.IOException;
+
+/** An isolated LocalFileIO, only permitting accessing files under the root
directory. */
+public class IsolatedDirectoryFileIO extends LocalFileIO {
+
+ public static final String ROOT_DIR = "root-dir";
+
+ private Path root;
+
+ public IsolatedDirectoryFileIO() {}
+
+ @Override
+ public void configure(CatalogContext context) {
+ root = new Path(context.options().get(ROOT_DIR));
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws IOException {
+ checkPath(path);
+ return super.newInputStream(path);
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
throws IOException {
+ checkPath(path);
+ return super.newOutputStream(path, overwrite);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ checkPath(path);
+ return super.getFileStatus(path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ checkPath(path);
+ return super.listStatus(path);
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ checkPath(path);
+ return super.exists(path);
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ checkPath(path);
+ return super.delete(path, recursive);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ checkPath(path);
+ return super.mkdirs(path);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ checkPath(src);
+ checkPath(dst);
+ return super.rename(src, dst);
+ }
+
+ @Override
+ public void copyFile(Path sourcePath, Path targetPath, boolean overwrite)
throws IOException {
+ checkPath(sourcePath);
+ checkPath(targetPath);
+ super.copyFile(sourcePath, targetPath, overwrite);
+ }
+
+ private void checkPath(Path path) {
+ if (path == null) {
+ throw new NullPointerException("path is null");
+ }
+ if (!path.toString().startsWith(root.toString())) {
+ throw new UnsupportedOperationException(
+ "Isolated file io only supports reading child of root
directory "
+ + root
+ + ", but current accessing path is "
+ + path);
+ }
+ }
+
+ /** FileIOLoader of {@link IsolatedDirectoryFileIO}. */
+ public static final class Loader implements FileIOLoader {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getScheme() {
+ return "isolated";
+ }
+
+ @Override
+ public FileIO load(Path path) {
+ return new IsolatedDirectoryFileIO();
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
index 0b01694979..1f1b56be00 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.paimon.flink.action.CopyFilesActionSlowFileIO$Loader
+org.apache.paimon.flink.IsolatedDirectoryFileIO$Loader
\ No newline at end of file