This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2cfa2426e41 [Fix](pyudf) Fix python udf error propagation (#62613)
2cfa2426e41 is described below
commit 2cfa2426e41a9f1fcf02d607e5e49ef6d43ebf70
Author: linrrarity <[email protected]>
AuthorDate: Wed May 13 15:54:05 2026 +0800
[Fix](pyudf) Fix python udf error propagation (#62613)
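Problem Summary:

Before this fix, an exception raised inside a Python UDF was swallowed: when
`always_nullable` was true, the failing row silently evaluated to NULL instead
of failing the query. Python UDAF callbacks likewise swallowed exceptions, or
surfaced only a generic "<OP> operation failed" status without the Python
error text. For example: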
```sql
CREATE FUNCTION py_err_stats_test(INT)
RETURNS INT
PROPERTIES (
    "type"="PYTHON_UDF",
    "symbol"="evaluate",
    "runtime_version"="3.12.11",
    "always_nullable"="true"
) AS $$
def evaluate(x):
    raise TypeError("consistent_error_42")
$$;
SELECT py_err_stats_test(1);
+----------------------+
| py_err_stats_test(1) |
+----------------------+
| NULL                 |
+----------------------+
```
After this fix, the Python exception and its traceback are propagated to the client as a query error:
```sql
SELECT py_err_stats_test(1);
-- ERROR 1105 (HY000): errCode = 2, detailMessage = (127.0.0.1)[RUNTIME_ERROR]Flight stream finish failed with message: Error in scalar UDF execution at row 0: consistent_error_42. Detail: Python exception:
-- Traceback (most recent call last):
--   File "/mnt/disk7/linzhenqi/dv/doris/output/be/plugins/python_udf/python_server.py", line 614, in _scalar_call
--     res = self._eval_func(*converted_args)
--           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
--   File "<string>", line 3, in evaluate
-- TypeError: consistent_error_42
```
---
be/src/udf/python/python_server.py | 25 +-
be/src/udf/python/python_udaf_client.cpp | 82 +++++--
be/src/udf/python/python_udaf_client.h | 10 +
be/test/udf/python/python_udaf_client_test.cpp | 155 ++++++++++++
.../test_python_raise_error_propagation.groovy | 269 +++++++++++++++++++++
.../suites/pythonudaf_p0/udaf_scripts/pyudaf.zip | Bin 7858 -> 9153 bytes
.../udaf_scripts/udaf_errors.py} | 30 ++-
.../test_pythonudf_file_protocol.groovy | 11 +-
.../pythonudf_p0/udf_scripts/array_int_test.py | 2 +
.../udf_scripts/array_return_array_int_test.py | 2 +
.../udf_scripts/array_return_array_string_test.py | 2 +
.../pythonudf_p0/udf_scripts/array_string_test.py | 2 +
.../suites/pythonudf_p0/udf_scripts/float_test.py | 2 +
.../suites/pythonudf_p0/udf_scripts/int_test.py | 2 +
.../suites/pythonudf_p0/udf_scripts/pyudf.zip | Bin 6086 -> 15967 bytes
.../{array_int_test.py => udf_errors.py} | 10 +-
.../suites/pythonudtf_p0/udtf_scripts/pyudtf.zip | Bin 10216 -> 10924 bytes
.../udtf_scripts/pyudtf_module/exceptions_udtf.py | 7 +
18 files changed, 562 insertions(+), 49 deletions(-)
diff --git a/be/src/udf/python/python_server.py
b/be/src/udf/python/python_server.py
index 3290acc6bc6..ecdbef691b9 100644
--- a/be/src/udf/python/python_server.py
+++ b/be/src/udf/python/python_server.py
@@ -631,11 +631,9 @@ class AdaptivePythonUDF:
converted_args,
traceback.format_exc(),
)
- # Return None for failed rows if always_nullable is True
- if self.python_udf_meta.always_nullable:
- result.append(None)
- else:
- raise
+ raise RuntimeError(
+ f"Error in scalar UDF execution at row {i}: {e}"
+ ) from e
return pa.array(result, type=self._get_output_type())
@@ -1768,7 +1766,7 @@ class FlightServer(flight.FlightServerBase):
place_id,
e,
)
- success = False
+ raise RuntimeError(str(e)) from e
return pa.RecordBatch.from_arrays(
[pa.array([success], type=pa.bool_())], ["success"]
@@ -1916,7 +1914,7 @@ class FlightServer(flight.FlightServerBase):
place_id,
e,
)
- serialized = b""
+ raise RuntimeError(str(e)) from e
return pa.RecordBatch.from_arrays(
[pa.array([serialized], type=pa.binary())], ["serialized_state"]
@@ -1945,7 +1943,7 @@ class FlightServer(flight.FlightServerBase):
place_id,
e,
)
- success = False
+ raise RuntimeError(str(e)) from e
return pa.RecordBatch.from_arrays(
[pa.array([success], type=pa.bool_())], ["success"]
@@ -1969,7 +1967,7 @@ class FlightServer(flight.FlightServerBase):
place_id,
e,
)
- result = None
+ raise RuntimeError(str(e)) from e
return pa.RecordBatch.from_arrays(
[pa.array([result], type=output_type)], ["result"]
@@ -1991,7 +1989,7 @@ class FlightServer(flight.FlightServerBase):
place_id,
e,
)
- success = False
+ raise RuntimeError(str(e)) from e
return pa.RecordBatch.from_arrays(
[pa.array([success], type=pa.bool_())], ["success"]
@@ -2127,6 +2125,7 @@ class FlightServer(flight.FlightServerBase):
* ACCUMULATE: use success + rows_processed (number of rows processed)
* SERIALIZE: use success + serialized_data (serialized_state)
* FINALIZE: use success + serialized_data (serialized result)
+ * Any failed operation: use success=false + serialized_data (UTF-8
error message)
"""
# Get or create state manager for this specific UDAF function
@@ -2299,8 +2298,12 @@ class FlightServer(flight.FlightServerBase):
e,
traceback.format_exc(),
)
+ # Keep the UDAF Flight stream alive so C++ can still send
DESTROY.
+ # On failure, serialized_data carries the user-visible Python
error text.
result_batch = self._create_unified_response(
- success=False, rows_processed=0, data=b""
+ success=False,
+ rows_processed=0,
+ data=str(e).encode("utf-8", errors="replace"),
)
# Begin stream with unified schema on first call
diff --git a/be/src/udf/python/python_udaf_client.cpp
b/be/src/udf/python/python_udaf_client.cpp
index 6a6f6035ea5..4308750ccec 100644
--- a/be/src/udf/python/python_udaf_client.cpp
+++ b/be/src/udf/python/python_udaf_client.cpp
@@ -32,6 +32,7 @@
#include "format/arrow/arrow_utils.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"
+#include "util/unaligned.h"
namespace doris {
@@ -42,6 +43,7 @@ namespace doris {
// - ACCUMULATE: use success + rows_processed (number of rows processed)
// - SERIALIZE: use success + data (serialized_state)
// - FINALIZE: use success + data (serialized result, may be null)
+// - Any failed operation: use success=false + data (UTF-8 error message)
//
// This unified schema allows all operations to return consistent format,
// solving Arrow Flight's limitation that all responses must have the same
schema.
@@ -51,6 +53,47 @@ static const std::shared_ptr<arrow::Schema>
kUnifiedUDAFResponseSchema = arrow::
arrow::field("serialized_data", arrow::binary()),
});
+Status PythonUDAFClient::make_udaf_failure_status(
+ const std::shared_ptr<arrow::RecordBatch>& response, const char*
operation,
+ int64_t place_id) {
+ if (response == nullptr || response->num_rows() != 1 ||
+ response->num_columns() != kUnifiedUDAFResponseSchema->num_fields())
[[unlikely]] {
+ return Status::InternalError("Invalid {} failure response for
place_id={}", operation,
+ place_id);
+ }
+
+ auto data_array =
std::static_pointer_cast<arrow::BinaryArray>(response->column(2));
+ if (data_array->IsNull(0)) {
+ return Status::InternalError("{} operation failed for place_id={}",
operation, place_id);
+ }
+
+ const auto* offsets = data_array->raw_value_offsets();
+ if (offsets == nullptr) [[unlikely]] {
+ return Status::InternalError("Invalid {} failure response for
place_id={}: null offsets",
+ operation, place_id);
+ }
+ // Arrow Flight buffers may be unaligned after IPC deserialization
+ int32_t offset_start = unaligned_load<int32_t>(offsets);
+ int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
+
+ int32_t length = offset_end - offset_start;
+ if (length <= 0) {
+ return Status::InternalError("{} operation failed for place_id={}",
operation, place_id);
+ }
+ const uint8_t* data = data_array->value_data()->data() + offset_start;
+ std::string error_message(reinterpret_cast<const char*>(data), length);
+ return Status::InternalError("{} operation failed for place_id={}: {}",
operation, place_id,
+ error_message);
+}
+
+#ifdef BE_TEST
+Status PythonUDAFClient::make_udaf_failure_status_for_test(
+ const std::shared_ptr<arrow::RecordBatch>& response, const char*
operation,
+ int64_t place_id) {
+ return make_udaf_failure_status(response, operation, place_id);
+}
+#endif
+
Status PythonUDAFClient::create(const PythonUDFMeta& func_meta, ProcessPtr
process,
const std::shared_ptr<arrow::Schema>&
data_schema,
PythonUDAFClientPtr* client) {
@@ -89,7 +132,7 @@ Status PythonUDAFClient::create(int64_t place_id) {
auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response_batch->column(0));
if (!success_array->Value(0)) {
- return Status::InternalError("CREATE operation failed for
place_id={}", place_id);
+ return make_udaf_failure_status(response_batch, "CREATE", place_id);
}
_created_place_id = place_id;
@@ -142,16 +185,15 @@ Status PythonUDAFClient::accumulate(int64_t place_id,
bool is_single_place,
auto rows_processed_array =
std::static_pointer_cast<arrow::Int64Array>(response->column(1));
if (!success_array->Value(0)) {
- return Status::InternalError("ACCUMULATE operation failed for
place_id={}", place_id);
+ return make_udaf_failure_status(response, "ACCUMULATE", place_id);
}
- // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
- const uint8_t* raw_ptr = reinterpret_cast<const
uint8_t*>(rows_processed_array->raw_values());
+ // Arrow Flight buffers may be unaligned after IPC deserialization.
+ const auto* raw_ptr = rows_processed_array->raw_values();
if (raw_ptr == nullptr) {
return Status::InternalError("ACCUMULATE response has null
rows_processed array");
}
- int64_t rows_processed;
- memcpy(&rows_processed, raw_ptr, sizeof(int64_t));
+ int64_t rows_processed = unaligned_load<int64_t>(raw_ptr);
int64_t expected_rows = row_end - row_start;
@@ -185,17 +227,16 @@ Status PythonUDAFClient::serialize(int64_t place_id,
auto data_array =
std::static_pointer_cast<arrow::BinaryArray>(response->column(2));
if (!success_array->Value(0)) {
- return Status::InternalError("SERIALIZE operation failed for
place_id={}", place_id);
+ return make_udaf_failure_status(response, "SERIALIZE", place_id);
}
- // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
- const uint8_t* offsets = reinterpret_cast<const
uint8_t*>(data_array->raw_value_offsets());
+ // Arrow Flight buffers may be unaligned after IPC deserialization.
+ const auto* offsets = data_array->raw_value_offsets();
if (offsets == nullptr) {
return Status::InternalError("SERIALIZE response has null offsets");
}
- int32_t offset_start, offset_end;
- memcpy(&offset_start, offsets, sizeof(int32_t));
- memcpy(&offset_end, offsets + sizeof(int32_t), sizeof(int32_t));
+ int32_t offset_start = unaligned_load<int32_t>(offsets);
+ int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
int32_t length = offset_end - offset_start;
@@ -233,7 +274,7 @@ Status PythonUDAFClient::merge(int64_t place_id,
auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
if (!success_array->Value(0)) {
- return Status::InternalError("MERGE operation failed for place_id={}",
place_id);
+ return make_udaf_failure_status(response, "MERGE", place_id);
}
return Status::OK();
@@ -260,17 +301,16 @@ Status PythonUDAFClient::finalize(int64_t place_id,
std::shared_ptr<arrow::Recor
auto data_array =
std::static_pointer_cast<arrow::BinaryArray>(response_batch->column(2));
if (!success_array->Value(0)) {
- return Status::InternalError("FINALIZE operation failed for
place_id={}", place_id);
+ return make_udaf_failure_status(response_batch, "FINALIZE", place_id);
}
- // Cast to uint8_t* first to avoid UBSAN misaligned pointer errors
- const uint8_t* offsets = reinterpret_cast<const
uint8_t*>(data_array->raw_value_offsets());
+ // Arrow Flight buffers may be unaligned after IPC deserialization.
+ const auto* offsets = data_array->raw_value_offsets();
if (offsets == nullptr) {
return Status::InternalError("FINALIZE response has null offsets");
}
- int32_t offset_start, offset_end;
- memcpy(&offset_start, offsets, sizeof(int32_t));
- memcpy(&offset_end, offsets + sizeof(int32_t), sizeof(int32_t));
+ int32_t offset_start = unaligned_load<int32_t>(offsets);
+ int32_t offset_end = unaligned_load<int32_t>(offsets + 1);
int32_t length = offset_end - offset_start;
@@ -324,7 +364,7 @@ Status PythonUDAFClient::reset(int64_t place_id) {
auto success_array =
std::static_pointer_cast<arrow::BooleanArray>(response->column(0));
if (!success_array->Value(0)) {
- return Status::InternalError("RESET operation failed for place_id={}",
place_id);
+ return make_udaf_failure_status(response, "RESET", place_id);
}
return Status::OK();
@@ -363,7 +403,7 @@ Status PythonUDAFClient::destroy(int64_t place_id) {
if (!success_array->Value(0)) {
LOG(WARNING) << "DESTROY operation failed for place_id=" << place_id;
- return Status::InternalError("DESTROY operation failed for
place_id={}", place_id);
+ return make_udaf_failure_status(response, "DESTROY", place_id);
}
return Status::OK();
diff --git a/be/src/udf/python/python_udaf_client.h
b/be/src/udf/python/python_udaf_client.h
index 078c34a39ea..471716651a4 100644
--- a/be/src/udf/python/python_udaf_client.h
+++ b/be/src/udf/python/python_udaf_client.h
@@ -17,6 +17,7 @@
#pragma once
+#include <arrow/record_batch.h>
#include <arrow/status.h>
#include "udf/python/python_client.h"
@@ -173,9 +174,18 @@ public:
*/
Status close();
+#ifdef BE_TEST
+ static Status make_udaf_failure_status_for_test(
+ const std::shared_ptr<arrow::RecordBatch>& response, const char*
operation,
+ int64_t place_id);
+#endif
+
private:
DISALLOW_COPY_AND_ASSIGN(PythonUDAFClient);
+ static Status make_udaf_failure_status(const
std::shared_ptr<arrow::RecordBatch>& response,
+ const char* operation, int64_t
place_id);
+
/**
* Send RecordBatch request to Python server with app_metadata
* @param metadata UDAFMetadata structure (will be sent as app_metadata)
diff --git a/be/test/udf/python/python_udaf_client_test.cpp
b/be/test/udf/python/python_udaf_client_test.cpp
new file mode 100644
index 00000000000..eb1ab5242b8
--- /dev/null
+++ b/be/test/udf/python/python_udaf_client_test.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "udf/python/python_udaf_client.h"
+
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+namespace doris {
+
+std::shared_ptr<arrow::RecordBatch> make_udaf_response(const
std::optional<std::string>& error) {
+ arrow::BooleanBuilder success_builder;
+ std::shared_ptr<arrow::Array> success_array;
+ EXPECT_TRUE(success_builder.Append(false).ok());
+ EXPECT_TRUE(success_builder.Finish(&success_array).ok());
+
+ arrow::Int64Builder rows_processed_builder;
+ std::shared_ptr<arrow::Array> rows_processed_array;
+ EXPECT_TRUE(rows_processed_builder.Append(0).ok());
+ EXPECT_TRUE(rows_processed_builder.Finish(&rows_processed_array).ok());
+
+ arrow::BinaryBuilder data_builder;
+ std::shared_ptr<arrow::Array> data_array;
+ if (error.has_value()) {
+ EXPECT_TRUE(data_builder.Append(error->data(),
static_cast<int32_t>(error->size())).ok());
+ } else {
+ EXPECT_TRUE(data_builder.AppendNull().ok());
+ }
+ EXPECT_TRUE(data_builder.Finish(&data_array).ok());
+
+ auto schema = arrow::schema({
+ arrow::field("success", arrow::boolean()),
+ arrow::field("rows_processed", arrow::int64()),
+ arrow::field("serialized_data", arrow::binary()),
+ });
+ return arrow::RecordBatch::Make(schema, 1, {success_array,
rows_processed_array, data_array});
+}
+
+std::shared_ptr<arrow::RecordBatch> make_udaf_response_with_data_array(
+ const std::shared_ptr<arrow::Array>& data_array) {
+ arrow::BooleanBuilder success_builder;
+ std::shared_ptr<arrow::Array> success_array;
+ EXPECT_TRUE(success_builder.Append(false).ok());
+ EXPECT_TRUE(success_builder.Finish(&success_array).ok());
+
+ arrow::Int64Builder rows_processed_builder;
+ std::shared_ptr<arrow::Array> rows_processed_array;
+ EXPECT_TRUE(rows_processed_builder.Append(0).ok());
+ EXPECT_TRUE(rows_processed_builder.Finish(&rows_processed_array).ok());
+
+ auto schema = arrow::schema({
+ arrow::field("success", arrow::boolean()),
+ arrow::field("rows_processed", arrow::int64()),
+ arrow::field("serialized_data", arrow::binary()),
+ });
+ return arrow::RecordBatch::Make(schema, 1, {success_array,
rows_processed_array, data_array});
+}
+
+TEST(PythonUDAFClientTest, FailureStatusIncludesPythonErrorMessage) {
+ auto response = make_udaf_response("finish failed");
+ Status status =
PythonUDAFClient::make_udaf_failure_status_for_test(response, "FINALIZE", 7);
+
+ EXPECT_FALSE(status.ok());
+ EXPECT_NE(status.to_string().find("FINALIZE operation failed for
place_id=7: finish failed"),
+ std::string::npos);
+}
+
+TEST(PythonUDAFClientTest, FailureStatusHandlesUnalignedBinaryOffsets) {
+ std::string error = "finalize failed";
+ std::vector<uint8_t> offset_storage(1 + 2 * sizeof(int32_t));
+ uint8_t* unaligned_offsets = offset_storage.data() + 1;
+ int32_t offset_start = 0;
+ int32_t offset_end = static_cast<int32_t>(error.size());
+ memcpy(unaligned_offsets, &offset_start, sizeof(int32_t));
+ memcpy(unaligned_offsets + sizeof(int32_t), &offset_end, sizeof(int32_t));
+
+ auto offset_buffer = arrow::Buffer::Wrap(unaligned_offsets, 2 *
sizeof(int32_t));
+ auto value_buffer =
+ arrow::Buffer::Wrap(reinterpret_cast<const
uint8_t*>(error.data()), error.size());
+ auto data_array = std::make_shared<arrow::BinaryArray>(1, offset_buffer,
value_buffer);
+ ASSERT_EQ(reinterpret_cast<uintptr_t>(data_array->raw_value_offsets()) %
alignof(int32_t), 1);
+
+ Status status = PythonUDAFClient::make_udaf_failure_status_for_test(
+ make_udaf_response_with_data_array(data_array), "FINALIZE", 13);
+
+ EXPECT_FALSE(status.ok());
+ EXPECT_NE(status.to_string().find("FINALIZE operation failed for
place_id=13: finalize failed"),
+ std::string::npos);
+}
+
+TEST(PythonUDAFClientTest,
FailureStatusFallsBackWhenErrorMessageIsNullOrEmpty) {
+ Status null_status = PythonUDAFClient::make_udaf_failure_status_for_test(
+ make_udaf_response(std::nullopt), "RESET", 8);
+ EXPECT_FALSE(null_status.ok());
+ EXPECT_NE(null_status.to_string().find("RESET operation failed for
place_id=8"),
+ std::string::npos);
+
+ Status empty_status =
+
PythonUDAFClient::make_udaf_failure_status_for_test(make_udaf_response(""),
"MERGE", 9);
+ EXPECT_FALSE(empty_status.ok());
+ EXPECT_NE(empty_status.to_string().find("MERGE operation failed for
place_id=9"),
+ std::string::npos);
+}
+
+TEST(PythonUDAFClientTest, FailureStatusRejectsInvalidResponseShape) {
+ Status null_status =
+ PythonUDAFClient::make_udaf_failure_status_for_test(nullptr,
"ACCUMULATE", 10);
+ EXPECT_FALSE(null_status.ok());
+ EXPECT_NE(null_status.to_string().find("Invalid ACCUMULATE failure
response for place_id=10"),
+ std::string::npos);
+
+ auto zero_row_response = make_udaf_response("accumulate failed")->Slice(0,
0);
+ Status zero_row_status =
PythonUDAFClient::make_udaf_failure_status_for_test(zero_row_response,
+
"ACCUMULATE", 11);
+ EXPECT_FALSE(zero_row_status.ok());
+ EXPECT_NE(
+ zero_row_status.to_string().find("Invalid ACCUMULATE failure
response for place_id=11"),
+ std::string::npos);
+
+ auto response = make_udaf_response("reset failed");
+ auto two_column_response = arrow::RecordBatch::Make(
+ arrow::schema({response->schema()->field(0),
response->schema()->field(1)}), 1,
+ {response->column(0), response->column(1)});
+ Status two_column_status =
+
PythonUDAFClient::make_udaf_failure_status_for_test(two_column_response,
"RESET", 12);
+ EXPECT_FALSE(two_column_status.ok());
+ EXPECT_NE(two_column_status.to_string().find("Invalid RESET failure
response for place_id=12"),
+ std::string::npos);
+}
+
+} // namespace doris
diff --git
a/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
new file mode 100644
index 00000000000..63e63a43a1f
--- /dev/null
+++
b/regression-test/suites/pythonudaf_p0/test_python_raise_error_propagation.groovy
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_python_raise_error_propagation") {
+ // Keep using the existing type-wide archives under regression-test/suites.
+ // This avoids introducing extra zip names and preserves the same
loader/cache path shape as p0.
+ def suitePath = context.file.parent + "/.."
+ def udfPath = """${suitePath}/pythonudf_p0/udf_scripts/pyudf.zip"""
+ def udafPath = """${suitePath}/pythonudaf_p0/udaf_scripts/pyudaf.zip"""
+ def udtfPath = """${suitePath}/pythonudtf_p0/udtf_scripts/pyudtf.zip"""
+ scp_udf_file_to_all_be(udfPath)
+ scp_udf_file_to_all_be(udafPath)
+ scp_udf_file_to_all_be(udtfPath)
+ def runtime_version = getPythonUdfRuntimeVersion()
+ log.info("Python UDF zip path: ${udfPath}".toString())
+ log.info("Python UDAF zip path: ${udafPath}".toString())
+ log.info("Python UDTF zip path: ${udtfPath}".toString())
+
+ try {
+ sql """ DROP TABLE IF EXISTS python_raise_error_test; """
+ sql """
+ CREATE TABLE python_raise_error_test (
+ id INT,
+ val INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ """
+
+ sql """
+ INSERT INTO python_raise_error_test VALUES
+ (1, 1),
+ (2, 2);
+ """
+
+ sql """ DROP FUNCTION IF EXISTS py_inline_raise_udf(INT); """
+ sql """
+ CREATE FUNCTION py_inline_raise_udf(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ ) AS \$\$
+def evaluate(x):
+ raise TypeError("inline_udf_error_42")
+\$\$;
+ """
+
+ test {
+ sql """ SELECT py_inline_raise_udf(1); """
+ exception "inline_udf_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_module_raise_udf(INT); """
+ sql """
+ CREATE FUNCTION py_module_raise_udf(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${udfPath}",
+ "symbol" = "udf_errors.raise_in_module",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ );
+ """
+
+ test {
+ sql """ SELECT py_module_raise_udf(1); """
+ exception "module_udf_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf(INT); """
+ sql """
+ CREATE AGGREGATE FUNCTION py_inline_raise_udaf(INT)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "InlineFinishErrorUDAF",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ ) AS \$\$
+class InlineFinishErrorUDAF:
+ def __init__(self):
+ self.count = 0
+
+ @property
+ def aggregate_state(self):
+ return self.count
+
+ def accumulate(self, value):
+ if value is not None:
+ self.count += 1
+
+ def merge(self, other_state):
+ self.count += other_state
+
+ def finish(self):
+ raise TypeError("inline_udaf_error_42")
+\$\$;
+ """
+
+ test {
+ sql """ SELECT py_inline_raise_udaf(val) FROM
python_raise_error_test; """
+ exception "inline_udaf_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf_accumulate(INT);
"""
+ sql """
+ CREATE AGGREGATE FUNCTION py_inline_raise_udaf_accumulate(INT)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "InlineAccumulateErrorUDAF",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ ) AS \$\$
+class InlineAccumulateErrorUDAF:
+ def __init__(self):
+ self.count = 0
+
+ @property
+ def aggregate_state(self):
+ return self.count
+
+ def accumulate(self, value):
+ if value == 2:
+ raise TypeError("inline_udaf_accumulate_error_42")
+ self.count += 1
+
+ def merge(self, other_state):
+ self.count += other_state
+
+ def finish(self):
+ return self.count
+\$\$;
+ """
+
+ test {
+ sql """ SELECT py_inline_raise_udaf_accumulate(val) FROM
python_raise_error_test; """
+ exception "inline_udaf_accumulate_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_inline_raise_udaf_merge(INT); """
+ sql """
+ CREATE AGGREGATE FUNCTION py_inline_raise_udaf_merge(INT)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "InlineMergeErrorUDAF",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ ) AS \$\$
+class InlineMergeErrorUDAF:
+ def __init__(self):
+ self.count = 0
+
+ @property
+ def aggregate_state(self):
+ return self.count
+
+ def accumulate(self, value):
+ if value is not None:
+ self.count += 1
+
+ def merge(self, other_state):
+ raise TypeError("inline_udaf_merge_error_42")
+
+ def finish(self):
+ return self.count
+\$\$;
+ """
+
+ test {
+ sql """ SELECT py_inline_raise_udaf_merge(val) FROM
python_raise_error_test; """
+ exception "inline_udaf_merge_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_module_raise_udaf(INT); """
+ sql """
+ CREATE AGGREGATE FUNCTION py_module_raise_udaf(INT)
+ RETURNS BIGINT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${udafPath}",
+ "symbol" = "udaf_errors.ModuleFinishErrorUDAF",
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
+ );
+ """
+
+ test {
+ sql """ SELECT py_module_raise_udaf(val) FROM
python_raise_error_test; """
+ exception "module_udaf_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_inline_raise_udtf(INT); """
+ sql """
+ CREATE TABLES FUNCTION py_inline_raise_udtf(INT)
+ RETURNS ARRAY<INT>
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "inline_raise_udtf",
+ "runtime_version" = "${runtime_version}"
+ ) AS \$\$
+def inline_raise_udtf(x):
+ if False:
+ yield x
+ raise TypeError("inline_udtf_error_42")
+\$\$;
+ """
+
+ test {
+ sql """
+ SELECT tmp.col
+ FROM python_raise_error_test
+ LATERAL VIEW py_inline_raise_udtf(val) tmp AS col;
+ """
+ exception "inline_udtf_error_42"
+ }
+
+ sql """ DROP FUNCTION IF EXISTS py_module_raise_udtf(INT); """
+ sql """
+ CREATE TABLES FUNCTION py_module_raise_udtf(INT)
+ RETURNS ARRAY<INT>
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${udtfPath}",
+ "symbol" = "pyudtf_module.exceptions_udtf.raise_in_module_udtf",
+ "runtime_version" = "${runtime_version}"
+ );
+ """
+
+ test {
+ sql """
+ SELECT tmp.col
+ FROM python_raise_error_test
+ LATERAL VIEW py_module_raise_udtf(val) tmp AS col;
+ """
+ exception "module_udtf_error_42"
+ }
+ } finally {
+ try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udf(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_module_raise_udf(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udaf(INT);")
+ try_sql("DROP FUNCTION IF EXISTS
py_inline_raise_udaf_accumulate(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udaf_merge(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_module_raise_udaf(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_inline_raise_udtf(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_module_raise_udtf(INT);")
+ try_sql("DROP TABLE IF EXISTS python_raise_error_test;")
+ }
+}
diff --git a/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip
b/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip
index 1dc76099d43..835aa5af0ad 100644
Binary files a/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip and
b/regression-test/suites/pythonudaf_p0/udaf_scripts/pyudaf.zip differ
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
b/regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
similarity index 58%
copy from regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
copy to regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
index ef3020985d4..e7ea9ed3c3d 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudaf_p0/udaf_scripts/udaf_errors.py
@@ -15,10 +15,28 @@
# specific language governing permissions and limitations
# under the License.
+"""Module-based UDAF error cases for regression tests."""
-def evaluate(res):
- value = 0
- for data in res:
- if data is not None:
- value += data
- return value
\ No newline at end of file
+
+class ModuleFinishErrorUDAF:
+ """Raise a stable error from finish() to verify propagation."""
+
+ def __init__(self):
+ self.count = 0
+
+ @property
+ def aggregate_state(self):
+ return self.count
+
+ def accumulate(self, value):
+ if value is not None:
+ self.count += 1
+
+ def merge(self, other_state):
+ self.count += other_state
+
+ def reset(self):
+ self.count = 0
+
+ def finish(self):
+ raise TypeError("module_udaf_error_42")
diff --git
a/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
index 151de115aba..bbea4b94bf2 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_file_protocol.groovy
@@ -55,19 +55,20 @@ suite("test_pythonudf_file_protocol") {
qt_select_file_string """ SELECT py_file_string_mask('1234567890', 3,
3) AS result; """
// Test 3: Load float_test.py from zip package using file:// protocol
- sql """ DROP FUNCTION IF EXISTS py_file_float_process(FLOAT); """
+ sql """ DROP FUNCTION IF EXISTS py_file_float_process(FLOAT, FLOAT);
"""
sql """
- CREATE FUNCTION py_file_float_process(FLOAT)
+ CREATE FUNCTION py_file_float_process(FLOAT, FLOAT)
RETURNS FLOAT
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file://${zipPath}",
"symbol" = "float_test.evaluate",
- "runtime_version" = "${runtime_version}"
+ "runtime_version" = "${runtime_version}",
+ "always_nullable" = "true"
);
"""
- qt_select_file_float """ SELECT py_file_float_process(3.14) AS result;
"""
+ qt_select_file_float """ SELECT py_file_float_process(3.14, null) AS
result; """
// Test 4: Load boolean_test.py from zip package using file:// protocol
sql """ DROP FUNCTION IF EXISTS py_file_bool_not(BOOLEAN); """
@@ -120,7 +121,7 @@ suite("test_pythonudf_file_protocol") {
} finally {
try_sql("DROP FUNCTION IF EXISTS py_file_int_add(INT);")
try_sql("DROP FUNCTION IF EXISTS py_file_string_mask(STRING, INT,
INT);")
- try_sql("DROP FUNCTION IF EXISTS py_file_float_process(FLOAT);")
+ try_sql("DROP FUNCTION IF EXISTS py_file_float_process(FLOAT, FLOAT);")
try_sql("DROP FUNCTION IF EXISTS py_file_bool_not(BOOLEAN);")
try_sql("DROP TABLE IF EXISTS file_protocol_test_table;")
}
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
b/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
index ef3020985d4..2e751c80ffa 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
@@ -17,6 +17,8 @@
def evaluate(res):
+ if res is None:
+ return None
value = 0
for data in res:
if data is not None:
diff --git
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
index 7781d788f07..45292ab5499 100644
---
a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
+++
b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_int_test.py
@@ -17,6 +17,8 @@
def evaluate(res):
+ if res is None:
+ return None
value = 0
for data in res:
if data is not None:
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
index 92864bc800c..4991fb39595 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/array_return_array_string_test.py
@@ -17,6 +17,8 @@
def evaluate(res):
+ if res is None:
+ return None
value = ""
for data in res:
if data is not None:
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py b/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
index ede02c1201e..4539a27cb6a 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/array_string_test.py
@@ -17,6 +17,8 @@
def evaluate(res):
+ if res is None:
+ return None
value = ""
for data in res:
if data is not None:
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py b/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
index 3b2d726ff40..1ce8ca82010 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/float_test.py
@@ -17,4 +17,6 @@
def evaluate(arg1, arg2):
+ if arg1 is None or arg2 is None:
+ return None
return arg1 - arg2
\ No newline at end of file
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py b/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
index b96f6b0d402..23df4b06ce6 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/int_test.py
@@ -17,4 +17,6 @@
def evaluate(arg):
+ if arg is None:
+ return None
return int(arg + 1)
\ No newline at end of file
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip b/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip
index b4ed70a402b..7a33bc3e20d 100644
Binary files a/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip and b/regression-test/suites/pythonudf_p0/udf_scripts/pyudf.zip differ
diff --git a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py b/regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
similarity index 85%
copy from regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
copy to regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
index ef3020985d4..336167e37b1 100644
--- a/regression-test/suites/pythonudf_p0/udf_scripts/array_int_test.py
+++ b/regression-test/suites/pythonudf_p0/udf_scripts/udf_errors.py
@@ -15,10 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""Module-based UDF error cases for regression tests."""
-def evaluate(res):
- value = 0
- for data in res:
- if data is not None:
- value += data
- return value
\ No newline at end of file
+
+def raise_in_module(value):
+ raise TypeError("module_udf_error_42")
diff --git a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip
index f04942c9784..7e4bbe08205 100644
Binary files a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip and b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf.zip differ
diff --git a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
index b663c7aa878..83742674240 100644
--- a/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
+++ b/regression-test/suites/pythonudtf_p0/udtf_scripts/pyudtf_module/exceptions_udtf.py
@@ -211,3 +211,10 @@ def validate_date(dt):
status = 'normal'
yield (dt, year, is_leap, status)
+
+
+def raise_in_module_udtf(value):
+ """Raise a stable error to verify UDTF exception propagation."""
+ if False:
+ yield value
+ raise TypeError("module_udtf_error_42")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]