github-actions[bot] commented on code in PR #63062:
URL: https://github.com/apache/doris/pull/63062#discussion_r3205870180
##########
be/src/udf/python/python_server.py:
##########
@@ -1607,6 +1612,31 @@ def _get_udaf_state_manager(
         return self.udaf_state_managers[func_key]
+    def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int:
+        """
+        Clear UDAF managers for a dropped function id.
+
+        DROP FUNCTION cache cleanup is asynchronous. The runtime key still includes
+        function_id for correctness, while this action releases old states and class
+        objects after the drop task reaches this Python process.
+        """
+        prefix = f"{function_id}:"
+        cleared = 0
+
+        with self.udaf_managers_lock:
+            keys_to_remove = [
+                key for key in self.udaf_state_managers if key.startswith(prefix)
+            ]
+            for key in keys_to_remove:
+                manager = self.udaf_state_managers.pop(key)
+                manager.states.clear()
+                cleared += 1
Review Comment:
Clearing `manager.states` can invalidate in-flight UDAF queries. The DROP
cleanup task is submitted asynchronously after FE removes the function, while
an already-started query can still have a Flight exchange using this same
`UDAFStateManager` for the old function id. If this action runs between that
query's CREATE/ACCUMULATE and later SERIALIZE/FINALIZE/DESTROY calls, those
operations will find their `place_id` entries removed and either raise
`KeyError` or return failed UDAF results. Since adding `function_id` to the key already
prevents a recreated function from reusing the old class, cleanup should detach
the manager from `udaf_state_managers` without clearing a manager that active
exchanges may still reference, or add explicit lifecycle/ref-count
coordination. Also consider returning the manager from
`_get_udaf_state_manager()` while still under the lock so a concurrent pop
cannot occur between lookup and return.
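
A minimal sketch of the detach-only variant suggested above, assuming a simplified server. `Server`, `UDAFStateManager`, `get_manager`, and `clear_by_function_id` are hypothetical stand-ins for illustration, not the actual classes or methods in `python_server.py`; only the lock and dictionary names follow the diff.

```python
import threading


class UDAFStateManager:
    """Hypothetical stand-in: only the `states` dict matters for this sketch."""

    def __init__(self):
        self.states = {}  # place_id -> aggregate state


class Server:
    """Hypothetical stand-in for the process that caches UDAF managers."""

    def __init__(self):
        self.udaf_managers_lock = threading.Lock()
        self.udaf_state_managers = {}  # "function_id:..." -> UDAFStateManager

    def get_manager(self, func_key):
        # Look up (or create) and return while still holding the lock, so a
        # concurrent pop cannot interleave between lookup and return.
        with self.udaf_managers_lock:
            manager = self.udaf_state_managers.get(func_key)
            if manager is None:
                manager = UDAFStateManager()
                self.udaf_state_managers[func_key] = manager
            return manager

    def clear_by_function_id(self, function_id):
        prefix = f"{function_id}:"
        with self.udaf_managers_lock:
            keys = [k for k in self.udaf_state_managers if k.startswith(prefix)]
            for key in keys:
                # Detach only. manager.states is left untouched, so an
                # exchange that already holds the manager can still finish.
                self.udaf_state_managers.pop(key)
        return len(keys)
```

With this shape, an in-flight exchange keeps its own reference to the detached manager, and the manager is garbage-collected once the last exchange drops it. A later lookup for the same key builds a fresh manager instead of reusing the old one.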
##########
be/src/udf/python/python_server.cpp:
##########
@@ -413,7 +414,19 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
     }
     std::string body = fmt::format(R"({{"location": "{}"}})", location);
+    return _broadcast_action_to_processes("clear_module_cache", body,
+                                          fmt::format("location={}", location));
+}
+
+void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
+    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
+    THROW_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
+                                                  fmt::format("function_id={}", function_id)));
Review Comment:
This can crash the BE from a DROP FUNCTION cleanup task.
`_broadcast_action_to_processes()` returns an error whenever an active Python
process fails `DoAction`/`Next`, and `THROW_IF_ERROR` converts that into a
`doris::Exception`. The caller is `clean_udf_cache_callback()`, which is run by
`TaskWorkerPool` via `_callback(task)` without any catch boundary, so an
uncaught exception terminates the worker thread/process instead of reporting a
best-effort cache cleanup failure. The existing module-cache cleanup path logs
and continues; this should return `Status` and be handled with a warning (or
use the existing cleanup-style `WARN_IF_ERROR`) rather than throwing out of the
task callback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]