This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 452e6d326d [flink][spark] Introduce Built-in functions for flink and 
spark (#6891)
452e6d326d is described below

commit 452e6d326d83d34b229043c633009f5e2c5f3d0c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Dec 29 22:09:53 2025 +0800

    [flink][spark] Introduce Built-in functions for flink and spark (#6891)
---
 docs/content/spark/sql-functions.md                | 42 ++++++++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 21 +++++++-
 .../paimon/flink/function/BuiltInFunctions.java    | 34 +++++++++++++
 .../paimon/flink/function/DescriptorToString.java  | 37 ++++++++++++++
 .../paimon/flink/function/PathToDescriptor.java    | 36 ++++++++++++++
 .../org/apache/paimon/flink/BlobTableITCase.java   | 34 +++++++++++++
 .../spark/function/DescriptorToStringFunction.java | 56 ++++++++++++++++++++++
 .../spark/function/DescriptorToStringUnbound.java  | 55 +++++++++++++++++++++
 .../spark/function/PathToDescriptorFunction.java   | 56 ++++++++++++++++++++++
 .../spark/function/PathToDescriptorUnbound.java    | 55 +++++++++++++++++++++
 .../spark/catalog/functions/PaimonFunctions.scala  |  9 +++-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala | 37 ++++++++++++++
 12 files changed, 470 insertions(+), 2 deletions(-)

diff --git a/docs/content/spark/sql-functions.md 
b/docs/content/spark/sql-functions.md
index d17afc92ab..bf04badf75 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -53,6 +53,48 @@ SELECT * FROM t where pt = sys.max_pt('t');
 -- a, 20250101
 ```
 
+### path_to_descriptor
+
+`sys.path_to_descriptor($file_path)`
+
+Converts a file path (STRING) to a blob descriptor (BINARY). This function is 
useful when working with blob data stored in external files. It creates a blob 
descriptor that references the file at the specified path.
+
+**Arguments:**
+- `file_path` (STRING): The path to the external file containing the blob data.
+
+**Returns:**
+- A BINARY value representing the serialized blob descriptor.
+
+**Example**
+
+```sql
+-- Insert blob data using path_to_descriptor function
+INSERT INTO t VALUES ('1', 'paimon', 
sys.path_to_descriptor('file:///path/to/blob_file'));
+
+-- Insert with partition
+INSERT OVERWRITE TABLE t PARTITION(ds='1017', batch='test')
+VALUES ('1', 'paimon', '1024', '12345678', '20241017', 
sys.path_to_descriptor('file:///path/to/blob_file'));
+```
+
+### descriptor_to_string
+
+`sys.descriptor_to_string($descriptor)`
+
+Converts a blob descriptor (BINARY) to its string representation (STRING). 
This function is useful for debugging or displaying the contents of a blob 
descriptor in a human-readable format.
+
+**Arguments:**
+- `descriptor` (BINARY): The blob descriptor bytes to convert.
+
+**Returns:**
+- A STRING representation of the blob descriptor.
+
+**Example**
+
+```sql
+-- Convert a blob descriptor to string for inspection
+SELECT sys.descriptor_to_string(content) FROM t WHERE id = '1';
+```
+
 ## User-defined Function
 
 Paimon Spark supports two types of user-defined functions: lambda functions 
and file-based functions.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index a2a9dd7fe0..7a777d557a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.flink.function.BuiltInFunctions;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
@@ -147,6 +148,7 @@ import static 
org.apache.paimon.catalog.Catalog.COMMENT_PROP;
 import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static 
org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
@@ -1326,16 +1328,29 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public final List<String> listFunctions(String dbName) throws 
CatalogException {
+        List<String> functions = new ArrayList<>();
+        if (isSystemNamespace(dbName)) {
+            functions.addAll(BuiltInFunctions.FUNCTIONS.keySet());
+        }
         try {
-            return catalog.listFunctions(dbName);
+            functions.addAll(catalog.listFunctions(dbName));
         } catch (Catalog.DatabaseNotExistException e) {
             throw new CatalogException(e.getMessage(), e);
         }
+        return functions;
     }
 
     @Override
     public final CatalogFunction getFunction(ObjectPath functionPath)
             throws FunctionNotExistException, CatalogException {
+        if (isSystemNamespace(functionPath.getDatabaseName())) {
+            if 
(BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) {
+                String builtInFunction =
+                        
BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName());
+                return new CatalogFunctionImpl(builtInFunction, 
FunctionLanguage.JAVA);
+            }
+        }
+
         try {
             org.apache.paimon.function.Function function =
                     catalog.getFunction(toIdentifier(functionPath));
@@ -1558,6 +1573,10 @@ public class FlinkCatalog extends AbstractCatalog {
                 .orElseThrow(() -> new ProcedureNotExistException(name, 
procedurePath));
     }
 
+    private static boolean isSystemNamespace(String namespace) {
+        return namespace.equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+    }
+
     private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
         StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
         for (StackTraceElement stackTraceElement : stackTrace) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
new file mode 100644
index 0000000000..a6a94faf61
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Paimon flink built in functions. */
+public class BuiltInFunctions {
+
+    public static final Map<String, String> FUNCTIONS =
+            new HashMap<String, String>() {
+                {
+                    put("path_to_descriptor", 
PathToDescriptor.class.getName());
+                    put("descriptor_to_string", 
DescriptorToString.class.getName());
+                }
+            };
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
new file mode 100644
index 0000000000..fd622bf9ef
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** Blob descriptor to string. */
+public class DescriptorToString extends ScalarFunction {
+
+    public String eval(byte[] descriptorBytes) {
+        if (descriptorBytes == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = 
BlobDescriptor.deserialize(descriptorBytes);
+
+        return descriptor.toString();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
new file mode 100644
index 0000000000..87d136f717
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** File path to blob descriptor. */
+public class PathToDescriptor extends ScalarFunction {
+
+    public byte[] eval(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = new BlobDescriptor(path, 0, 
Long.MAX_VALUE);
+        return descriptor.serialize();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 73c11c8f3b..f879b9c67a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -98,6 +98,40 @@ public class BlobTableITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
     }
 
+    @Test
+    public void testWriteBlobWithBuiltInFunction() throws Exception {
+        byte[] blobData = new byte[1024 * 1024];
+        RANDOM.nextBytes(blobData);
+        FileIO fileIO = new LocalFileIO();
+        String uri = "file://" + warehouse + "/external_blob";
+        try (OutputStream outputStream =
+                fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), 
true)) {
+            outputStream.write(blobData);
+        }
+
+        BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, 
blobData.length);
+        batchSql(
+                "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', 
sys.path_to_descriptor('"
+                        + uri
+                        + "'))");
+        byte[] newDescriptorBytes =
+                (byte[]) batchSql("SELECT picture FROM 
blob_table_descriptor").get(0).getField(0);
+        BlobDescriptor newBlobDescriptor = 
BlobDescriptor.deserialize(newDescriptorBytes);
+        Options options = new Options();
+        options.set("warehouse", warehouse.toString());
+        CatalogContext catalogContext = CatalogContext.create(options);
+        UriReaderFactory uriReaderFactory = new 
UriReaderFactory(catalogContext);
+        Blob blob =
+                Blob.fromDescriptor(
+                        uriReaderFactory.create(newBlobDescriptor.uri()), 
blobDescriptor);
+        assertThat(blob.toData()).isEqualTo(blobData);
+        URI blobUri = URI.create(blob.toDescriptor().uri());
+        assertThat(blobUri.getScheme()).isNotNull();
+        batchSql("ALTER TABLE blob_table_descriptor SET 
('blob-as-descriptor'='false')");
+        assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
+                .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
+    }
+
     private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
 
     public static String bytesToHex(byte[] bytes) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
new file mode 100644
index 0000000000..482786f12e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert blob descriptor to its string representation. */
+public class DescriptorToStringFunction implements ScalarFunction<UTF8String>, 
Serializable {
+
+    @Override
+    public DataType[] inputTypes() {
+        return new DataType[] {DataTypes.BinaryType};
+    }
+
+    @Override
+    public DataType resultType() {
+        return DataTypes.StringType;
+    }
+
+    public UTF8String invoke(byte[] descriptorBytes) {
+        if (descriptorBytes == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = 
BlobDescriptor.deserialize(descriptorBytes);
+        return UTF8String.fromString(descriptor.toString());
+    }
+
+    @Override
+    public String name() {
+        return "descriptor_to_string";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
new file mode 100644
index 0000000000..05da656129
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link PathToDescriptorFunction}. */
+public class DescriptorToStringUnbound implements UnboundFunction {
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 1) {
+            throw new UnsupportedOperationException(
+                    "Function 'descriptor_to_string' requires 1 argument, but 
found "
+                            + inputType.fields().length);
+        }
+
+        if (!(inputType.fields()[0].dataType() instanceof BinaryType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'descriptor_to_string' must be 
BINARY type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+
+        return new PathToDescriptorFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Convert file path to blob descriptor";
+    }
+
+    @Override
+    public String name() {
+        return "descriptor_to_string";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
new file mode 100644
index 0000000000..ea6f306cc3
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert file path to blob descriptor. */
+public class PathToDescriptorFunction implements ScalarFunction<byte[]>, 
Serializable {
+
+    @Override
+    public DataType[] inputTypes() {
+        return new DataType[] {DataTypes.StringType};
+    }
+
+    @Override
+    public DataType resultType() {
+        return DataTypes.BinaryType;
+    }
+
+    public byte[] invoke(UTF8String path) {
+        if (path == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0, 
Long.MAX_VALUE);
+        return descriptor.serialize();
+    }
+
+    @Override
+    public String name() {
+        return "path_to_descriptor";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
new file mode 100644
index 0000000000..23b7718847
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link PathToDescriptorFunction}. */
+public class PathToDescriptorUnbound implements UnboundFunction {
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 1) {
+            throw new UnsupportedOperationException(
+                    "Function 'path_to_descriptor' requires 1 argument, but 
found "
+                            + inputType.fields().length);
+        }
+
+        if (!(inputType.fields()[0].dataType() instanceof StringType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'path_to_descriptor' must be STRING 
type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+
+        return new PathToDescriptorFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Convert file path to blob descriptor";
+    }
+
+    @Override
+    public String name() {
+        return "path_to_descriptor";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index 2876f7e7f1..a0d97c68d4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,6 +25,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
+import org.apache.paimon.spark.function.{DescriptorToStringFunction, 
DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, 
LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
@@ -43,6 +44,8 @@ object PaimonFunctions {
   val PAIMON_BUCKET: String = "bucket"
   val MOD_BUCKET: String = "mod_bucket"
   val MAX_PT: String = "max_pt"
+  val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
+  val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
 
   private val FUNCTIONS = ImmutableMap.of(
     PAIMON_BUCKET,
@@ -50,7 +53,11 @@ object PaimonFunctions {
     MOD_BUCKET,
     new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD),
     MAX_PT,
-    new MaxPtFunction
+    new MaxPtFunction,
+    PATH_TO_DESCRIPTOR,
+    new PathToDescriptorUnbound,
+    DESCRIPTOR_TO_STRING,
+    new DescriptorToStringUnbound
   )
 
   /** The bucket function type to the function name mapping */
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 854f2eac34..e05c9ce644 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -127,6 +127,43 @@ class BlobTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Blob: test write blob descriptor with built-in function") {
+    withTable("t") {
+      val blobData = new Array[Byte](1024 * 1024)
+      RANDOM.nextBytes(blobData)
+      val fileIO = new LocalFileIO
+      val uri = "file://" + tempDBDir.toString + "/external_blob"
+      try {
+        val outputStream = fileIO.newOutputStream(new Path(uri), true)
+        try outputStream.write(blobData)
+        finally if (outputStream != null) outputStream.close()
+      }
+
+      val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
+      sql(
+        "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" 
+ "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + 
"content BINARY\n" + ") \n" +
+          "PARTITIONED BY (ds STRING, batch STRING) \n" +
+          "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' 
= '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 
'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+      sql(
+        "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES 
\n('1','paimon','1024','12345678','20241017', sys.path_to_descriptor('" + uri + 
"'))")
+      val newDescriptorBytes =
+        sql("SELECT content FROM t WHERE id = 
'1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
+      val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
+      val options = new Options()
+      options.set("warehouse", tempDBDir.toString)
+      val catalogContext = CatalogContext.create(options)
+      val uriReaderFactory = new UriReaderFactory(catalogContext)
+      val blob = 
Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri), 
blobDescriptor)
+      assert(util.Arrays.equals(blobData, blob.toData))
+
+      sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
+      checkAnswer(
+        sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE 
id = 1"),
+        Seq(Row("1", "paimon", blobData, 0, 1))
+      )
+    }
+  }
+
   test("Blob: test compaction") {
     withTable("t") {
       sql(

Reply via email to