This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 452e6d326d [flink][spark] Introduce Built-in functions for flink and
spark (#6891)
452e6d326d is described below
commit 452e6d326d83d34b229043c633009f5e2c5f3d0c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Dec 29 22:09:53 2025 +0800
[flink][spark] Introduce Built-in functions for flink and spark (#6891)
---
docs/content/spark/sql-functions.md | 42 ++++++++++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 21 +++++++-
.../paimon/flink/function/BuiltInFunctions.java | 34 +++++++++++++
.../paimon/flink/function/DescriptorToString.java | 37 ++++++++++++++
.../paimon/flink/function/PathToDescriptor.java | 36 ++++++++++++++
.../org/apache/paimon/flink/BlobTableITCase.java | 34 +++++++++++++
.../spark/function/DescriptorToStringFunction.java | 56 ++++++++++++++++++++++
.../spark/function/DescriptorToStringUnbound.java | 55 +++++++++++++++++++++
.../spark/function/PathToDescriptorFunction.java | 56 ++++++++++++++++++++++
.../spark/function/PathToDescriptorUnbound.java | 55 +++++++++++++++++++++
.../spark/catalog/functions/PaimonFunctions.scala | 9 +++-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 37 ++++++++++++++
12 files changed, 470 insertions(+), 2 deletions(-)
diff --git a/docs/content/spark/sql-functions.md
b/docs/content/spark/sql-functions.md
index d17afc92ab..bf04badf75 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -53,6 +53,48 @@ SELECT * FROM t where pt = sys.max_pt('t');
-- a, 20250101
```
+### path_to_descriptor
+
+`sys.path_to_descriptor($file_path)`
+
+Converts a file path (STRING) to a blob descriptor (BINARY). This function is
useful when working with blob data stored in external files. It creates a blob
descriptor that references the file at the specified path.
+
+**Arguments:**
+- `file_path` (STRING): The path to the external file containing the blob data.
+
+**Returns:**
+- A BINARY value representing the serialized blob descriptor.
+
+**Example**
+
+```sql
+-- Insert blob data using path_to_descriptor function
+INSERT INTO t VALUES ('1', 'paimon',
sys.path_to_descriptor('file:///path/to/blob_file'));
+
+-- Insert with partition
+INSERT OVERWRITE TABLE t PARTITION(ds='1017', batch='test')
+VALUES ('1', 'paimon', '1024', '12345678', '20241017',
sys.path_to_descriptor('file:///path/to/blob_file'));
+```
+
+### descriptor_to_string
+
+`sys.descriptor_to_string($descriptor)`
+
+Converts a blob descriptor (BINARY) to its string representation (STRING).
This function is useful for debugging or displaying the contents of a blob
descriptor in a human-readable format.
+
+**Arguments:**
+- `descriptor` (BINARY): The blob descriptor bytes to convert.
+
+**Returns:**
+- A STRING representation of the blob descriptor.
+
+**Example**
+
+```sql
+-- Convert a blob descriptor to string for inspection
+SELECT sys.descriptor_to_string(content) FROM t WHERE id = '1';
+```
+
## User-defined Function
Paimon Spark supports two types of user-defined functions: lambda functions
and file-based functions.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index a2a9dd7fe0..7a777d557a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.flink.function.BuiltInFunctions;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
@@ -147,6 +148,7 @@ import static
org.apache.paimon.catalog.Catalog.COMMENT_PROP;
import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static
org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
@@ -1326,16 +1328,29 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public final List<String> listFunctions(String dbName) throws
CatalogException {
+ List<String> functions = new ArrayList<>();
+ if (isSystemNamespace(dbName)) {
+ functions.addAll(BuiltInFunctions.FUNCTIONS.keySet());
+ }
try {
- return catalog.listFunctions(dbName);
+ functions.addAll(catalog.listFunctions(dbName));
} catch (Catalog.DatabaseNotExistException e) {
throw new CatalogException(e.getMessage(), e);
}
+ return functions;
}
@Override
public final CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
+ if (isSystemNamespace(functionPath.getDatabaseName())) {
+ if
(BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) {
+ String builtInFunction =
+
BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName());
+ return new CatalogFunctionImpl(builtInFunction,
FunctionLanguage.JAVA);
+ }
+ }
+
try {
org.apache.paimon.function.Function function =
catalog.getFunction(toIdentifier(functionPath));
@@ -1558,6 +1573,10 @@ public class FlinkCatalog extends AbstractCatalog {
.orElseThrow(() -> new ProcedureNotExistException(name,
procedurePath));
}
+    /**
+     * Returns whether {@code namespace} names the system database, ignoring case.
+     *
+     * <p>The comparison is made from the constant's side so that a {@code null} namespace
+     * yields {@code false} instead of a {@link NullPointerException}.
+     *
+     * @param namespace database name to check, may be {@code null}
+     * @return {@code true} if it equals {@code SYSTEM_DATABASE_NAME} case-insensitively
+     */
+    private static boolean isSystemNamespace(String namespace) {
+        return SYSTEM_DATABASE_NAME.equalsIgnoreCase(namespace);
+    }
+
private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
new file mode 100644
index 0000000000..a6a94faf61
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Paimon flink built in functions. */
+public class BuiltInFunctions {
+
+ public static final Map<String, String> FUNCTIONS =
+ new HashMap<String, String>() {
+ {
+ put("path_to_descriptor",
PathToDescriptor.class.getName());
+ put("descriptor_to_string",
DescriptorToString.class.getName());
+ }
+ };
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
new file mode 100644
index 0000000000..fd622bf9ef
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** Flink scalar function that renders a serialized blob descriptor as a string. */
+public class DescriptorToString extends ScalarFunction {
+
+    /**
+     * Deserializes the given bytes into a {@link BlobDescriptor} and returns its string form.
+     *
+     * @param descriptorBytes serialized blob descriptor, may be {@code null}
+     * @return the descriptor's {@code toString()} value, or {@code null} for {@code null} input
+     */
+    public String eval(byte[] descriptorBytes) {
+        if (descriptorBytes != null) {
+            return BlobDescriptor.deserialize(descriptorBytes).toString();
+        }
+        return null;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
new file mode 100644
index 0000000000..87d136f717
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** Flink scalar function that turns a file path into a serialized blob descriptor. */
+public class PathToDescriptor extends ScalarFunction {
+
+    /**
+     * Creates a {@link BlobDescriptor} for {@code path} with the arguments {@code 0} and
+     * {@code Long.MAX_VALUE} (presumably offset and length, i.e. the whole file; confirm
+     * against {@code BlobDescriptor}) and returns its serialized bytes.
+     *
+     * @param path file path to reference, may be {@code null}
+     * @return the serialized descriptor, or {@code null} for {@code null} input
+     */
+    public byte[] eval(String path) {
+        if (path != null) {
+            return new BlobDescriptor(path, 0, Long.MAX_VALUE).serialize();
+        }
+        return null;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 73c11c8f3b..f879b9c67a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -98,6 +98,40 @@ public class BlobTableITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
}
+ @Test // Writes a blob through sys.path_to_descriptor and verifies it can be read back.
+ public void testWriteBlobWithBuiltInFunction() throws Exception {
+ byte[] blobData = new byte[1024 * 1024];
+ RANDOM.nextBytes(blobData); // random payload of 1024 * 1024 bytes
+ FileIO fileIO = new LocalFileIO();
+ String uri = "file://" + warehouse + "/external_blob";
+ try (OutputStream outputStream =
+ fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri),
true)) {
+ outputStream.write(blobData);
+ } // the payload now exists as an external file under the warehouse path
+ // Descriptor built locally from the known URI and payload length; used below to read the blob.
+ BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0,
blobData.length);
+ batchSql(
+ "INSERT INTO blob_table_descriptor VALUES (1, 'paimon',
sys.path_to_descriptor('"
+ + uri
+ + "'))"); // the third column value is produced by the built-in function
+ byte[] newDescriptorBytes =
+ (byte[]) batchSql("SELECT picture FROM
blob_table_descriptor").get(0).getField(0);
+ BlobDescriptor newBlobDescriptor =
BlobDescriptor.deserialize(newDescriptorBytes);
+ Options options = new Options();
+ options.set("warehouse", warehouse.toString());
+ CatalogContext catalogContext = CatalogContext.create(options);
+ UriReaderFactory uriReaderFactory = new
UriReaderFactory(catalogContext);
+ Blob blob =
+ Blob.fromDescriptor(
+ uriReaderFactory.create(newBlobDescriptor.uri()),
blobDescriptor); // NOTE(review): reads with the locally built descriptor, not newBlobDescriptor; presumably because the function stores Long.MAX_VALUE as its last argument. Confirm.
+ assertThat(blob.toData()).isEqualTo(blobData);
+ URI blobUri = URI.create(blob.toDescriptor().uri());
+ assertThat(blobUri.getScheme()).isNotNull();
+ batchSql("ALTER TABLE blob_table_descriptor SET
('blob-as-descriptor'='false')"); // the SELECT below is then expected to return the blob bytes themselves
+ assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
+ .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
+ }
+
private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
public static String bytesToHex(byte[] bytes) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
new file mode 100644
index 0000000000..482786f12e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert blob descriptor to its string representation. */
+public class DescriptorToStringFunction implements ScalarFunction<UTF8String>,
Serializable {
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.BinaryType};
+ }
+
+ @Override
+ public DataType resultType() {
+ return DataTypes.StringType;
+ }
+
+ public UTF8String invoke(byte[] descriptorBytes) {
+ if (descriptorBytes == null) {
+ return null;
+ }
+
+ BlobDescriptor descriptor =
BlobDescriptor.deserialize(descriptorBytes);
+ return UTF8String.fromString(descriptor.toString());
+ }
+
+ @Override
+ public String name() {
+ return "descriptor_to_string";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
new file mode 100644
index 0000000000..05da656129
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link DescriptorToStringFunction}. */
+public class DescriptorToStringUnbound implements UnboundFunction {
+
+    /**
+     * Validates the call signature and binds it to {@link DescriptorToStringFunction}.
+     *
+     * @param inputType struct describing the call arguments; must hold exactly one BINARY field
+     * @return the bound {@code descriptor_to_string} implementation
+     * @throws UnsupportedOperationException if the argument count is not 1 or the argument is
+     *     not of BINARY type
+     */
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 1) {
+            throw new UnsupportedOperationException(
+                    "Function 'descriptor_to_string' requires 1 argument, but found "
+                            + inputType.fields().length);
+        }
+
+        if (!(inputType.fields()[0].dataType() instanceof BinaryType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'descriptor_to_string' must be BINARY type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+
+        // Must bind the BINARY -> STRING function; binding PathToDescriptorFunction here would
+        // hand Spark a STRING -> BINARY function for a BINARY argument.
+        return new DescriptorToStringFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Convert blob descriptor to its string representation";
+    }
+
+    @Override
+    public String name() {
+        return "descriptor_to_string";
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
new file mode 100644
index 0000000000..ea6f306cc3
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert file path to blob descriptor. */
+public class PathToDescriptorFunction implements ScalarFunction<byte[]>,
Serializable {
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.StringType};
+ }
+
+ @Override
+ public DataType resultType() {
+ return DataTypes.BinaryType;
+ }
+
+ public byte[] invoke(UTF8String path) {
+ if (path == null) {
+ return null;
+ }
+
+ BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0,
Long.MAX_VALUE);
+ return descriptor.serialize();
+ }
+
+ @Override
+ public String name() {
+ return "path_to_descriptor";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
new file mode 100644
index 0000000000..23b7718847
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+
+/** Unbound counterpart of {@link PathToDescriptorFunction}; checks the call signature. */
+public class PathToDescriptorUnbound implements UnboundFunction {
+
+    @Override
+    public String name() {
+        return "path_to_descriptor";
+    }
+
+    @Override
+    public String description() {
+        return "Convert file path to blob descriptor";
+    }
+
+    /**
+     * Binds to {@link PathToDescriptorFunction} when called with a single STRING argument.
+     *
+     * @param inputType struct describing the call arguments
+     * @return the bound {@code path_to_descriptor} implementation
+     * @throws UnsupportedOperationException if the argument count is not 1 or the argument is
+     *     not of STRING type
+     */
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length == 1) {
+            if (inputType.fields()[0].dataType() instanceof StringType) {
+                return new PathToDescriptorFunction();
+            }
+            throw new UnsupportedOperationException(
+                    "The first argument of 'path_to_descriptor' must be STRING type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+        throw new UnsupportedOperationException(
+                "Function 'path_to_descriptor' requires 1 argument, but found "
+                        + inputType.fields().length);
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index 2876f7e7f1..a0d97c68d4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,6 +25,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
+import org.apache.paimon.spark.function.{DescriptorToStringFunction,
DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType,
LocalZonedTimestampType, MapType, RowType, TimestampType}
import org.apache.paimon.utils.ProjectedRow
@@ -43,6 +44,8 @@ object PaimonFunctions {
val PAIMON_BUCKET: String = "bucket"
val MOD_BUCKET: String = "mod_bucket"
val MAX_PT: String = "max_pt"
+ val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
+ val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
private val FUNCTIONS = ImmutableMap.of(
PAIMON_BUCKET,
@@ -50,7 +53,11 @@ object PaimonFunctions {
MOD_BUCKET,
new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD),
MAX_PT,
- new MaxPtFunction
+ new MaxPtFunction,
+ PATH_TO_DESCRIPTOR,
+ new PathToDescriptorUnbound,
+ DESCRIPTOR_TO_STRING,
+ new DescriptorToStringUnbound
)
/** The bucket function type to the function name mapping */
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 854f2eac34..e05c9ce644 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -127,6 +127,43 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: test write blob descriptor with built-in function") {
+ withTable("t") {
+ val blobData = new Array[Byte](1024 * 1024)
+ RANDOM.nextBytes(blobData) // random payload of 1024 * 1024 bytes
+ val fileIO = new LocalFileIO
+ val uri = "file://" + tempDBDir.toString + "/external_blob"
+ try { // NOTE(review): this outer try has no catch or finally, so it acts as a plain block
+ val outputStream = fileIO.newOutputStream(new Path(uri), true)
+ try outputStream.write(blobData)
+ finally if (outputStream != null) outputStream.close()
+ }
+ // Descriptor built locally from the known URI and payload length; used below to read the blob.
+ val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
+ sql(
+ "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n"
+ "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" +
"content BINARY\n" + ") \n" +
+ "PARTITIONED BY (ds STRING, batch STRING) \n" +
+ "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time'
= '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' =
'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+ sql(
+ "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES
\n('1','paimon','1024','12345678','20241017', sys.path_to_descriptor('" + uri +
"'))")
+ val newDescriptorBytes =
+ sql("SELECT content FROM t WHERE id =
'1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
+ val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
+ val options = new Options()
+ options.set("warehouse", tempDBDir.toString)
+ val catalogContext = CatalogContext.create(options)
+ val uriReaderFactory = new UriReaderFactory(catalogContext)
+ val blob =
Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri),
blobDescriptor)
+ assert(util.Arrays.equals(blobData, blob.toData))
+ // With 'blob-as-descriptor' switched off, the query below is expected to return the blob bytes.
+ sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
+ checkAnswer(
+ sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE
id = 1"), // NOTE(review): id is declared STRING but is compared with the numeric literal 1 here; confirm this is intended
+ Seq(Row("1", "paimon", blobData, 0, 1))
+ )
+ }
+ }
+
test("Blob: test compaction") {
withTable("t") {
sql(