This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cfa2426e41 [Fix](pyudf) Fix python udf error propagation (#62613)
2cfa2426e41 is described below

commit 2cfa2426e41a9f1fcf02d607e5e49ef6d43ebf70
Author: linrrarity <[email protected]>
AuthorDate: Wed May 13 15:54:05 2026 +0800

    [Fix](pyudf) Fix python udf error propagation (#62613)
    
    Problem Summary:
    
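    Before: an exception raised inside a Python UDF declared with
    "always_nullable"="true" was silently swallowed and the row evaluated to
    NULL, so the user never saw the real error. Python UDAF failures likewise
    lost the Python error text and only reported a generic failure: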
    ```sql
    CREATE FUNCTION py_err_stats_test(INT)
    RETURNS INT
    PROPERTIES (
        "type"="PYTHON_UDF",
        "symbol"="evaluate",
        "runtime_version"="3.12.11",
        "always_nullable"="true"
    ) AS $$
    def evaluate(x):
        raise TypeError("consistent_error_42")
    $$;
    
    SELECT py_err_stats_test(1);
    +----------------------+
    | py_err_stats_test(1) |
    +----------------------+
    |                 NULL |
    +----------------------+
    ```
    
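    Now: the exception is re-raised and propagated to the client as a query
    error that includes the Python message and traceback. For UDAFs, the
    Python error text is returned in the `serialized_data` column of the
    failure response and included by the BE in the error status: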
    ```sql
    SELECT py_err_stats_test(1);
    -- ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Flight stream finish failed with message: Error in scalar UDF execution at row 0: consistent_error_42. Detail: Python exception: Traceback (most recent call last):
      File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 614, in _scalar_call
        res = self._eval_func(*converted_args)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "<string>", line 3, in evaluate
    TypeError: consistent_error_42
    ```
---
 be/src/udf/python/python_server.py                 |  25 +-
 be/src/udf/python/python_udaf_client.cpp           |  82 +++++--
 be/src/udf/python/python_udaf_client.h             |  10 +
 be/test/udf/python/python_udaf_client_test.cpp     | 155 ++++++++++++
 .../test_python_raise_error_propagation.groovy     | 269 +++++++++++++++++++++
 .../suites/pythonudaf_p0/udaf_scripts/pyudaf.zip   | Bin 7858 -> 9153 bytes
 .../udaf_scripts/udaf_errors.py}                   |  30 ++-
 .../test_pythonudf_file_protocol.groovy            |  11 +-
 .../pythonudf_p0/udf_scripts/array_int_test.py     |   2 +
 .../udf_scripts/array_return_array_int_test.py     |   2 +
 .../udf_scripts/array_return_array_string_test.py  |   2 +
 .../pythonudf_p0/udf_scripts/array_string_test.py  |   2 +
 .../suites/pythonudf_p0/udf_scripts/float_test.py  |   2 +
 .../suites/pythonudf_p0/udf_scripts/int_test.py    |   2 +
 .../suites/pythonudf_p0/udf_scripts/pyudf.zip      | Bin 6086 -> 15967 bytes
 .../{array_int_test.py => udf_errors.py}           |  10 +-
 .../suites/pythonudtf_p0/udtf_scripts/pyudtf.zip   | Bin 10216 -> 10924 bytes
 .../udtf_scripts/pyudtf_module/exceptions_udtf.py  |   7 +
 18 files changed, 562 insertions(+), 49 deletions(-)

diff --git a/be/src/udf/python/python_server.py 
b/be/src/udf/python/python_server.py
index 3290acc6bc6..ecdbef691b9 100644
--- a/be/src/udf/python/python_server.py
+++ b/be/src/udf/python/python_server.py
@@ -631,11 +631,9 @@ class AdaptivePythonUDF:
                     converted_args,
                     traceback.format_exc(),
                 )
-                # Return None for failed rows if always_nullable is True
-                if self.python_udf_meta.always_nullable:
-                    result.append(None)
-                else:
-                    raise
+                raise RuntimeError(
+                    f"Error in scalar UDF execution at row {i}: {e}"
+                ) from e
 
         return pa.array(result, type=self._get_output_type())
 
@@ -1768,7 +1766,7 @@ class FlightServer(flight.FlightServerBase):
                 place_id,
                 e,
             )
-            success = False
+            raise RuntimeError(str(e)) from e
 
         return pa.RecordBatch.from_arrays(
             [pa.array([success], type=pa.bool_())], ["success"]
@@ -1916,7 +1914,7 @@ class FlightServer(flight.FlightServerBase):
                 place_id,
                 e,
             )
-            serialized = b""
+            raise RuntimeError(str(e)) from e
 
         return pa.RecordBatch.from_arrays(
             [pa.array([serialized], type=pa.binary())], ["serialized_state"]
@@ -1945,7 +1943,7 @@ class FlightServer(flight.FlightServerBase):
                 place_id,
                 e,
             )
-            success = False
+            raise RuntimeError(str(e)) from e
 
         return pa.RecordBatch.from_arrays(
             [pa.array([success], type=pa.bool_())], ["success"]
@@ -1969,7 +1967,7 @@ class FlightServer(flight.FlightServerBase):
                 place_id,
                 e,
             )
-            result = None
+            raise RuntimeError(str(e)) from e
 
         return pa.RecordBatch.from_arrays(
             [pa.array([result], type=output_type)], ["result"]
@@ -1991,7 +1989,7 @@ class FlightServer(flight.FlightServerBase):
                 place_id,
                 e,
             )
-            success = False
+            raise RuntimeError(str(e)) from e
 
         return pa.RecordBatch.from_arrays(
             [pa.array([success], type=pa.bool_())], ["success"]
@@ -2127,6 +2125,7 @@ class FlightServer(flight.FlightServerBase):
           * ACCUMULATE: use success + rows_processed (number of rows processed)
           * SERIALIZE: use success + serialized_data (serialized_state)
           * FINALIZE: use success + serialized_data (serialized result)
+          * Any failed operation: use success=false + serialized_data (UTF-8 
error message)
         """
 
         # Get or create state manager for this specific UDAF function
@@ -2299,8 +2298,12 @@ class FlightServer(flight.FlightServerBase):
                     e,
                     traceback.format_exc(),
                 )
+                # Keep the UDAF Flight stream alive so C++ can still send 
DESTROY.
+                # On failure, serialized_data carries the user-visible Python 
error text.
                 result_batch = self._create_unified_response(
-                    success=False, rows_processed=0, data=b""
+                    success=False,
+                    rows_processed=0,
+                    data=str(e).encode("utf-8", errors="replace"),
                 )
 
             # Begin stream with unified schema on first call
diff --git a/be/src/udf/python/python_udaf_client.cpp 
b/be/src/udf/python/python_udaf_client.cpp
index 6a6f6035ea5..4308750ccec 100644
--- a/be/src/udf/python/python_udaf_client.cpp
+++ b/be/src/udf/python/python_udaf_client.cpp
@@ -32,6 +32,7 @@
 #include "format/arrow/arrow_utils.h"
 #include "udf/python/python_udf_meta.h"
 #include "udf/python/python_udf_runtime.h"
+#include "util/unaligned.h"
 
 namespace doris {
 
@@ -42,6 +43,7 @@ namespace doris {
 // - ACCUMULATE: use success + rows_processed (number of rows processed)
 // - SERIALIZE: use success + data (serialized_state)
 // - FINALIZE: use success + data (serialized result, may be null)
+// - Any failed operation: use success=false + data (UTF-8 error message)
 //
 // This unified schema allows all operations to return consistent format,
 // solving Arrow Flight's limitation that all responses must have the same 
schema.
@@ -51,6 +53,47 @@ static const std::shared_ptr<arrow::Schema> 
kUnifiedUDAFResponseSchema = arrow::
         arrow::field("serialized_data", arrow::binary()),
 });
 
+Status PythonUDAFClient::make_udaf_failure_status(
+        const std::shared_ptr<arrow::RecordBatch>& response, const char* 
operation,
+        int64_t place_id) {
+    if (response == nullptr || response->num_rows() != 1 ||
+        response->num_columns() != kUnifiedUDAFResponseSchema->num_fields()) 
[[unlikely]] {
+        return Status::InternalError("Invalid {} failure response for 
place_id={}", operation,
+                                     place_id);
+    }
+
+    auto data_array = 
std::static_pointer_cast<arrow::BinaryArray>(response->column(2));
+    if (data_array->IsNull(0)) {
+        return Status::InternalError("{} operation failed for place_id={}", 
operation, place_id);
+    }
+
+    const auto* offsets = data_array->raw_value_offsets();
+    if (offsets == nullptr) [[unlikely]] {
+        return Status::InternalError("Invalid {} failure response for 
place_id={}: null offsets",
+                                     operation, place_id);
+    }
+    // Arrow Flight buffers may be unaligned after IPC deserialization
+    int32_t offset_start = unaligned_load<int32_t>(offsets);
+    int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
+
+    int32_t length = offset_end - offset_start;
+    if (length <= 0) {
+        return Status::InternalError("{} operation failed for place_id={}", 
operation, place_id);
+    }
+    const uint8_t* data = data_array->value_data()->data() + offset_start;
+    std::string error_message(reinterpret_cast<const char*>(data), length);
+    return Status::InternalError("{} operation failed for place_id={}: {}", 
operation, place_id,
+                                 error_message);
+}
+
+#ifdef BE_TEST
+Status PythonUDAFClient::make_udaf_failure_status_for_test(
+        const std::shared_ptr<arrow::RecordBatch>& response, const char* 
operation,
+        int64_t place_id) {
+    return make_udaf_failure_status(response, operation, place_id);
+}
+#endif
+
 Status PythonUDAFClient::create(const PythonUDFMeta& func_meta, ProcessPtr 
process,
                                 const std::shared_ptr<arrow::Schema>& 
data_schema,
                                 PythonUDAFClientPtr* client) {
@@ -89,7 +132,7 @@ Status PythonUDAFClient::create(int64_t place_id) {
 
     auto success_array = 
std::static_pointer_cast<arrow::BooleanArray>(response_batch->column(0));
     if (!success_array->Value(0)) {
-        return Status::InternalError("CREATE operation failed for 
place_id={}", place_id);
+        return make_udaf_failure_status(response_batch, "CREATE", place_id);
     }
 
     _created_place_id = place_id;
@@ -142,16 +185,15 @@ Status PythonUDAFClient::accumulate(int64_t place_id, 
bool is_single_place,
     auto rows_processed_array = 
std::static_pointer_cast<arrow::Int64Array>(response->column(1));
 
     if (!success_array->Value(0)) {
-        return Status::InternalError("ACCUMULATE operation failed for 
place_id={}", place_id);
+        return make_udaf_failure_status(response, "ACCUMULATE", place_id);
     }
 
-    // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
-    const uint8_t* raw_ptr = reinterpret_cast<const 
uint8_t*>(rows_processed_array->raw_values());
+    // Arrow Flight buffers may be unaligned after IPC deserialization.
+    const auto* raw_ptr = rows_processed_array->raw_values();
     if (raw_ptr == nullptr) {
         return Status::InternalError("ACCUMULATE response has null 
rows_processed array");
     }
-    int64_t rows_processed;
-    memcpy(&rows_processed, raw_ptr, sizeof(int64_t));
+    int64_t rows_processed = unaligned_load<int64_t>(raw_ptr);
 
     int64_t expected_rows = row_end - row_start;
 
@@ -185,17 +227,16 @@ Status PythonUDAFClient::serialize(int64_t place_id,
     auto data_array = 
std::static_pointer_cast<arrow::BinaryArray>(response->column(2));
 
     if (!success_array->Value(0)) {
-        return Status::InternalError("SERIALIZE operation failed for 
place_id={}", place_id);
+        return make_udaf_failure_status(response, "SERIALIZE", place_id);
     }
 
-    // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
-    const uint8_t* offsets = reinterpret_cast<const 
uint8_t*>(data_array->raw_value_offsets());
+    // Arrow Flight buffers may be unaligned after IPC deserialization.
+    const auto* offsets = data_array->raw_value_offsets();
     if (offsets == nullptr) {
         return Status::InternalError("SERIALIZE response has null offsets");
     }
-    int32_t offset_start, offset_end;
-    memcpy(&offset_start, offsets, sizeof(int32_t));
-    memcpy(&offset_end, offsets + sizeof(int32_t), sizeof(int32_t));
+    int32_t offset_start = unaligned_load<int32_t>(offsets);
+    int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
 
     int32_t length = offset_end - offset_start;
 
@@ -233,7 +274,7 @@ Status PythonUDAFClient::merge(int64_t place_id,
 
     auto success_array = 
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
     if (!success_array->Value(0)) {
-        return Status::InternalError("MERGE operation failed for place_id={}", 
place_id);
+        return make_udaf_failure_status(response, "MERGE", place_id);
     }
 
     return Status::OK();
@@ -260,17 +301,16 @@ Status PythonUDAFClient::finalize(int64_t place_id, 
std::shared_ptr<arrow::Recor
     auto data_array = 
std::static_pointer_cast<arrow::BinaryArray>(response_batch->column(2));
 
     if (!success_array->Value(0)) {
-        return Status::InternalError("FINALIZE operation failed for 
place_id={}", place_id);
+        return make_udaf_failure_status(response_batch, "FINALIZE", place_id);
     }
 
-    // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
-    const uint8_t* offsets = reinterpret_cast<const 
uint8_t*>(data_array->raw_value_offsets());
+    // Arrow Flight buffers may be unaligned after IPC deserialization.
+    const auto* offsets = data_array->raw_value_offsets();
     if (offsets == nullptr) {
         return Status::InternalError("FINALIZE response has null offsets");
     }
-    int32_t offset_start, offset_end;
-    memcpy(&offset_start, offsets, sizeof(int32_t));
-    memcpy(&offset_end, offsets + sizeof(int32_t), sizeof(int32_t));
+    int32_t offset_start = unaligned_load<int32_t>(offsets);
+    int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
 
     int32_t length = offset_end - offset_start;
 
@@ -324,7 +364,7 @@ Status PythonUDAFClient::reset(int64_t place_id) {
 
     auto success_array = 
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
     if (!success_array->Value(0)) {
-        return Status::InternalError("RESET operation failed for place_id={}", 
place_id);
+        return make_udaf_failure_status(response, "RESET", place_id);
     }
 
     return Status::OK();
@@ -363,7 +403,7 @@ Status PythonUDAFClient::destroy(int64_t place_id) {
 
     if (!success_array->Value(0)) {
         LOG(WARNING) << "DESTROY operation failed for place_id=" << place_id;
-        return Status::InternalError("DESTROY operation failed for 
place_id={}", place_id);
+        return make_udaf_failure_status(response, "DESTROY", place_id);
     }
 
     return Status::OK();
diff --git a/be/src/udf/python/python_udaf_client.h 
b/be/src/udf/python/python_udaf_client.h
index 078c34a39ea..471716651a4 100644
--- a/be/src/udf/python/python_udaf_client.h
+++ b/be/src/udf/python/python_udaf_client.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <arrow/record_batch.h>
 #include <arrow/status.h>
 
 #include "udf/python/python_client.h"
@@ -173,9 +174,18 @@ public:
      */
     Status close();
 
+#ifdef BE_TEST
+    static Status make_udaf_failure_status_for_test(
+            const std::shared_ptr<arrow::RecordBatch>& response, const char* 
operation,
+            int64_t place_id);
+#endif
+
 private:
     DISALLOW_COPY_AND_ASSIGN(PythonUDAFClient);
 
+    static Status make_udaf_failure_status(const 
std::shared_ptr<arrow::RecordBatch>& response,
+                                           const char* operation, int64_t 
place_id);
+
     /**
      * Send RecordBatch request to Python server with app_metadata
      * @param metadata UDAFMetadata structure (will be sent as app_metadata)
diff --git a/be/test/udf/python/python_udaf_client_test.cpp 
b/be/test/udf/python/python_udaf_client_test.cpp
new file mode 100644
index 00000000000..eb1ab5242b8
--- /dev/null
+++ b/be/test/udf/python/python_udaf_client_test.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "udf/python/python_udaf_client.h"
+
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+namespace doris {
+
+std::shared_ptr<arrow::RecordBatch> make_udaf_response(const 
std::optional<std::string>& error) {
+    arrow::BooleanBuilder success_builder;
+    std::shared_ptr<arrow::Array> success_array;
+    EXPECT_TRUE(success_builder.Append(false).ok());
+    EXPECT_TRUE(success_builder.Finish(&success_array).ok());
+
+    arrow::Int64Builder rows_processed_builder;
+    std::shared_ptr<arrow::Array> rows_processed_array;
+    EXPECT_TRUE(rows_processed_builder.Append(0).ok());
+    EXPECT_TRUE(rows_processed_builder.Finish(&rows_processed_array).ok());
+
+    arrow::BinaryBuilder data_builder;
+    std::shared_ptr<arrow::Array> data_array;
+    if (error.has_value()) {
+        EXPECT_TRUE(data_builder.Append(error->data(), 
static_cast<int32_t>(error->size())).ok());
+    } else {
+        EXPECT_TRUE(data_builder.AppendNull().ok());
+    }
+    EXPECT_TRUE(data_builder.Finish(&data_array).ok());
+
+    auto schema = arrow::schema({
+            arrow::field("success", arrow::boolean()),
+            arrow::field("rows_processed", arrow::int64()),
+            arrow::field("serialized_data", arrow::binary()),
+    });
+    return arrow::RecordBatch::Make(schema, 1, {success_array, 
rows_processed_array, data_array});
+}
+
+std::shared_ptr<arrow::RecordBatch> make_udaf_response_with_data_array(
+        const std::shared_ptr<arrow::Array>& data_array) {
+    arrow::BooleanBuilder success_builder;
+    std::shared_ptr<arrow::Array> success_array;
+    EXPECT_TRUE(success_builder.Append(false).ok());
+    EXPECT_TRUE(success_builder.Finish(&success_array).ok());
+
+    arrow::Int64Builder rows_processed_builder;
+    std::shared_ptr<arrow::Array> rows_processed_array;
+    EXPECT_TRUE(rows_processed_builder.Append(0).ok());
+    EXPECT_TRUE(rows_processed_builder.Finish(&rows_processed_array).ok());
+
+    auto schema = arrow::schema({
+            arrow::field("success", arrow::boolean()),
+            arrow::field("rows_processed", arrow::int64()),
+            arrow::field("serialized_data", arrow::binary()),
+    });
+    return arrow::RecordBatch::Make(schema, 1, {success_array, 
rows_processed_array, data_array});
+}
+
+TEST(PythonUDAFClientTest, FailureStatusIncludesPythonErrorMessage) {
+    auto response = make_udaf_response("finish failed");
+    Status status = 
PythonUDAFClient::make_udaf_failure_status_for_test(response, "FINALIZE", 7);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("FINALIZE operation failed for 
place_id=7: finish failed"),
+              std::string::npos);
+}
+
+TEST(PythonUDAFClientTest, FailureStatusHandlesUnalignedBinaryOffsets) {
+    std::string error = "finalize failed";
+    std::vector<uint8_t> offset_storage(1 + 2 * sizeof(int32_t));
+    uint8_t* unaligned_offsets = offset_storage.data() + 1;
+    int32_t offset_start = 0;
+    int32_t offset_end = static_cast<int32_t>(error.size());
+    memcpy(unaligned_offsets, &offset_start, sizeof(int32_t));
+    memcpy(unaligned_offsets + sizeof(int32_t), &offset_end, sizeof(int32_t));
+
+    auto offset_buffer = arrow::Buffer::Wrap(unaligned_offsets, 2 * 
sizeof(int32_t));
+    auto value_buffer =
+            arrow::Buffer::Wrap(reinterpret_cast<const 
uint8_t*>(error.data()), error.size());
+    auto data_array = std::make_shared<arrow::BinaryArray>(1, offset_buffer, 
value_buffer);
+    ASSERT_EQ(reinterpret_cast<uintptr_t>(data_array->raw_value_offsets()) % 
alignof(int32_t), 1);
+
+    Status status = PythonUDAFClient::make_udaf_failure_status_for_test(
+            make_udaf_response_with_data_array(data_array), "FINALIZE", 13);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("FINALIZE operation failed for 
place_id=13: finalize failed"),
+              std::string::npos);
+}
+
+TEST(PythonUDAFClientTest, 
FailureStatusFallsBackWhenErrorMessageIsNullOrEmpty) {
+    Status null_status = PythonUDAFClient::make_udaf_failure_status_for_test(
+            make_udaf_response(std::nullopt), "RESET", 8);
+    EXPECT_FALSE(null_status.ok());
+    EXPECT_NE(null_status.to_string().find("RESET operation failed for 
place_id=8"),
+              std::string::npos);
+
+    Status empty_status =
+            
PythonUDAFClient::make_udaf_failure_status_for_test(make_udaf_response(""), 
"MERGE", 9);
+    EXPECT_FALSE(empty_status.ok());
+    EXPECT_NE(empty_status.to_string().find("MERGE operation failed for 
place_id=9"),
+              std::string::npos);
+}
+
+TEST(PythonUDAFClientTest, FailureStatusRejectsInvalidResponseShape) {
+    Status null_status =
+            PythonUDAFClient::make_udaf_failure_status_for_test(nullptr, 
"ACCUMULATE", 10);
+    EXPECT_FALSE(null_status.ok());
+    EXPECT_NE(null_status.to_string().find("Invalid ACCUMULATE failure 
response for place_id=10"),
+              std::string::npos);
+
+    auto zero_row_response = make_udaf_response("accumulate failed")->Slice(0, 
0);
+    Status zero_row_status = 
PythonUDAFClient::make_udaf_failure_status_for_test(zero_row_response,
+                                                                               
  "ACCUMULATE", 11);
+    EXPECT_FALSE(zero_row_status.ok());
+    EXPECT_NE(
+            zero_row_status.to_string().find("Invalid ACCUMULATE failure 
response for place_id=11"),
+            std::string::npos);
+
+    auto response = make_udaf_response("reset failed");
+    auto two_column_response = arrow::RecordBatch::Make(
+            arrow::schema({response->schema()->field(0), 
response->schema()->field(1)}), 1,
+            {response->column(0), response->column(1)});
+    Status two_column_status =
+            
PythonUDAFClient::make_udaf_failure_status_for_test(two_column_response, 
"RESET", 12);
+    EXPECT_FALSE(two_column_status.ok());
+    EXPECT_NE(two_column_status.to_string().find("Invalid RESET failure 
response for place_id=12"),
+              std::string::npos);
+}
+
+} // namespace doris
diff --git 
a/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
 
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
new file mode 100644
index 00000000000..63e63a43a1f
--- /dev/null
+++ 
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_python_raise_error_propagation") {
+    // Keep using the existing type-wide archives under regression-test/suites.
+    // This avoids introducing extra zip names and preserves the same 
loader/cache path shape as p0.
+    def suitePath = context.file.parent + "/.."
+    def udfPath = """${suitePath}/pythonudf_p0/udf_scripts/pyudf.zip"""
+    def udafPath = """${suitePath}/pythonudaf_p0/udaf_scripts/pyudaf.zip"""
+    def udtfPath = """${suitePath}/pythonudtf_p0/udtf_scripts/pyudtf.zip"""
+    scp_udf_file_to_all_be(udfPath)
+    scp_udf_file_to_all_be(udafPath)
+    scp_udf_file_to_all_be(udtfPath)
+    def runtime_version = getPythonUdfRuntimeVersion()
+    log.info("Python UDF zip path: ${udfPath}".toString())
+    log.info("Python UDAF zip path: ${udafPath}".toString())
+    log.info("Python UDTF zip path: ${udtfPath}".toString())
+
+    try {
+        sql """ DROP TABLE IF EXISTS python_raise_error_test; """
+        sql """
+        CREATE TABLE python_raise_error_test (
+            id INT,
+            val INT
+        ) ENGINE=OLAP
+        DUPLICATE KEY(id)
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES("replication_num" = "1");
+        """
+
+        sql """
+        INSERT INTO python_raise_error_test VALUES
+        (1, 1),
+        (2, 2);
+        """
+
+        sql """ DROP FUNCTION IF EXISTS py_inline_raise_udf(INT); """
+        sql """
+        CREATE FUNCTION py_inline_raise_udf(INT)
+        RETURNS INT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "evaluate",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        ) AS \$\$
+def evaluate(x):
+    raise TypeError("inline_udf_error_42")
+\$\$;
+        """
+
+        test {
+            sql """ SELECT py_inline_raise_udf(1); """
+            exception "inline_udf_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_module_raise_udf(INT); """
+        sql """
+        CREATE FUNCTION py_module_raise_udf(INT)
+        RETURNS INT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "file" = "file://${udfPath}",
+            "symbol" = "udf_errors.raise_in_module",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        );
+        """
+
+        test {
+            sql """ SELECT py_module_raise_udf(1); """
+            exception "module_udf_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf(INT); """
+        sql """
+        CREATE AGGREGATE FUNCTION py_inline_raise_udaf(INT)
+        RETURNS BIGINT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "InlineFinishErrorUDAF",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        ) AS \$\$
+class InlineFinishErrorUDAF:
+    def __init__(self):
+        self.count = 0
+
+    @property
+    def aggregate_state(self):
+        return self.count
+
+    def accumulate(self, value):
+        if value is not None:
+            self.count += 1
+
+    def merge(self, other_state):
+        self.count += other_state
+
+    def finish(self):
+        raise TypeError("inline_udaf_error_42")
+\$\$;
+        """
+
+        test {
+            sql """ SELECT py_inline_raise_udaf(val) FROM 
python_raise_error_test; """
+            exception "inline_udaf_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf_accumulate(INT); 
"""
+        sql """
+        CREATE AGGREGATE FUNCTION py_inline_raise_udaf_accumulate(INT)
+        RETURNS BIGINT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "InlineAccumulateErrorUDAF",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        ) AS \$\$
+class InlineAccumulateErrorUDAF:
+    def __init__(self):
+        self.count = 0
+
+    @property
+    def aggregate_state(self):
+        return self.count
+
+    def accumulate(self, value):
+        if value == 2:
+            raise TypeError("inline_udaf_accumulate_error_42")
+        self.count += 1
+
+    def merge(self, other_state):
+        self.count += other_state
+
+    def finish(self):
+        return self.count
+\$\$;
+        """
+
+        test {
+            sql """ SELECT py_inline_raise_udaf_accumulate(val) FROM 
python_raise_error_test; """
+            exception "inline_udaf_accumulate_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf_merge(INT); """
+        sql """
+        CREATE AGGREGATE FUNCTION py_inline_raise_udaf_merge(INT)
+        RETURNS BIGINT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "InlineMergeErrorUDAF",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        ) AS \$\$
+class InlineMergeErrorUDAF:
+    def __init__(self):
+        self.count = 0
+
+    @property
+    def aggregate_state(self):
+        return self.count
+
+    def accumulate(self, value):
+        if value is not None:
+            self.count += 1
+
+    def merge(self, other_state):
+        raise TypeError("inline_udaf_merge_error_42")
+
+    def finish(self):
+        return self.count
+\$\$;
+        """
+
+        test {
+            sql """ SELECT py_inline_raise_udaf_merge(val) FROM 
python_raise_error_test; """
+            exception "inline_udaf_merge_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_module_raise_udaf(INT); """
+        sql """
+        CREATE AGGREGATE FUNCTION py_module_raise_udaf(INT)
+        RETURNS BIGINT
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "file" = "file://${udafPath}",
+            "symbol" = "udaf_errors.ModuleFinishErrorUDAF",
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
+        );
+        """
+
+        test {
+            sql """ SELECT py_module_raise_udaf(val) FROM 
python_raise_error_test; """
+            exception "module_udaf_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_inline_raise_udtf(INT); """
+        sql """
+        CREATE TABLES FUNCTION py_inline_raise_udtf(INT)
+        RETURNS ARRAY<INT>
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "symbol" = "inline_raise_udtf",
+            "runtime_version" = "${runtime_version}"
+        ) AS \$\$
+def inline_raise_udtf(x):
+    if False:
+        yield x
+    raise TypeError("inline_udtf_error_42")
+\$\$;
+        """
+
+        test {
+            sql """
+            SELECT tmp.col
+            FROM python_raise_error_test
+            LATERAL VIEW py_inline_raise_udtf(val) tmp AS col;
+            """
+            exception "inline_udtf_error_42"
+        }
+
+        sql """ DROP FUNCTION IF EXISTS py_module_raise_udtf(INT); """
+        sql """
+        CREATE TABLES FUNCTION py_module_raise_udtf(INT)
+        RETURNS ARRAY<INT>
+        PROPERTIES (
+            "type" = "PYTHON_UDF",
+            "file" = "file://${udtfPath}",
+            "symbol" = "pyudtf_module.exceptions_udtf.raise_in_module_udtf",
+            "runtime_version" = "${runtime_version}"
+        );
+        """
+
+        test {
+            sql """
+            SELECT tmp.col
+            FROM python_raise_error_test
+            LATERAL VIEW py_module_raise_udtf(val) tmp AS col;
+            """
+            exception "module_udtf_error_42"
+        }
+    } finally {
+        try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udf(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_module_raise_udf(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udaf(INT);")
+        try_sql("DROP FUNCTION IF EXISTS 
py_inline_raise_udaf_accumulate(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udaf_merge(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_module_raise_udaf(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udtf(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_module_raise_udtf(INT);")
+        try_sql("DROP TABLE IF EXISTS python_raise_error_test;")
+    }
+}
diff --git a/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip 
b/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip
index 1dc76099d43..835aa5af0ad 100644
Binary files a/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip and 
b/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip differ
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py 
b/regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
similarity index 58%
copy from regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
copy to regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
index ef3020985d4..e7ea9ed3c3d 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
@@ -15,10 +15,28 @@
 # specific language governing permissions and limitations
 # under the License.
 
+"""Module-based UDAF error cases for regression tests."""
 
-def evaluate(res):
-    value = 0
-    for data in res:
-        if data is not None:
-            value += data
-    return value
\ No newline at end of file
+
+class ModuleFinishErrorUDAF:
+    """Raise a stable error from finish() to verify propagation."""
+
+    def __init__(self):
+        self.count = 0
+
+    @property
+    def aggregate_state(self):
+        return self.count
+
+    def accumulate(self, value):
+        if value is not None:
+            self.count += 1
+
+    def merge(self, other_state):
+        self.count += other_state
+
+    def reset(self):
+        self.count = 0
+
+    def finish(self):
+        raise TypeError("module_udaf_error_42")
diff --git 
a/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy 
b/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
index 151de115aba..bbea4b94bf2 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
@@ -55,19 +55,20 @@ suite("test_pythonudf_file_protocol") {
         qt_select_file_string """ SELECT py_file_string_mask('1234567890', 3, 
3) AS result; """
         
         // Test 3: Load float_test.py from zip package using file:// protocol
-        sql """ DROP FUNCTION IF EXISTS py_file_float_process(FLOAT); """
+        sql """ DROP FUNCTION IF EXISTS py_file_float_process(FLOAT, FLOAT); 
"""
         sql """
-        CREATE FUNCTION py_file_float_process(FLOAT) 
+        CREATE FUNCTION py_file_float_process(FLOAT, FLOAT)
         RETURNS FLOAT 
         PROPERTIES (
             "type" = "PYTHON_UDF",
             "file" = "file://${zipPath}",
             "symbol" = "float_test.evaluate",
-            "runtime_version" = "${runtime_version}"
+            "runtime_version" = "${runtime_version}",
+            "always_nullable" = "true"
         );
         """
         
-        qt_select_file_float """ SELECT py_file_float_process(3.14) AS result; 
"""
+        qt_select_file_float """ SELECT py_file_float_process(3.14, null) AS 
result; """
         
         // Test 4: Load boolean_test.py from zip package using file:// protocol
         sql """ DROP FUNCTION IF EXISTS py_file_bool_not(BOOLEAN); """
@@ -120,7 +121,7 @@ suite("test_pythonudf_file_protocol") {
     } finally {
         try_sql("DROP FUNCTION IF EXISTS py_file_int_add(INT);")
         try_sql("DROP FUNCTION IF EXISTS py_file_string_mask(STRING, INT, 
INT);")
-        try_sql("DROP FUNCTION IF EXISTS py_file_float_process(FLOAT);")
+        try_sql("DROP FUNCTION IF EXISTS py_file_float_process(FLOAT, FLOAT);")
         try_sql("DROP FUNCTION IF EXISTS py_file_bool_not(BOOLEAN);")
         try_sql("DROP TABLE IF EXISTS file_protocol_test_table;")
     }
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
index ef3020985d4..2e751c80ffa 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
@@ -17,6 +17,8 @@
 
 
 def evaluate(res):
+    if res is None:
+        return None
     value = 0
     for data in res:
         if data is not None:
diff --git 
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
index 7781d788f07..45292ab5499 100644
--- 
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
+++ 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
@@ -17,6 +17,8 @@
 
 
 def evaluate(res):
+    if res is None:
+        return None
     value = 0
     for data in res:
         if data is not None:
diff --git 
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
index 92864bc800c..4991fb39595 100644
--- 
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
+++ 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
@@ -17,6 +17,8 @@
 
 
 def evaluate(res):
+    if res is None:
+        return None
     value = ""
     for data in res:
         if data is not None:
diff --git 
a/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py 
b/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
index ede02c1201e..4539a27cb6a 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
@@ -17,6 +17,8 @@
 
 
 def evaluate(res):
+    if res is None:
+        return None
     value = ""
     for data in res:
         if data is not None:
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py 
b/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
index 3b2d726ff40..1ce8ca82010 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
@@ -17,4 +17,6 @@
 
 
 def evaluate(arg1, arg2):
+    if arg1 is None or arg2 is None:
+        return None
     return arg1 - arg2
\ No newline at end of file
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py 
b/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
index b96f6b0d402..23df4b06ce6 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
@@ -17,4 +17,6 @@
 
 
 def evaluate(arg):
+    if arg is None:
+        return None
     return int(arg + 1)
\ No newline at end of file
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip 
b/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip
index b4ed70a402b..7a33bc3e20d 100644
Binary files a/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip and 
b/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip differ
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py 
b/regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
similarity index 85%
copy from regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
copy to regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
index ef3020985d4..336167e37b1 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
@@ -15,10 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+"""Module-based UDF error cases for regression tests."""
 
-def evaluate(res):
-    value = 0
-    for data in res:
-        if data is not None:
-            value += data
-    return value
\ No newline at end of file
+
+def raise_in_module(value):
+    raise TypeError("module_udf_error_42")
diff --git a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip 
b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip
index f04942c9784..7e4bbe08205 100644
Binary files a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip and 
b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip differ
diff --git 
a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
 
b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
index b663c7aa878..83742674240 100644
--- 
a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
+++ 
b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
@@ -211,3 +211,10 @@ def validate_date(dt):
             status = 'normal'
         
         yield (dt, year, is_leap, status)
+
+
+def raise_in_module_udtf(value):
+    """Raise a stable error to verify UDTF exception propagation."""
+    if False:
+        yield value
+    raise TypeError("module_udtf_error_42")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to