This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new cdc1ee39d4c [fix] [python client] Better Python garbage collection
management for C++-owned objects (#16535)
cdc1ee39d4c is described below
commit cdc1ee39d4ca1cb3f67fdc5982f07eceb083358a
Author: Zac Bentley <[email protected]>
AuthorDate: Thu Sep 22 09:47:53 2022 -0400
[fix] [python client] Better Python garbage collection management for
C++-owned objects (#16535)
Fixes https://github.com/apache/pulsar/issues/16527
(cherry picked from commit a8b265da323cdb933a268c6fecdd9e3538d8738c)
---
.gitignore | 1 +
pulsar-client-cpp/python/pulsar/__init__.py | 13 +++++-
pulsar-client-cpp/python/pulsar_test.py | 31 ++++++++++++++
pulsar-client-cpp/python/src/config.cc | 65 +++++++++--------------------
pulsar-client-cpp/python/src/utils.h | 20 +++++++++
5 files changed, 82 insertions(+), 48 deletions(-)
diff --git a/.gitignore b/.gitignore
index cfaf6c8ab07..c1ed7e2710b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,6 +46,7 @@ target/
# Python
*.pyc
+.python-version
# Perf tools
*.hgrm
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 96a9f6cf80a..3dec7a0bafd 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -437,8 +437,7 @@ class Client:
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
- if logger:
- conf.set_logger(logger)
+ conf.set_logger(self._prepare_logger(logger) if logger else None)
if use_tls or service_url.startswith('pulsar+ssl://') or
service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
@@ -450,6 +449,16 @@ class Client:
self._client = _pulsar.Client(service_url, conf)
self._consumers = []
+    @staticmethod
+    def _prepare_logger(logger):
+        """
+        Wrap a `logging.Logger` in the plain callable handed to the client configuration.
+
+        The returned callable takes `(level, message)`, where `level` is a
+        level name such as "DEBUG", "INFO", "WARNING" or "ERROR", and
+        forwards the message to `logger` at that level.
+        """
+        import logging
+        def log(level, message):
+            # Thread info collection is switched off while the record is emitted.
+            # NOTE(review): presumably this keeps `logging` from looking up the current
+            # Python thread when called from a C++-owned thread - confirm.
+            old_threads = logging.logThreads
+            logging.logThreads = False
+            try:
+                logger.log(logging.getLevelName(level), message)
+            finally:
+                # logThreads is process-wide state: restore it even if a handler raises.
+                logging.logThreads = old_threads
+        return log
+
def create_producer(self, topic,
producer_name=None,
schema=schema.BytesSchema(),
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index fd3656b1eed..e0099bee7e5 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -19,6 +19,8 @@
#
+import threading
+import logging
from unittest import TestCase, main
import time
import os
@@ -1200,6 +1202,35 @@ class PulsarTest(TestCase):
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)
+    def test_logger_thread_leaks(self):
+        # Connects with a custom Python logger, first on the calling thread and
+        # then from ten threads in parallel, and checks after each step that
+        # only one Python thread is left alive. Runs once with an explicit
+        # client.close() and once without.
+        def _do_connect(close):
+            logger = logging.getLogger(str(threading.current_thread().ident))
+            logger.setLevel(logging.INFO)
+            client = pulsar.Client(
+                service_url="pulsar://localhost:6650",
+                io_threads=4,
+                message_listener_threads=4,
+                operation_timeout_seconds=1,
+                log_conf_file_path=None,
+                authentication=None,
+                logger=logger,
+            )
+            client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
+            if close:
+                client.close()
+
+        for should_close in (True, False):
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
+            _do_connect(should_close)
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
+            threads = []
+            for _ in range(10):
+                # args must be a tuple: a bare `(should_close)` is just the bool, and the
+                # worker would fail with a TypeError before ever calling _do_connect.
+                threads.append(threading.Thread(target=_do_connect, args=(should_close,)))
+                threads[-1].start()
+            for thread in threads:
+                thread.join()
+            self.assertEqual(threading.active_count(), 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close))
+
def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index 2dee1a1183d..25f4ca51087 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -93,27 +93,21 @@ static ReaderConfiguration&
ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
return conf;
}
-class LoggerWrapper : public Logger {
- PyObject* const _pyLogger;
- const int _pythonLogLevel;
+class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
const std::unique_ptr<Logger> _fallbackLogger;
- static constexpr int _getLogLevelValue(Level level) { return 10 + (level *
10); }
-
public:
- LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger*
fallbackLogger)
- : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel),
_fallbackLogger(fallbackLogger) {
- Py_XINCREF(_pyLogger);
- }
+ LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
+ : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger)
{}
LoggerWrapper(const LoggerWrapper&) = delete;
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
LoggerWrapper& operator=(LoggerWrapper&&) = delete;
- virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }
-
- bool isEnabled(Level level) { return _getLogLevelValue(level) >=
_pythonLogLevel; }
+ bool isEnabled(Level level) {
+ return true; // Python loggers are always enabled; they decide
internally whether or not to log.
+ }
void log(Level level, int line, const std::string& message) {
if (!Py_IsInitialized()) {
@@ -121,66 +115,45 @@ class LoggerWrapper : public Logger {
_fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();
-
+ PyObject *type, *value, *traceback;
+ PyErr_Fetch(&type, &value, &traceback);
try {
switch (level) {
case Logger::LEVEL_DEBUG:
- py::call_method<void>(_pyLogger, "debug",
message.c_str());
+ py::call<void>(_captive, "DEBUG", message.c_str());
break;
case Logger::LEVEL_INFO:
- py::call_method<void>(_pyLogger, "info",
message.c_str());
+ py::call<void>(_captive, "INFO", message.c_str());
break;
case Logger::LEVEL_WARN:
- py::call_method<void>(_pyLogger, "warning",
message.c_str());
+ py::call<void>(_captive, "WARNING", message.c_str());
break;
case Logger::LEVEL_ERROR:
- py::call_method<void>(_pyLogger, "error",
message.c_str());
+ py::call<void>(_captive, "ERROR", message.c_str());
break;
}
-
} catch (const py::error_already_set& e) {
+ PyErr_Print();
_fallbackLogger->log(level, line, message);
}
-
+ PyErr_Restore(type, value, traceback);
PyGILState_Release(state);
}
}
};
-class LoggerWrapperFactory : public LoggerFactory {
+class LoggerWrapperFactory : public LoggerFactory, public
CaptivePythonObjectMixin {
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new
ConsoleLoggerFactory};
- PyObject* _pyLogger;
- Optional<int> _pythonLogLevel{Optional<int>::empty()};
-
- void initializePythonLogLevel() {
- PyGILState_STATE state = PyGILState_Ensure();
-
- try {
- int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
- _pythonLogLevel = Optional<int>::of(level);
- } catch (const py::error_already_set& e) {
- // Failed to get log level from _pyLogger, set it to empty to
fallback to _fallbackLogger
- _pythonLogLevel = Optional<int>::empty();
- }
-
- PyGILState_Release(state);
- }
public:
- LoggerWrapperFactory(py::object pyLogger) {
- _pyLogger = pyLogger.ptr();
- Py_XINCREF(_pyLogger);
- initializePythonLogLevel();
- }
-
- virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
+ LoggerWrapperFactory(py::object pyLogger) :
CaptivePythonObjectMixin(pyLogger.ptr()) {}
Logger* getLogger(const std::string& fileName) {
const auto fallbackLogger =
_fallbackLoggerFactory->getLogger(fileName);
- if (_pythonLogLevel.is_present()) {
- return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(),
fallbackLogger);
- } else {
+ if (_captive == py::object().ptr()) {
return fallbackLogger;
+ } else {
+ return new LoggerWrapper(_captive, fallbackLogger);
}
}
};
diff --git a/pulsar-client-cpp/python/src/utils.h
b/pulsar-client-cpp/python/src/utils.h
index 5be44732fb7..71d20b7ce4a 100644
--- a/pulsar-client-cpp/python/src/utils.h
+++ b/pulsar-client-cpp/python/src/utils.h
@@ -49,3 +49,23 @@ struct CryptoKeyReaderWrapper {
CryptoKeyReaderWrapper();
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const
std::string& privateKeyPath);
};
+
+/**
+ * Mixin for C++ objects that hold on to a Python object.
+ *
+ * Construction takes one reference to the object (a null pointer is
+ * tolerated) and destruction gives that reference back, acquiring the GIL
+ * around each reference count change. Intended to be used as a base class:
+ * construction and destruction are only accessible to derived classes.
+ */
+class CaptivePythonObjectMixin {
+   protected:
+    // The Python object kept alive by this instance; may be null.
+    PyObject* _captive;
+
+    explicit CaptivePythonObjectMixin(PyObject* captive) : _captive(captive) {
+        PyGILState_STATE state = PyGILState_Ensure();
+        Py_XINCREF(_captive);
+        PyGILState_Release(state);
+    }
+
+    // Exactly one reference is owned per instance; a member-wise copy would
+    // make two instances release the same reference.
+    CaptivePythonObjectMixin(const CaptivePythonObjectMixin&) = delete;
+    CaptivePythonObjectMixin& operator=(const CaptivePythonObjectMixin&) = delete;
+
+    ~CaptivePythonObjectMixin() {
+        // Skip the release entirely when the interpreter is no longer
+        // initialized: the C API must not be called at that point.
+        if (Py_IsInitialized()) {
+            PyGILState_STATE state = PyGILState_Ensure();
+            Py_XDECREF(_captive);
+            PyGILState_Release(state);
+        }
+    }
+};