This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c5ba6e4e5 [core][flink] supports reading blob descriptors from 
external storage (#7231)
5c5ba6e4e5 is described below

commit 5c5ba6e4e5ce6f0cc5f9d28ac5fbb27affd7cc05
Author: Faiz <[email protected]>
AuthorDate: Mon Feb 9 17:03:04 2026 +0800

    [core][flink] supports reading blob descriptors from external storage 
(#7231)
---
 docs/content/append-table/blob.md                  |  18 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +
 .../apache/paimon/utils/BlobDescriptorUtils.java   |  56 +++++++++
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  11 +-
 .../paimon/flink/ExternalBlobDescriptorITCase.java | 137 +++++++++++++++++++++
 .../paimon/flink/IsolatedDirectoryFileIO.java      | 130 +++++++++++++++++++
 .../services/org.apache.paimon.fs.FileIOLoader     |   1 +
 7 files changed, 351 insertions(+), 4 deletions(-)

diff --git a/docs/content/append-table/blob.md 
b/docs/content/append-table/blob.md
index 23cea8aa51..cfae5d0605 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -122,6 +122,24 @@ For details about the blob file format structure, see 
[File Format - BLOB]({{< r
 
 *Required for blob functionality to work correctly.
 
+Specifically, if the storage system of the input BlobDescriptor differs from 
that used by Paimon, 
+you can specify the storage configuration for the input blob descriptor using 
the prefix 
+`blob-descriptor.`. For example, if the source data is stored in a different 
OSS endpoint, 
+you can configure it as shown below (using Flink SQL as an example):
+```sql
+CREATE TABLE image_table (
+    id INT,
+    name STRING,
+    image BYTES
+) WITH (
+    'row-tracking.enabled' = 'true',
+    'data-evolution.enabled' = 'true',
+    'blob-field' = 'image',
+    'fs.oss.endpoint' = 'aaa',                   -- This is for Paimon's own 
config
+    'blob-descriptor.fs.oss.endpoint' = 'bbb'    -- This is for input blob 
descriptors' config
+);
+```
+
 ## SQL Usage
 
 ### Creating a Table
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e9de99e59d..ee3f8dd7e2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -90,6 +90,8 @@ public class CoreOptions implements Serializable {
 
     public static final String COLUMNS = "columns";
 
+    public static final String BLOB_DESCRIPTOR_PREFIX = "blob-descriptor.";
+
     public static final ConfigOption<TableType> TYPE =
             key("type")
                     .enumType(TableType.class)
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java
new file mode 100644
index 0000000000..3c48f37fa3
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/BlobDescriptorUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.options.Options;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BLOB_DESCRIPTOR_PREFIX;
+
+/** Utils for {@link BlobDescriptor}. */
+public class BlobDescriptorUtils {
+
+    /**
+     * Try to create a {@link CatalogContext} for input {@link 
BlobDescriptor}. This enables reading
+     * descriptors from external storages which can be different from paimon's 
own.
+     */
+    public static CatalogContext getCatalogContext(
+            @Nullable CatalogContext currentContext, Options tableOptions) {
+        Map<String, String> descriptorSpecified = new HashMap<>();
+        for (Map.Entry<String, String> entry : 
tableOptions.toMap().entrySet()) {
+            String key = entry.getKey();
+            if (key != null && key.startsWith(BLOB_DESCRIPTOR_PREFIX)) {
+                descriptorSpecified.put(
+                        key.substring(BLOB_DESCRIPTOR_PREFIX.length()), 
entry.getValue());
+            }
+        }
+
+        if (descriptorSpecified.isEmpty()) {
+            return currentContext == null ? 
CatalogContext.create(tableOptions) : currentContext;
+        } else {
+            return CatalogContext.create(Options.fromMap(descriptorSpecified));
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 46c1f040ec..da8af4ed78 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PostponeUtils;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.utils.BlobDescriptorUtils;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -207,12 +208,14 @@ public class FlinkSinkBuilder {
         setParallelismIfAdaptiveConflict();
         input = trySortInput(input);
         boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+        CatalogContext contextForDescriptor =
+                BlobDescriptorUtils.getCatalogContext(
+                        table.catalogEnvironment().catalogContext(),
+                        table.coreOptions().toConfiguration());
+
         DataStream<InternalRow> input =
                 mapToInternalRow(
-                        this.input,
-                        table.rowType(),
-                        blobAsDescriptor,
-                        table.catalogEnvironment().catalogContext());
+                        this.input, table.rowType(), blobAsDescriptor, 
contextForDescriptor);
         if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
             SingleOutputStreamOperator<InternalRow> newInput =
                     input.forward()
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
new file mode 100644
index 0000000000..a928cc1492
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ExternalBlobDescriptorITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.UriReaderFactory;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for external blob descriptors. */
+public class ExternalBlobDescriptorITCase extends CatalogITCaseBase {
+
+    private static final Random RANDOM = new Random();
+
+    @TempDir private Path warehouse;
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, 
data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='picture', 
'blob-as-descriptor'='true')",
+                "CREATE TABLE external_blob_table_descriptor (id INT, data 
STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 
'data-evolution.enabled'='true', 'blob-field'='picture', 
'blob-as-descriptor'='true', 'blob-descriptor.root-dir'='"
+                        + "isolated://"
+                        + warehouse
+                        + "')");
+    }
+
+    @Override
+    protected Map<String, String> catalogOptions() {
+        return Collections.singletonMap("root-dir", path);
+    }
+
+    @Override
+    protected String getTempDirPath() {
+        // make paimon use isolated file io.
+        return "isolated://" + super.getTempDirPath();
+    }
+
+    @Test
+    public void testWriteBlobDescriptorFromExternalStorage() throws Exception {
+        byte[] blobData = new byte[1024 * 1024];
+        RANDOM.nextBytes(blobData);
+        FileIO fileIO = new LocalFileIO();
+        String blobPath = warehouse + "/external_blob";
+        try (OutputStream outputStream =
+                fileIO.newOutputStream(new org.apache.paimon.fs.Path("file://" 
+ blobPath), true)) {
+            outputStream.write(blobData);
+        }
+
+        String isolatedPath = "isolated://" + blobPath;
+
+        // directly write should raise an error
+        Assertions.assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO blob_table_descriptor 
VALUES (1, 'paimon', sys.path_to_descriptor('"
+                                                + isolatedPath
+                                                + "'))"))
+                .isInstanceOf(Exception.class)
+                .hasRootCauseInstanceOf(UnsupportedOperationException.class);
+
+        // however, we can specify io config for input descriptors
+        // this mocks using paimon through connector mode (not catalog mode)
+        batchSql(
+                "INSERT INTO external_blob_table_descriptor VALUES (1, 
'paimon', sys.path_to_descriptor('"
+                        + isolatedPath
+                        + "'))");
+        byte[] newDescriptorBytes =
+                (byte[])
+                        batchSql("SELECT picture FROM 
external_blob_table_descriptor")
+                                .get(0)
+                                .getField(0);
+
+        BlobDescriptor newBlobDescriptor = 
BlobDescriptor.deserialize(newDescriptorBytes);
+        assertBlobEquals(blobData, newBlobDescriptor);
+
+        // alternatively, we could also set options through dynamic options
+        tEnv.getConfig()
+                .getConfiguration()
+                .setString(
+                        "paimon.*.*.blob_table_descriptor."
+                                + CoreOptions.BLOB_DESCRIPTOR_PREFIX
+                                + IsolatedDirectoryFileIO.ROOT_DIR,
+                        "isolated://" + warehouse);
+        batchSql(
+                "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', 
sys.path_to_descriptor('"
+                        + isolatedPath
+                        + "'))");
+        newDescriptorBytes =
+                (byte[]) batchSql("SELECT picture FROM 
blob_table_descriptor").get(0).getField(0);
+        newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes);
+        assertBlobEquals(blobData, newBlobDescriptor);
+    }
+
+    private void assertBlobEquals(byte[] expected, BlobDescriptor 
readDescriptor) {
+        Options options = new Options();
+        options.set("root-dir", path);
+        CatalogContext catalogContext = CatalogContext.create(options);
+        UriReaderFactory uriReaderFactory = new 
UriReaderFactory(catalogContext);
+        Blob blob =
+                
Blob.fromDescriptor(uriReaderFactory.create(readDescriptor.uri()), 
readDescriptor);
+        assertThat(blob.toData()).isEqualTo(expected);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
new file mode 100644
index 0000000000..b473b26fc1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/IsolatedDirectoryFileIO.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import java.io.IOException;
+
+/** An isolated LocalFileIO, only permitting accessing files under the root 
directory. */
+public class IsolatedDirectoryFileIO extends LocalFileIO {
+
+    public static final String ROOT_DIR = "root-dir";
+
+    private Path root;
+
+    public IsolatedDirectoryFileIO() {}
+
+    @Override
+    public void configure(CatalogContext context) {
+        root = new Path(context.options().get(ROOT_DIR));
+    }
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        checkPath(path);
+        return super.newInputStream(path);
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) 
throws IOException {
+        checkPath(path);
+        return super.newOutputStream(path, overwrite);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        checkPath(path);
+        return super.getFileStatus(path);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        checkPath(path);
+        return super.listStatus(path);
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        checkPath(path);
+        return super.exists(path);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        checkPath(path);
+        return super.delete(path, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        checkPath(path);
+        return super.mkdirs(path);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        checkPath(src);
+        checkPath(dst);
+        return super.rename(src, dst);
+    }
+
+    @Override
+    public void copyFile(Path sourcePath, Path targetPath, boolean overwrite) 
throws IOException {
+        checkPath(sourcePath);
+        checkPath(targetPath);
+        super.copyFile(sourcePath, targetPath, overwrite);
+    }
+
+    private void checkPath(Path path) {
+        if (path == null) {
+            throw new NullPointerException("path is null");
+        }
+        if (!path.toString().startsWith(root.toString())) {
+            throw new UnsupportedOperationException(
+                    "Isolated file io only supports reading child of root 
directory "
+                            + root
+                            + ", but current accessing path is "
+                            + path);
+        }
+    }
+
+    /** FileIOLoader of {@link IsolatedDirectoryFileIO}. */
+    public static final class Loader implements FileIOLoader {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public String getScheme() {
+            return "isolated";
+        }
+
+        @Override
+        public FileIO load(Path path) {
+            return new IsolatedDirectoryFileIO();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
index 0b01694979..1f1b56be00 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.paimon.flink.action.CopyFilesActionSlowFileIO$Loader
+org.apache.paimon.flink.IsolatedDirectoryFileIO$Loader
\ No newline at end of file

Reply via email to