This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75b381a8930 [Enhancement](pyudf) Support parameterless calls for pythonUDF (#62624)
75b381a8930 is described below

commit 75b381a8930b513e382f9b95e808e6e5728916e8
Author: linrrarity <[email protected]>
AuthorDate: Fri May 8 17:23:36 2026 +0800

    [Enhancement](pyudf) Support parameterless calls for pythonUDF (#62624)
    
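    Problem Summary:
    
    Calling a Python UDF or UDTF declared without parameters failed in the BE,
    because `PythonUDFMeta::check()` rejected an empty input type list. This
    change allows zero-argument Python UDFs and UDTFs (Python UDAFs still require
    at least one argument). Such functions now receive a zero-column Arrow batch
    that carries only the row count, so the Python side still knows how many rows
    to produce. For example: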
    
    ```sql
    CREATE FUNCTION py_pkg_versions()
    RETURNS STRING
    PROPERTIES (
        "type" = "PYTHON_UDF",
        "symbol" = "evaluate",
        "runtime_version" = "3.12.11",
        "always_nullable" = "true"
    )
    AS $$
    import json
    import sys
    def evaluate():
        versions = {"python": sys.version}
        try:
            import numpy
            versions["numpy"] = numpy.__version__
        except:
            versions["numpy"] = "not_found"
        try:
            import pandas
            versions["pandas"] = pandas.__version__
        except:
            versions["pandas"] = "not_found"
        try:
            import jieba
            versions["jieba"] = jieba.__version__
        except:
            versions["jieba"] = "not_found"
        return json.dumps(versions)
    $$;
    ```
    
    before:
    ```sql
    SELECT py_pkg_versions();
    -- errCode = 2, detailMessage = (172.20.49.73)[INVALID_ARGUMENT]Python UDF input types is empty
    ```
    
    now:
    ```sql
    SELECT py_pkg_versions();
    +------------------------------------------------------------------------------------------------------------------------------------------------------+
    | py_pkg_versions()                                                                                                                                    |
    +------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"python": "3.12.11 | packaged by conda-forge | (main, Jun  4 2025, 14:45:31) [GCC 13.3.0]", "numpy": "2.4.3", "pandas": "3.0.1", "jieba": "0.42.1"} |
    +------------------------------------------------------------------------------------------------------------------------------------------------------+
    ```
---
 be/src/exprs/function/function_python_udf.cpp      | 10 +++-
 .../exprs/table_function/python_udtf_function.cpp  | 14 ++++-
 be/src/format/arrow/arrow_block_convertor.cpp      |  9 +++
 be/src/format/arrow/arrow_block_convertor.h        |  4 ++
 be/src/udf/python/python_udf_meta.cpp              |  6 +-
 be/src/udf/python/python_udf_meta.h                | 10 ++--
 be/test/udf/python/python_udf_meta_test.cpp        | 63 +++++++++++++++++++-
 .../pythonudf_p0/test_pythonudf_no_input.groovy    | 64 ++++++++++++++++++++
 .../pythonudtf_p0/test_pythonudtf_no_input.groovy  | 68 ++++++++++++++++++++++
 9 files changed, 234 insertions(+), 14 deletions(-)

diff --git a/be/src/exprs/function/function_python_udf.cpp 
b/be/src/exprs/function/function_python_udf.cpp
index b874d3ce14a..54ea74ad456 100644
--- a/be/src/exprs/function/function_python_udf.cpp
+++ b/be/src/exprs/function/function_python_udf.cpp
@@ -112,7 +112,7 @@ Status PythonFunctionCall::execute_impl(FunctionContext* 
context, Block& block,
         return Status::InternalError("Python UDF client is null");
     }
 
-    int64_t input_rows = block.rows();
+    int64_t input_rows = num_rows;
     uint32_t input_columns = block.columns();
     DCHECK(input_columns > 0 && result < input_columns &&
            _argument_types.size() == arguments.size());
@@ -141,8 +141,12 @@ Status PythonFunctionCall::execute_impl(FunctionContext* 
context, Block& block,
     std::shared_ptr<arrow::RecordBatch> input_batch;
     std::shared_ptr<arrow::RecordBatch> output_batch;
     cctz::time_zone _timezone_obj; // default UTC
-    RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, 
arrow::default_memory_pool(),
-                                           &input_batch, _timezone_obj));
+    if (arguments.empty()) {
+        RETURN_IF_ERROR(make_zero_column_arrow_batch(schema, input_rows, 
&input_batch));
+    } else {
+        RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, 
arrow::default_memory_pool(),
+                                               &input_batch, _timezone_obj));
+    }
     RETURN_IF_ERROR(client->evaluate(*input_batch, &output_batch));
     int64_t output_rows = output_batch->num_rows();
 
diff --git a/be/src/exprs/table_function/python_udtf_function.cpp 
b/be/src/exprs/table_function/python_udtf_function.cpp
index a116a3d6785..f39ceafd982 100644
--- a/be/src/exprs/table_function/python_udtf_function.cpp
+++ b/be/src/exprs/table_function/python_udtf_function.cpp
@@ -132,17 +132,27 @@ Status PythonUDTFFunction::process_init(Block* block, 
RuntimeState* state) {
     for (uint32_t i = 0; i < child_column_idxs.size(); ++i) {
         input_block.insert(block->get_by_position(child_column_idxs[i]));
     }
+    int64_t input_rows = block->rows();
     std::shared_ptr<arrow::Schema> input_schema;
     std::shared_ptr<arrow::RecordBatch> input_batch;
     RETURN_IF_ERROR(get_arrow_schema_from_block(input_block, &input_schema,
                                                 
TimezoneUtils::default_time_zone));
-    RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema, 
arrow::default_memory_pool(),
-                                           &input_batch, _timezone_obj));
+    if (child_column_idxs.empty()) {
+        RETURN_IF_ERROR(make_zero_column_arrow_batch(input_schema, input_rows, 
&input_batch));
+    } else {
+        RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema,
+                                               arrow::default_memory_pool(), 
&input_batch,
+                                               _timezone_obj));
+    }
 
     // Step 3: Call Python UDTF to evaluate all rows at once (similar to Java 
UDTF's JNI call)
     // Python returns a ListArray where each element contains outputs for one 
input row
     std::shared_ptr<arrow::ListArray> list_array;
     RETURN_IF_ERROR(_udtf_client->evaluate(*input_batch, &list_array));
+    if (list_array->length() != input_rows) [[unlikely]] {
+        return Status::InternalError("Python UDTF output rows {} not equal to 
input rows {}",
+                                     list_array->length(), input_rows);
+    }
 
     // Step 4: Convert Python server output (ListArray) to Doris array column
     RETURN_IF_ERROR(_convert_list_array_to_array_column(list_array));
diff --git a/be/src/format/arrow/arrow_block_convertor.cpp 
b/be/src/format/arrow/arrow_block_convertor.cpp
index a6905325c13..91593898ac5 100644
--- a/be/src/format/arrow/arrow_block_convertor.cpp
+++ b/be/src/format/arrow/arrow_block_convertor.cpp
@@ -147,6 +147,15 @@ Status convert_to_arrow_batch(const Block& block, const 
std::shared_ptr<arrow::S
     return converter.convert(result);
 }
 
+Status make_zero_column_arrow_batch(const std::shared_ptr<arrow::Schema>& 
schema, int64_t rows,
+                                    std::shared_ptr<arrow::RecordBatch>* 
result) {
+    if (schema->num_fields() != 0) {
+        return Status::InvalidArgument("schema should have no fields for zero 
column batch");
+    }
+    *result = arrow::RecordBatch::Make(schema, rows, 
std::vector<std::shared_ptr<arrow::Array>> {});
+    return Status::OK();
+}
+
 Status convert_from_arrow_batch(const std::shared_ptr<arrow::RecordBatch>& 
batch,
                                 const DataTypes& types, Block* block,
                                 const cctz::time_zone& timezone_obj) {
diff --git a/be/src/format/arrow/arrow_block_convertor.h 
b/be/src/format/arrow/arrow_block_convertor.h
index 31f145c5e1b..96ee10d5215 100644
--- a/be/src/format/arrow/arrow_block_convertor.h
+++ b/be/src/format/arrow/arrow_block_convertor.h
@@ -19,6 +19,7 @@
 
 #include <cctz/time_zone.h>
 
+#include <cstdint>
 #include <memory>
 
 #include "common/status.h"
@@ -116,6 +117,9 @@ Status convert_to_arrow_batch(const Block& block, const 
std::shared_ptr<arrow::S
                               const cctz::time_zone& timezone_obj, size_t 
start_row,
                               size_t end_row);
 
+Status make_zero_column_arrow_batch(const std::shared_ptr<arrow::Schema>& 
schema, int64_t rows,
+                                    std::shared_ptr<arrow::RecordBatch>* 
result);
+
 Status convert_from_arrow_batch(const std::shared_ptr<arrow::RecordBatch>& 
batch,
                                 const DataTypes& types, Block* block,
                                 const cctz::time_zone& timezone_obj);
diff --git a/be/src/udf/python/python_udf_meta.cpp 
b/be/src/udf/python/python_udf_meta.cpp
index 88af0c9ff64..f0978dc926b 100644
--- a/be/src/udf/python/python_udf_meta.cpp
+++ b/be/src/udf/python/python_udf_meta.cpp
@@ -32,7 +32,6 @@ namespace doris {
 
 Status PythonUDFMeta::convert_types_to_schema(const DataTypes& types, const 
std::string& timezone,
                                               std::shared_ptr<arrow::Schema>* 
schema) {
-    assert(!types.empty());
     arrow::SchemaBuilder builder;
     for (size_t i = 0; i < types.size(); ++i) {
         std::shared_ptr<arrow::DataType> arrow_type;
@@ -152,8 +151,9 @@ Status PythonUDFMeta::check() const {
         return Status::InvalidArgument("Python UDF runtime version is empty");
     }
 
-    if (input_types.empty()) {
-        return Status::InvalidArgument("Python UDF input types is empty");
+    if (input_types.empty() &&
+        (client_type == PythonClientType::UDAF || type == 
PythonUDFLoadType::UNKNOWN)) {
+        return Status::InvalidArgument("Python UDAF input types is empty");
     }
 
     if (!return_type) {
diff --git a/be/src/udf/python/python_udf_meta.h 
b/be/src/udf/python/python_udf_meta.h
index 7993faf3bb7..55c49abb30a 100644
--- a/be/src/udf/python/python_udf_meta.h
+++ b/be/src/udf/python/python_udf_meta.h
@@ -33,18 +33,18 @@ enum class PythonUDFLoadType : uint8_t { INLINE = 0, MODULE 
= 1, UNKNOWN = 2 };
 enum class PythonClientType : uint8_t { UDF = 0, UDAF = 1, UDTF = 2, UNKNOWN = 
3 };
 
 struct PythonUDFMeta {
-    int64_t id;
+    int64_t id = 0;
     std::string name;
     std::string symbol;
     std::string location;
     std::string checksum;
     std::string runtime_version;
     std::string inline_code;
-    bool always_nullable;
+    bool always_nullable = false;
     DataTypes input_types;
     DataTypePtr return_type;
-    PythonUDFLoadType type;
-    PythonClientType client_type;
+    PythonUDFLoadType type = PythonUDFLoadType::UNKNOWN;
+    PythonClientType client_type = PythonClientType::UNKNOWN;
 
     static Status convert_types_to_schema(const DataTypes& types, const 
std::string& timezone,
                                           std::shared_ptr<arrow::Schema>* 
schema);
@@ -70,4 +70,4 @@ struct hash<doris::PythonUDFMeta> {
         return std::hash<int64_t>()(meta.id);
     }
 };
-} // namespace std
\ No newline at end of file
+} // namespace std
diff --git a/be/test/udf/python/python_udf_meta_test.cpp 
b/be/test/udf/python/python_udf_meta_test.cpp
index b913f49d19b..fd651ae07d0 100644
--- a/be/test/udf/python/python_udf_meta_test.cpp
+++ b/be/test/udf/python/python_udf_meta_test.cpp
@@ -109,7 +109,7 @@ TEST_F(PythonUDFMetaTest, CheckEmptyRuntimeVersion) {
     EXPECT_TRUE(status.to_string().find("runtime version is empty") != 
std::string::npos);
 }
 
-TEST_F(PythonUDFMetaTest, CheckEmptyInputTypes) {
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesAllowedForUdf) {
     PythonUDFMeta meta;
     meta.name = "test_udf";
     meta.symbol = "test_func";
@@ -117,6 +117,35 @@ TEST_F(PythonUDFMetaTest, CheckEmptyInputTypes) {
     meta.input_types = {};
     meta.return_type = nullable_int32_;
     meta.type = PythonUDFLoadType::INLINE;
+    meta.client_type = PythonClientType::UDF;
+
+    Status status = meta.check();
+    EXPECT_TRUE(status.ok()) << status.to_string();
+}
+
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesAllowedForUdtf) {
+    PythonUDFMeta meta;
+    meta.name = "test_udtf";
+    meta.symbol = "test_func";
+    meta.runtime_version = "3.9.16";
+    meta.input_types = {};
+    meta.return_type = nullable_string_;
+    meta.type = PythonUDFLoadType::INLINE;
+    meta.client_type = PythonClientType::UDTF;
+
+    Status status = meta.check();
+    EXPECT_TRUE(status.ok()) << status.to_string();
+}
+
+TEST_F(PythonUDFMetaTest, CheckEmptyInputTypesRejectedForUdaf) {
+    PythonUDFMeta meta;
+    meta.name = "test_udaf";
+    meta.symbol = "test_func";
+    meta.runtime_version = "3.9.16";
+    meta.input_types = {};
+    meta.return_type = nullable_int32_;
+    meta.type = PythonUDFLoadType::INLINE;
+    meta.client_type = PythonClientType::UDAF;
 
     Status status = meta.check();
     EXPECT_FALSE(status.ok());
@@ -401,6 +430,27 @@ TEST_F(PythonUDFMetaTest, 
SerializeToJsonMultipleInputTypes) {
     EXPECT_TRUE(doc.HasMember("input_types"));
 }
 
+TEST_F(PythonUDFMetaTest, SerializeToJsonEmptyInputTypesForUdf) {
+    PythonUDFMeta meta;
+    meta.name = "zero_arg_udf";
+    meta.symbol = "func";
+    meta.runtime_version = "3.9.16";
+    meta.input_types = {};
+    meta.return_type = nullable_int32_;
+    meta.type = PythonUDFLoadType::INLINE;
+    meta.client_type = PythonClientType::UDF;
+
+    std::string json_str;
+    Status status = meta.serialize_to_json(&json_str);
+    EXPECT_TRUE(status.ok()) << status.to_string();
+
+    rapidjson::Document doc;
+    doc.Parse(json_str.c_str());
+    EXPECT_FALSE(doc.HasParseError());
+    EXPECT_TRUE(doc.HasMember("input_types"));
+    EXPECT_FALSE(std::string(doc["input_types"].GetString()).empty());
+}
+
 // ============================================================================
 // PythonUDFMeta convert_types_to_schema() tests
 // ============================================================================
@@ -429,6 +479,17 @@ TEST_F(PythonUDFMetaTest, ConvertTypesToSchemaSingleType) {
     EXPECT_EQ(schema->num_fields(), 1);
 }
 
+TEST_F(PythonUDFMetaTest, ConvertTypesToSchemaEmpty) {
+    DataTypes types = {};
+    std::shared_ptr<arrow::Schema> schema;
+
+    Status status = PythonUDFMeta::convert_types_to_schema(types, 
TimezoneUtils::default_time_zone,
+                                                           &schema);
+    EXPECT_TRUE(status.ok()) << status.to_string();
+    EXPECT_NE(schema, nullptr);
+    EXPECT_EQ(schema->num_fields(), 0);
+}
+
 // ============================================================================
 // PythonUDFMeta serialize_arrow_schema() tests
 // ============================================================================
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy 
b/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy
new file mode 100644
index 00000000000..a0d78f4629d
--- /dev/null
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_no_input.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pythonudf_no_input") {
+    def runtime_version = getPythonUdfRuntimeVersion()
+
+    try {
+        sql """ DROP FUNCTION IF EXISTS py_const_no_input(); """
+        sql """ DROP TABLE IF EXISTS test_pythonudf_no_input_tbl; """
+
+        sql """
+        CREATE FUNCTION py_const_no_input()
+        RETURNS INT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "evaluate",
+            "runtime_version" = "${runtime_version}"
+        )
+        AS \$\$
+def evaluate():
+    return 7
+\$\$;
+        """
+
+        assert sql(""" SELECT py_const_no_input(); """)[0][0] == 7
+
+        sql """
+        CREATE TABLE test_pythonudf_no_input_tbl (
+            id INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+        """
+
+        sql """ INSERT INTO test_pythonudf_no_input_tbl VALUES (1), (2), (3); 
"""
+
+        def rows = sql("""
+            SELECT id, py_const_no_input() AS v
+            FROM test_pythonudf_no_input_tbl
+            ORDER BY id
+        """)
+
+        assert rows.size() == 3 : "Expected 3 rows, got ${rows.size()}"
+        assert rows.collect { it[0] as int } == [1, 2, 3]
+        assert rows.every { (it[1] as int) == 7 }
+    } finally {
+        try_sql(""" DROP FUNCTION IF EXISTS py_const_no_input(); """)
+    }
+}
diff --git 
a/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy 
b/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy
new file mode 100644
index 00000000000..e5b90a6ca9f
--- /dev/null
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_no_input.groovy
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pythonudtf_no_input") {
+    def runtime_version = getPythonUdfRuntimeVersion()
+
+    try {
+        sql """ DROP FUNCTION IF EXISTS py_emit_no_input(); """
+        sql """ DROP TABLE IF EXISTS test_pythonudtf_no_input_tbl; """
+
+        sql """
+        CREATE TABLES FUNCTION py_emit_no_input()
+        RETURNS ARRAY<STRING>
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "emit_values",
+            "runtime_version" = "${runtime_version}"
+        )
+        AS \$\$
+def emit_values():
+    yield ('left',)
+    yield ('right',)
+\$\$;
+        """
+
+        sql """
+        CREATE TABLE test_pythonudtf_no_input_tbl (
+            id INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+        """
+
+        sql """ INSERT INTO test_pythonudtf_no_input_tbl VALUES (1), (2); """
+
+        def rows = sql("""
+            SELECT id, value
+            FROM test_pythonudtf_no_input_tbl
+            LATERAL VIEW py_emit_no_input() tmp AS value
+            ORDER BY id, value
+        """)
+
+        assert rows.size() == 4 : "Expected 4 rows, got ${rows.size()}"
+        assert rows.collect { [(it[0] as int), it[1].toString()] } == [
+            [1, "left"],
+            [1, "right"],
+            [2, "left"],
+            [2, "right"]
+        ]
+    } finally {
+        try_sql(""" DROP FUNCTION IF EXISTS py_emit_no_input(); """)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to