This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new cdc1ee39d4c [fix] [python client] Better Python garbage collection management for C++-owned objects (#16535)
cdc1ee39d4c is described below

commit cdc1ee39d4ca1cb3f67fdc5982f07eceb083358a
Author: Zac Bentley <[email protected]>
AuthorDate: Thu Sep 22 09:47:53 2022 -0400

    [fix] [python client] Better Python garbage collection management for C++-owned objects (#16535)
    
    Fixes https://github.com/apache/pulsar/issues/16527
    
    (cherry picked from commit a8b265da323cdb933a268c6fecdd9e3538d8738c)
---
 .gitignore                                  |  1 +
 pulsar-client-cpp/python/pulsar/__init__.py | 13 +++++-
 pulsar-client-cpp/python/pulsar_test.py     | 31 ++++++++++++++
 pulsar-client-cpp/python/src/config.cc      | 65 +++++++++--------------------
 pulsar-client-cpp/python/src/utils.h        | 20 +++++++++
 5 files changed, 82 insertions(+), 48 deletions(-)

diff --git a/.gitignore b/.gitignore
index cfaf6c8ab07..c1ed7e2710b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,6 +46,7 @@ target/
 
 # Python
 *.pyc
+.python-version
 
 # Perf tools
 *.hgrm
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 96a9f6cf80a..3dec7a0bafd 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -437,8 +437,7 @@ class Client:
         conf.concurrent_lookup_requests(concurrent_lookup_requests)
         if log_conf_file_path:
             conf.log_conf_file_path(log_conf_file_path)
-        if logger:
-            conf.set_logger(logger)
+        conf.set_logger(self._prepare_logger(logger) if logger else None)
         if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
             conf.use_tls(True)
         if tls_trust_certs_file_path:
@@ -450,6 +449,16 @@ class Client:
         self._client = _pulsar.Client(service_url, conf)
         self._consumers = []
 
+    @staticmethod
+    def _prepare_logger(logger):
+        import logging
+        def log(level, message):
+            old_threads = logging.logThreads
+            logging.logThreads = False
+            logger.log(logging.getLevelName(level), message)
+            logging.logThreads = old_threads
+        return log
+
     def create_producer(self, topic,
                         producer_name=None,
                         schema=schema.BytesSchema(),
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index fd3656b1eed..e0099bee7e5 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,6 +19,8 @@
 #
 
 
+import threading
+import logging
 from unittest import TestCase, main
 import time
 import os
@@ -1200,6 +1202,35 @@ class PulsarTest(TestCase):
         second_encode = schema.encode(record)
         self.assertEqual(first_encode, second_encode)
 
+    def test_logger_thread_leaks(self):
+        def _do_connect(close):
+            logger = logging.getLogger(str(threading.current_thread().ident))
+            logger.setLevel(logging.INFO)
+            client = pulsar.Client(
+                service_url="pulsar://localhost:6650",
+                io_threads=4,
+                message_listener_threads=4,
+                operation_timeout_seconds=1,
+                log_conf_file_path=None,
+                authentication=None,
+                logger=logger,
+            )
+            client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
+            if close:
+                client.close()
+
+        for should_close in (True, False):
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
+            _do_connect(should_close)
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
+            threads = []
+            for _ in range(10):
+                threads.append(threading.Thread(target=_do_connect, args=(should_close)))
+                threads[-1].start()
+            for thread in threads:
+                thread.join()
+            assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)
+
     def _check_value_error(self, fun):
         with self.assertRaises(ValueError):
             fun()
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 2dee1a1183d..25f4ca51087 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -93,27 +93,21 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
     return conf;
 }
 
-class LoggerWrapper : public Logger {
-    PyObject* const _pyLogger;
-    const int _pythonLogLevel;
+class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
     const std::unique_ptr<Logger> _fallbackLogger;
 
-    static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }
-
    public:
-    LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
-        : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
-        Py_XINCREF(_pyLogger);
-    }
+    LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
+        : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}
 
     LoggerWrapper(const LoggerWrapper&) = delete;
     LoggerWrapper(LoggerWrapper&&) noexcept = delete;
     LoggerWrapper& operator=(const LoggerWrapper&) = delete;
     LoggerWrapper& operator=(LoggerWrapper&&) = delete;
 
-    virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
-
-    bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
+    bool isEnabled(Level level) {
+        return true;  // Python loggers are always enabled; they decide internally whether or not to log.
+    }
 
     void log(Level level, int line, const std::string& message) {
         if (!Py_IsInitialized()) {
@@ -121,66 +115,45 @@ class LoggerWrapper : public Logger {
             _fallbackLogger->log(level, line, message);
         } else {
             PyGILState_STATE state = PyGILState_Ensure();
-
+            PyObject *type, *value, *traceback;
+            PyErr_Fetch(&type, &value, &traceback);
             try {
                 switch (level) {
                     case Logger::LEVEL_DEBUG:
-                        py::call_method<void>(_pyLogger, "debug", message.c_str());
+                        py::call<void>(_captive, "DEBUG", message.c_str());
                         break;
                     case Logger::LEVEL_INFO:
-                        py::call_method<void>(_pyLogger, "info", message.c_str());
+                        py::call<void>(_captive, "INFO", message.c_str());
                         break;
                     case Logger::LEVEL_WARN:
-                        py::call_method<void>(_pyLogger, "warning", message.c_str());
+                        py::call<void>(_captive, "WARNING", message.c_str());
                         break;
                     case Logger::LEVEL_ERROR:
-                        py::call_method<void>(_pyLogger, "error", message.c_str());
+                        py::call<void>(_captive, "ERROR", message.c_str());
                         break;
                 }
-
             } catch (const py::error_already_set& e) {
+                PyErr_Print();
                 _fallbackLogger->log(level, line, message);
             }
-
+            PyErr_Restore(type, value, traceback);
             PyGILState_Release(state);
         }
     }
 };
 
-class LoggerWrapperFactory : public LoggerFactory {
+class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
     std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
-    PyObject* _pyLogger;
-    Optional<int> _pythonLogLevel{Optional<int>::empty()};
-
-    void initializePythonLogLevel() {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
-            _pythonLogLevel = Optional<int>::of(level);
-        } catch (const py::error_already_set& e) {
-            // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
-            _pythonLogLevel = Optional<int>::empty();
-        }
-
-        PyGILState_Release(state);
-    }
 
    public:
-    LoggerWrapperFactory(py::object pyLogger) {
-        _pyLogger = pyLogger.ptr();
-        Py_XINCREF(_pyLogger);
-        initializePythonLogLevel();
-    }
-
-    virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
+    LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}
 
     Logger* getLogger(const std::string& fileName) {
         const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
-        if (_pythonLogLevel.is_present()) {
-            return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
-        } else {
+        if (_captive == py::object().ptr()) {
             return fallbackLogger;
+        } else {
+            return new LoggerWrapper(_captive, fallbackLogger);
         }
     }
 };
diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h
index 5be44732fb7..71d20b7ce4a 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/utils.h
@@ -49,3 +49,23 @@ struct CryptoKeyReaderWrapper {
     CryptoKeyReaderWrapper();
     CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
 };
+
+class CaptivePythonObjectMixin {
+   protected:
+    PyObject* _captive;
+
+    CaptivePythonObjectMixin(PyObject* captive) {
+        _captive = captive;
+        PyGILState_STATE state = PyGILState_Ensure();
+        Py_XINCREF(_captive);
+        PyGILState_Release(state);
+    }
+
+    ~CaptivePythonObjectMixin() {
+        if (Py_IsInitialized()) {
+            PyGILState_STATE state = PyGILState_Ensure();
+            Py_XDECREF(_captive);
+            PyGILState_Release(state);
+        }
+    }
+};

Reply via email to