This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54d7885340a [Fix](pyudf) clear stale UDAF state cache on drop (#63062)
54d7885340a is described below

commit 54d7885340a8db325d8e800ee9e8e45fa7c132af
Author: linrrarity <[email protected]>
AuthorDate: Tue May 12 14:06:56 2026 +0800

    [Fix](pyudf) clear stale UDAF state cache on drop (#63062)
    
    Fix Python UDAF stale cache reuse after dropping and recreating an
    inline UDAF with the same name/signature.
    
    The Python server previously keyed UDAF state managers only by function
    name and argument types, so a recreated inline UDAF could reuse the old
    loaded Python class. This fix includes the FE function id in the Python
    UDAF metadata and cache key, and clears the UDAF state manager cache
    during `DROP FUNCTION` cleanup.
    
    ```sql
    set enable_sql_cache = 0;
    DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
    drop database if exists db001;
    create database db001;
    use db001;
    
    -- 0. Prepare test data
    DROP TABLE IF EXISTS t_udaf_cache_bug_test;
    CREATE TABLE t_udaf_cache_bug_test (
        id INT,
        val INT
    ) DUPLICATE KEY(id)
    DISTRIBUTED BY HASH(id) BUCKETS 1
    PROPERTIES("replication_num"="1");
    INSERT INTO t_udaf_cache_bug_test VALUES (1, 10), (2, 20), (3, 30);
    -- At this point, the sum of val over the whole table is 60.
    
    -- 1. Create V1 version of UDAF (Logic: Accumulate and multiply by 10)
    DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
    select sleep(10);
    CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
    RETURNS BIGINT
    PROPERTIES (
        "type"="PYTHON_UDF",
        "symbol"="RecreateUDAF",
        "runtime_version"="3.12.11",
        "always_nullable"="true"
    )
    AS $$
    class RecreateUDAF:
        def __init__(self):
            self.total = 0
        @property
        def aggregate_state(self):
            return self.total
        def accumulate(self, val):
            if val is not None:
                self.total += val
        def merge(self, other):
            self.total += other
        def finish(self):
            return self.total * 10  # V1: multiply by 10
    $$;
    
    -- 2. Verify V1 Logic
    SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
    -- Expected Return: 600 (60 * 10)
    -- Actual Return: 600 (Correct)
    
    -- 3. Drop the old function and create a V2 version of the UDAF with the
    -- same name (logic: accumulate and multiply by 100)
    DROP FUNCTION IF EXISTS py_udaf_bug_repro(INT);
    select sleep(10);
    select sleep(10);
    CREATE AGGREGATE FUNCTION py_udaf_bug_repro(INT)
    RETURNS BIGINT
    PROPERTIES (
        "type"="PYTHON_UDF",
        "symbol"="RecreateUDAF",
        "runtime_version"="3.12.11",
        "always_nullable"="true"
    )
    AS $$
    class RecreateUDAF:
        def __init__(self):
            self.total = 0
        @property
        def aggregate_state(self):
            return self.total
        def accumulate(self, val):
            if val is not None:
                self.total += val
        def merge(self, other):
            self.total += other
        def finish(self):
            return self.total * 100  # V2: Logic modified to multiply by 100
    $$;
    
    -- 4. Verify V2 Logic
    SELECT py_udaf_bug_repro(val) FROM t_udaf_cache_bug_test;
    -- Expected Return: 6000 (60 * 100)
    -- Actual Return: 600  ([Bug occurs] Still outputs the old cached 600)
    ```
---
 be/src/agent/task_worker_pool.cpp                  |  2 +
 be/src/udf/python/python_server.cpp                | 25 +++++--
 be/src/udf/python/python_server.h                  | 11 +++
 be/src/udf/python/python_server.py                 | 79 ++++++++++++++++++++--
 be/src/udf/python/python_udf_meta.cpp              |  2 +
 be/src/udf/python/python_udf_runtime.h             |  6 +-
 be/test/udf/python/python_server_test.cpp          | 45 +++++++++++-
 be/test/udf/python/python_udf_meta_test.cpp        |  3 +
 .../pythonudaf_p0/test_pythonudaf_drop.groovy      | 63 +++++++++++++++++
 9 files changed, 220 insertions(+), 16 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 97125ffe6a8..ecbd01aac15 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -92,6 +92,7 @@
 #include "storage/task/engine_storage_migration_task.h"
 #include "storage/txn/txn_manager.h"
 #include "storage/utils.h"
+#include "udf/python/python_server.h"
 #include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
 #include "util/jni-util.h"
@@ -2596,6 +2597,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& 
req) {
 
     if (clean_req.__isset.function_id && clean_req.function_id > 0) {
         
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
+        
PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
     }
 
     LOG(INFO) << "clean udf cache finish: function_signature=" << 
clean_req.function_signature;
diff --git a/be/src/udf/python/python_server.cpp 
b/be/src/udf/python/python_server.cpp
index 7e6e4cddbc1..5ea1ef41409 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -33,6 +33,7 @@
 
 #include "arrow/flight/client.h"
 #include "common/config.h"
+#include "common/status.h"
 #include "udf/python/python_udaf_client.h"
 #include "udf/python/python_udf_client.h"
 #include "udf/python/python_udtf_client.h"
@@ -437,7 +438,20 @@ Status PythonServerManager::clear_module_cache(const 
std::string& location) {
     }
 
     std::string body = fmt::format(R"({{"location": "{}"}})", location);
+    return _broadcast_action_to_processes("clear_module_cache", body,
+                                          fmt::format("location={}", 
location));
+}
+
+void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
+    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
+    WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", 
body,
+                                                 fmt::format("function_id={}", 
function_id)),
+                  "failed to clear Python UDAF state cache");
+}
 
+Status PythonServerManager::_broadcast_action_to_processes(const std::string& 
action_type,
+                                                           const std::string& 
body,
+                                                           const std::string& 
log_name) {
     int success_count = 0;
     int fail_count = 0;
     bool has_active_process = false;
@@ -465,7 +479,7 @@ Status PythonServerManager::clear_module_cache(const 
std::string& location) {
                 auto client = std::move(*client_result);
 
                 arrow::flight::Action action;
-                action.type = "clear_module_cache";
+                action.type = action_type;
                 action.body = arrow::Buffer::FromString(body);
 
                 auto result_stream = client->DoAction(action);
@@ -491,13 +505,12 @@ Status PythonServerManager::clear_module_cache(const 
std::string& location) {
         return Status::OK();
     }
 
-    LOG(INFO) << "clear_module_cache completed for location=" << location
-              << ", success=" << success_count << ", failed=" << fail_count;
+    LOG(INFO) << action_type << " completed for " << log_name << ", success=" 
<< success_count
+              << ", failed=" << fail_count;
 
     if (fail_count > 0) {
-        return Status::InternalError(
-                "clear_module_cache failed for location={}, success={}, 
failed={}", location,
-                success_count, fail_count);
+        return Status::InternalError("{} failed for {}, success={}, 
failed={}", action_type,
+                                     log_name, success_count, fail_count);
     }
 
     return Status::OK();
diff --git a/be/src/udf/python/python_server.h 
b/be/src/udf/python/python_server.h
index 7aa452740c7..78cef72e06d 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -52,6 +52,9 @@ public:
     // Clear Python module cache for a specific UDF location across all 
processes
     Status clear_module_cache(const std::string& location);
 
+    // Clear Python UDAF runtime state after DROP FUNCTION
+    void clear_udaf_state_cache(int64_t function_id);
+
     void shutdown();
 
 #ifdef BE_TEST
@@ -62,6 +65,12 @@ public:
                                    bool initialized = true);
 
     std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& 
version);
+
+    Status broadcast_action_to_processes_for_test(const std::string& 
action_type,
+                                                  const std::string& body,
+                                                  const std::string& log_name) 
{
+        return _broadcast_action_to_processes(action_type, body, log_name);
+    }
 #endif
 
 private:
@@ -108,6 +117,8 @@ private:
     std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const 
PythonVersion& version);
     std::vector<std::pair<PythonVersion, 
std::shared_ptr<VersionedProcessPool>>>
     _snapshot_process_pools();
+    Status _broadcast_action_to_processes(const std::string& action_type, 
const std::string& body,
+                                          const std::string& log_name);
 
     std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> 
_process_pools;
     // Protects the version -> pool handle map only. Per-version process 
operations are guarded
diff --git a/be/src/udf/python/python_server.py 
b/be/src/udf/python/python_server.py
index d16fc352178..3290acc6bc6 100644
--- a/be/src/udf/python/python_server.py
+++ b/be/src/udf/python/python_server.py
@@ -455,6 +455,7 @@ class PythonUDFMeta:
 
     def __init__(
         self,
+        function_id: int,
         name: str,
         symbol: str,
         location: str,
@@ -470,6 +471,7 @@ class PythonUDFMeta:
         Initialize Python UDF metadata.
 
         Args:
+            function_id: FE catalog function id
             name: UDF function name
             symbol: Symbol to load (function name or module.function)
             location: File path or directory containing the UDF
@@ -481,6 +483,7 @@ class PythonUDFMeta:
             output_type: PyArrow data type for return value
             client_type: 0 for UDF, 1 for UDAF, 2 for UDTF
         """
+        self.id = function_id
         self.name = name
         self.symbol = symbol
         self.location = location
@@ -508,7 +511,7 @@ class PythonUDFMeta:
         """Returns a string representation of the UDF metadata."""
         udf_load_type_str = "INLINE" if self.udf_load_type == 0 else "MODULE"
         return (
-            f"PythonUDFMeta(name={self.name}, symbol={self.symbol}, "
+            f"PythonUDFMeta(id={self.id}, name={self.name}, 
symbol={self.symbol}, "
             f"location={self.location}, udf_load_type={udf_load_type_str}, 
runtime_version={self.runtime_version}, "
             f"always_nullable={self.always_nullable}, 
client_type={self.client_type.name}, "
             f"input_types={self.input_types}, output_type={self.output_type})"
@@ -1575,8 +1578,9 @@ class FlightServer(flight.FlightServerBase):
             location: Unix socket path for the server
         """
         super().__init__(location)
-        # Use a dictionary to maintain separate state managers for each UDAF 
function
-        # Key: function signature (name + input_types), Value: 
UDAFStateManager instance
+        # Use a dictionary to maintain separate state managers for each UDAF 
function.
+        # Key includes function_id so DROP/CREATE with the same name and 
signature
+        # cannot reuse a class loaded from old inline code.
         self.udaf_state_managers: Dict[str, UDAFStateManager] = {}
         self.udaf_managers_lock = threading.Lock()
 
@@ -1593,19 +1597,50 @@ class FlightServer(flight.FlightServerBase):
         Returns:
             UDAFStateManager instance for this specific UDAF
         """
-        # Create a unique key based on function name and argument types
         type_names = [str(field.type) for field in 
python_udaf_meta.input_types]
-        func_key = f"{python_udaf_meta.name}({','.join(type_names)})"
+        func_key = (
+            
f"{python_udaf_meta.id}:{python_udaf_meta.name}({','.join(type_names)})"
+        )
 
         with self.udaf_managers_lock:
-            if func_key not in self.udaf_state_managers:
+            manager = self.udaf_state_managers.get(func_key)
+            if manager is None:
                 manager = UDAFStateManager()
                 # Load and set the UDAF class for this manager using 
UDAFClassLoader
                 udaf_class = UDAFClassLoader.load_udaf_class(python_udaf_meta)
                 manager.set_udaf_class(udaf_class)
                 self.udaf_state_managers[func_key] = manager
 
-        return self.udaf_state_managers[func_key]
+            # Return the manager while holding the lock so a concurrent DROP 
cleanup
+            # cannot pop the key between lookup and return.
+            return manager
+
+    def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int:
+        """
+        Clear UDAF managers for a dropped function id.
+
+        DROP FUNCTION cache cleanup is asynchronous. The runtime key still 
includes
+        function_id for correctness, while this action detaches dropped 
functions
+        from the manager registry so new exchanges cannot reuse the old UDAF 
class.
+        """
+        prefix = f"{function_id}:"
+        cleared = 0
+
+        with self.udaf_managers_lock:
+            keys_to_remove = [
+                key for key in self.udaf_state_managers if 
key.startswith(prefix)
+            ]
+            for key in keys_to_remove:
+                # Do not clear manager.states here. An already-started Flight
+                # exchange may still hold this manager and continue with later
+                # SERIALIZE/FINALIZE/DESTROY calls for its place_ids.
+                self.udaf_state_managers.pop(key, None)
+                cleared += 1
+
+        if cleared:
+            gc.collect()
+
+        return cleared
 
     @staticmethod
     def parse_python_udf_meta(
@@ -1623,6 +1658,7 @@ class FlightServer(flight.FlightServerBase):
             return None
 
         cmd_json = json.loads(descriptor.command)
+        function_id = cmd_json["id"]
         name = cmd_json["name"]
         symbol = cmd_json["symbol"]
         location = cmd_json["location"]
@@ -1648,6 +1684,7 @@ class FlightServer(flight.FlightServerBase):
         output_type = output_schema.field(0).type
 
         python_udf_meta = PythonUDFMeta(
+            function_id=function_id,
             name=name,
             symbol=symbol,
             location=location,
@@ -2513,14 +2550,42 @@ class FlightServer(flight.FlightServerBase):
         Supported actions:
         - "clear_module_cache": Clear Python module cache for a specific 
location
           Body: JSON with "location" field (the UDF cache directory path)
+        - "clear_udaf_state_cache": Clear UDAF runtime state for a dropped 
function id
+          Body: JSON with "function_id" field
         """
         action_type = action.type
 
         if action_type == "clear_module_cache":
             yield from 
self._handle_clear_module_cache(action.body.to_pybytes())
+        elif action_type == "clear_udaf_state_cache":
+            yield from 
self._handle_clear_udaf_state_cache(action.body.to_pybytes())
         else:
             raise flight.FlightUnavailableError(f"Unknown action: 
{action_type}")
 
+    def _handle_clear_udaf_state_cache(self, body: bytes):
+        """
+        Clear cached UDAF state managers for a dropped function id.
+        """
+        try:
+            params = json.loads(body.decode("utf-8"))
+            function_id = int(params["function_id"])
+
+            cleared_managers = 
self._clear_udaf_state_cache_by_function_id(function_id)
+
+            result = {
+                "success": True,
+                "cleared_managers": cleared_managers,
+                "function_id": function_id,
+            }
+            yield flight.Result(json.dumps(result).encode("utf-8"))
+
+        except Exception as e:
+            logging.error("clear_udaf_state_cache failed: %s", e)
+            yield flight.Result(json.dumps({
+                "success": False,
+                "error": str(e)
+            }).encode("utf-8"))
+
     def _handle_clear_module_cache(self, body: bytes):
         """
         Clear Python module cache for a specific UDF location.
diff --git a/be/src/udf/python/python_udf_meta.cpp 
b/be/src/udf/python/python_udf_meta.cpp
index f0978dc926b..4f21a045ab1 100644
--- a/be/src/udf/python/python_udf_meta.cpp
+++ b/be/src/udf/python/python_udf_meta.cpp
@@ -55,6 +55,7 @@ Status PythonUDFMeta::serialize_arrow_schema(const 
std::shared_ptr<arrow::Schema
     json format:
     {
         "name": "xxx",
+        "id": 123,
         "symbol": "xxx",
         "location": "xxx",
         "udf_load_type": 0 or 1,
@@ -71,6 +72,7 @@ Status PythonUDFMeta::serialize_to_json(std::string* 
json_str) const {
     doc.SetObject();
     auto& allocator = doc.GetAllocator();
     doc.AddMember("name", rapidjson::Value().SetString(name.c_str(), 
allocator), allocator);
+    doc.AddMember("id", rapidjson::Value().SetInt64(id), allocator);
     doc.AddMember("symbol", rapidjson::Value().SetString(symbol.c_str(), 
allocator), allocator);
     doc.AddMember("location", rapidjson::Value().SetString(location.c_str(), 
allocator), allocator);
     doc.AddMember("udf_load_type", 
rapidjson::Value().SetInt(static_cast<int>(type)), allocator);
diff --git a/be/src/udf/python/python_udf_runtime.h 
b/be/src/udf/python/python_udf_runtime.h
index d107c702eb1..269b6d1a98e 100644
--- a/be/src/udf/python/python_udf_runtime.h
+++ b/be/src/udf/python/python_udf_runtime.h
@@ -83,6 +83,10 @@ public:
 
     bool operator!=(const PythonUDFProcess& other) const { return !(*this == 
other); }
 
+#ifdef BE_TEST
+    void set_uri_for_test(std::string uri) { _uri = std::move(uri); }
+#endif
+
 private:
     constexpr static int TERMINATE_RETRY_TIMES = 10;
     constexpr static size_t MAX_ACCUMULATED_LOG_SIZE = 65536;
@@ -96,4 +100,4 @@ private:
     std::string _accumulated_log;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/udf/python/python_server_test.cpp 
b/be/test/udf/python/python_server_test.cpp
index 8fb006f35b7..f21e52e0730 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -21,21 +21,22 @@
 #include <sys/socket.h>
 #include <sys/un.h>
 
+#include <boost/process.hpp>
 #include <filesystem>
 #include <fstream>
-#include <future>
 #include <string>
+#include <vector>
 
 #include "common/config.h"
 #include "common/status.h"
 #include "udf/python/python_env.h"
 #include "udf/python/python_udf_client.h"
 #include "udf/python/python_udf_meta.h"
-#include "udf/python/python_udf_runtime.h"
 
 namespace doris {
 
 namespace fs = std::filesystem;
+namespace bp = boost::process;
 
 class PythonServerTest : public ::testing::Test {
 protected:
@@ -136,6 +137,13 @@ protected:
         ofs << "# fake server\n";
         ofs.close();
     }
+
+    ProcessPtr create_sleep_process() {
+        bp::ipstream output_stream;
+        std::string sleep_path = fs::exists("/bin/sleep") ? "/bin/sleep" : 
"/usr/bin/sleep";
+        bp::child child(sleep_path, "60", bp::std_out > output_stream, 
bp::std_err > bp::null);
+        return std::make_shared<PythonUDFProcess>(std::move(child), 
std::move(output_stream));
+    }
 };
 
 // ============================================================================
@@ -304,6 +312,39 @@ TEST_F(PythonServerTest, 
ShutdownAfterFailedInitializationDoesNotCrash) {
     EXPECT_NO_THROW(mgr.shutdown());
 }
 
+TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) {
+    PythonServerManager mgr;
+
+    EXPECT_NO_THROW(mgr.clear_udaf_state_cache(12345));
+}
+
+TEST_F(PythonServerTest, ClearModuleCacheWithoutProcessesIsNoOp) {
+    PythonServerManager mgr;
+
+    auto status = mgr.clear_module_cache("/tmp/python_udf_cache");
+    EXPECT_TRUE(status.ok()) << status.to_string();
+}
+
+TEST_F(PythonServerTest, BroadcastActionWithInvalidProcessUriReturnsError) {
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, test_dir_ + "/bin/python3");
+    ProcessPtr process = create_sleep_process();
+    ASSERT_NE(process, nullptr);
+    ASSERT_TRUE(process->is_alive());
+    process->set_uri_for_test("invalid-python-flight-uri");
+
+    mgr.set_process_pool_for_test(version, {process});
+    auto status = mgr.broadcast_action_to_processes_for_test(
+            "clear_udaf_state_cache", R"({"function_id": 12345})", 
"function_id=12345");
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("clear_udaf_state_cache failed for 
function_id=12345"),
+              std::string::npos);
+    EXPECT_NE(status.to_string().find("success=0, failed=1"), 
std::string::npos);
+
+    mgr.shutdown();
+}
+
 // ============================================================================
 // PythonServerManager::get_client() - client retrieval test
 // ============================================================================
diff --git a/be/test/udf/python/python_udf_meta_test.cpp 
b/be/test/udf/python/python_udf_meta_test.cpp
index fd651ae07d0..43085430510 100644
--- a/be/test/udf/python/python_udf_meta_test.cpp
+++ b/be/test/udf/python/python_udf_meta_test.cpp
@@ -352,6 +352,9 @@ TEST_F(PythonUDFMetaTest, SerializeToJsonBasic) {
     doc.Parse(json_str.c_str());
     EXPECT_FALSE(doc.HasParseError());
 
+    EXPECT_TRUE(doc.HasMember("id"));
+    EXPECT_EQ(doc["id"].GetInt64(), 1);
+
     EXPECT_TRUE(doc.HasMember("name"));
     EXPECT_STREQ(doc["name"].GetString(), "test_udf");
 
diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy 
b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
index 4b64921676f..e0b0ed8c466 100644
--- a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
+++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
@@ -124,10 +124,73 @@ suite('test_pythonudaf_drop', "nonConcurrent") {
 
         qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM 
py_udaf_drop_tbl;'''
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
+
+        // Case 4: inline UDAF drop/recreate must not reuse the old Python 
class.
+        // The Python server caches UDAF state managers, so this verifies the 
cache key
+        // and drop cleanup both use the FE function id, not just name + 
argument types.
+        sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
+        sql """
+            CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT)
+            RETURNS BIGINT
+            PROPERTIES (
+                "type" = "PYTHON_UDF",
+                "symbol" = "InlineDropRecreateUdaf",
+                "runtime_version" = "${runtime_version}",
+                "always_nullable" = "true"
+            )
+            AS \$\$
+class InlineDropRecreateUdaf:
+    def __init__(self):
+        self.total = 0
+    @property
+    def aggregate_state(self):
+        return self.total
+    def accumulate(self, val):
+        if val is not None:
+            self.total += val
+    def merge(self, other):
+        self.total += other
+    def finish(self):
+        return self.total * 10
+\$\$
+        """
+        def inlineOldResult = sql '''SELECT py_drop_inline_recreate(v) FROM 
py_udaf_drop_tbl;'''
+        assert inlineOldResult[0][0].toString() == '60'
+
+        sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
+        sql """
+            CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT)
+            RETURNS BIGINT
+            PROPERTIES (
+                "type" = "PYTHON_UDF",
+                "symbol" = "InlineDropRecreateUdaf",
+                "runtime_version" = "${runtime_version}",
+                "always_nullable" = "true"
+            )
+            AS \$\$
+class InlineDropRecreateUdaf:
+    def __init__(self):
+        self.total = 0
+    @property
+    def aggregate_state(self):
+        return self.total
+    def accumulate(self, val):
+        if val is not None:
+            self.total += val
+    def merge(self, other):
+        self.total += other
+    def finish(self):
+        return self.total * 100
+\$\$
+        """
+        def inlineNewResult = sql '''SELECT py_drop_inline_recreate(v) FROM 
py_udaf_drop_tbl;'''
+        assert inlineNewResult[0][0].toString() == '600'
+        sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
     } finally {
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
+        try_sql('DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT);')
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to