https://github.com/python/cpython/commit/c13b59204af562bfb022eb8f6a5c03eb82659531
commit: c13b59204af562bfb022eb8f6a5c03eb82659531
branch: main
author: Alper <[email protected]>
committer: kumaraditya303 <[email protected]>
date: 2025-11-12T02:01:55+05:30
summary:

gh-116738: use `PyMutex` in `lzma` module (#140711)

Co-authored-by: Kumar Aditya <[email protected]>

files:
A Lib/test/test_free_threading/test_lzma.py
M Modules/_lzmamodule.c

diff --git a/Lib/test/test_free_threading/test_lzma.py b/Lib/test/test_free_threading/test_lzma.py
new file mode 100644
index 00000000000000..38d7e5db489426
--- /dev/null
+++ b/Lib/test/test_free_threading/test_lzma.py
@@ -0,0 +1,56 @@
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+lzma = import_helper.import_module("lzma")
+from lzma import LZMACompressor, LZMADecompressor
+
+from test.test_lzma import INPUT
+
+
+NTHREADS = 10
+
+
+@threading_helper.requires_working_threading()
+class TestLZMA(unittest.TestCase):
+    def test_compressor(self):
+        lzc = LZMACompressor()
+
+        # First compress() outputs LZMA header
+        header = lzc.compress(INPUT)
+        self.assertGreater(len(header), 0)
+
+        def worker():
+            # it should return empty bytes as it buffers data internally
+            data = lzc.compress(INPUT)
+            self.assertEqual(data, b"")
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
+        full_compressed = header + lzc.flush()
+        decompressed = lzma.decompress(full_compressed)
+        # The decompressed data should be INPUT repeated NTHREADS times
+        self.assertEqual(decompressed, INPUT * NTHREADS)
+
+    def test_decompressor(self):
+        chunk_size = 128
+        chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
+        input_data = b"".join(chunks)
+        compressed = lzma.compress(input_data)
+
+        lzd = LZMADecompressor()
+        output = []
+
+        def worker():
+            data = lzd.decompress(compressed, chunk_size)
+            self.assertEqual(len(data), chunk_size)
+            output.append(data)
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS)
+        self.assertEqual(len(output), NTHREADS)
+        # Verify the expected chunks (order doesn't matter due to append race)
+        self.assertSetEqual(set(output), set(chunks))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Modules/_lzmamodule.c b/Modules/_lzmamodule.c
index 6fc072f6d0a382..5876623399837b 100644
--- a/Modules/_lzmamodule.c
+++ b/Modules/_lzmamodule.c
@@ -72,13 +72,6 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
 }
 
 
-#define ACQUIRE_LOCK(obj) do { \
-    if (!PyThread_acquire_lock((obj)->lock, 0)) { \
-        Py_BEGIN_ALLOW_THREADS \
-        PyThread_acquire_lock((obj)->lock, 1); \
-        Py_END_ALLOW_THREADS \
-    } } while (0)
-#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
 
 typedef struct {
     PyTypeObject *lzma_compressor_type;
@@ -111,7 +104,7 @@ typedef struct {
     lzma_allocator alloc;
     lzma_stream lzs;
     int flushed;
-    PyThread_type_lock lock;
+    PyMutex mutex;
 } Compressor;
 
 typedef struct {
@@ -124,7 +117,7 @@ typedef struct {
     char needs_input;
     uint8_t *input_buffer;
     size_t input_buffer_size;
-    PyThread_type_lock lock;
+    PyMutex mutex;
 } Decompressor;
 
 #define Compressor_CAST(op)     ((Compressor *)(op))
@@ -617,14 +610,14 @@ _lzma_LZMACompressor_compress_impl(Compressor *self, Py_buffer *data)
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->flushed) {
         PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
     }
     else {
         result = compress(self, data->buf, data->len, LZMA_RUN);
     }
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -644,14 +637,14 @@ _lzma_LZMACompressor_flush_impl(Compressor *self)
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->flushed) {
         PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
     } else {
         self->flushed = 1;
         result = compress(self, NULL, 0, LZMA_FINISH);
     }
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -820,12 +813,7 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
     self->alloc.free = PyLzma_Free;
     self->lzs.allocator = &self->alloc;
 
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
+    self->mutex = (PyMutex){0};
 
     self->flushed = 0;
     switch (format) {
@@ -867,10 +855,8 @@ static void
 Compressor_dealloc(PyObject *op)
 {
     Compressor *self = Compressor_CAST(op);
+    assert(!PyMutex_IsLocked(&self->mutex));
     lzma_end(&self->lzs);
-    if (self->lock != NULL) {
-        PyThread_free_lock(self->lock);
-    }
     PyTypeObject *tp = Py_TYPE(self);
     tp->tp_free(self);
     Py_DECREF(tp);
@@ -1146,12 +1132,12 @@ _lzma_LZMADecompressor_decompress_impl(Decompressor *self, Py_buffer *data,
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->eof)
         PyErr_SetString(PyExc_EOFError, "Already at end of stream");
     else
         result = decompress(self, data->buf, data->len, max_length);
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -1244,12 +1230,7 @@ _lzma_LZMADecompressor_impl(PyTypeObject *type, int format,
     self->lzs.allocator = &self->alloc;
     self->lzs.next_in = NULL;
 
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
+    self->mutex = (PyMutex){0};
 
     self->check = LZMA_CHECK_UNKNOWN;
     self->needs_input = 1;
@@ -1304,14 +1285,13 @@ static void
 Decompressor_dealloc(PyObject *op)
 {
     Decompressor *self = Decompressor_CAST(op);
+    assert(!PyMutex_IsLocked(&self->mutex));
+
     if(self->input_buffer != NULL)
         PyMem_Free(self->input_buffer);
 
     lzma_end(&self->lzs);
     Py_CLEAR(self->unused_data);
-    if (self->lock != NULL) {
-        PyThread_free_lock(self->lock);
-    }
     PyTypeObject *tp = Py_TYPE(self);
     tp->tp_free(self);
     Py_DECREF(tp);

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]

Reply via email to