commit:     7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2
Author:     Arthur Zamarin <arthurzam <AT> gentoo <DOT> org>
AuthorDate: Mon Oct 10 12:42:26 2022 +0000
Commit:     Arthur Zamarin <arthurzam <AT> gentoo <DOT> org>
CommitDate: Mon Oct 10 15:55:00 2022 +0000
URL:        
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=7c1d50ad

compression.__init__: add tests

Signed-off-by: Arthur Zamarin <arthurzam <AT> gentoo.org>

 src/snakeoil/compression/__init__.py | 27 ++++-------
 src/snakeoil/compression/_util.py    | 24 +++++-----
 tests/compression/__init__.py        | 11 +++++
 tests/compression/test_bzip2.py      | 10 +---
 tests/compression/test_init.py       | 88 ++++++++++++++++++++++++++++++++++++
 5 files changed, 121 insertions(+), 39 deletions(-)

diff --git a/src/snakeoil/compression/__init__.py 
b/src/snakeoil/compression/__init__.py
index b250b1af..4b4a437c 100644
--- a/src/snakeoil/compression/__init__.py
+++ b/src/snakeoil/compression/__init__.py
@@ -1,9 +1,9 @@
 import shlex
+from functools import cached_property
 from importlib import import_module
 
-from .. import klass
+from .. import process
 from ..cli.exceptions import UserException
-from ..process import CommandNotFound, find_binary
 from ..process.spawn import spawn_get_output
 
 
@@ -12,14 +12,10 @@ class _transform_source:
     def __init__(self, name):
         self.name = name
 
-    @klass.jit_attr
+    @cached_property
     def module(self):
         return import_module(f'snakeoil.compression._{self.name}')
 
-    @klass.jit_attr
-    def parallelizable(self):
-        return bool(getattr(self.module, 'parallelizable', False))
-
     def compress_data(self, data, level, parallelize=False):
         parallelize = parallelize and self.module.parallelizable
         return self.module.compress_data(data, level, parallelize=parallelize)
@@ -81,7 +77,7 @@ class ArComp:
     def __init_subclass__(cls, **kwargs):
         """Initialize result subclasses and register archive extensions."""
         super().__init_subclass__(**kwargs)
-        if not all((cls.binary, cls.default_unpack_cmd, cls.exts)):
+        if not all((cls.binary, cls.default_unpack_cmd, cls.exts)): # pragma: 
no cover
             raise ValueError(f'class missing required attrs: {cls!r}')
         for ext in cls.exts:
             cls.known_exts[ext] = cls
@@ -89,13 +85,13 @@ class ArComp:
     def __init__(self, path, ext=None):
         self.path = path
 
-    @klass.jit_attr
+    @cached_property
     def _unpack_cmd(self):
         for b in self.binary:
             try:
-                binary = find_binary(b)
+                binary = process.find_binary(b)
                 break
-            except CommandNotFound:
+            except process.CommandNotFound:
                 continue
         else:
             choices = ', '.join(self.binary)
@@ -107,9 +103,6 @@ class ArComp:
     def unpack(self, dest=None, **kwargs):
         raise NotImplementedError
 
-    def create(self, dest):
-        raise NotImplementedError
-
 
 class _Archive:
     """Generic archive format support."""
@@ -155,16 +148,16 @@ class _Tar(_Archive, ArComp):
     compress_binary = None
     default_unpack_cmd = '{binary} xf "{path}"'
 
-    @klass.jit_attr
+    @cached_property
     def _unpack_cmd(self):
         cmd = super()._unpack_cmd
         if self.compress_binary is not None:
             for b in self.compress_binary:
                 try:
-                    find_binary(b)
+                    process.find_binary(b)
                     cmd += f' --use-compress-program={b}'
                     break
-                except CommandNotFound:
+                except process.CommandNotFound:
                     pass
             else:
                 choices = ', '.join(self.compress_binary)

diff --git a/src/snakeoil/compression/_util.py 
b/src/snakeoil/compression/_util.py
index 6a3c7258..e1af5aef 100644
--- a/src/snakeoil/compression/_util.py
+++ b/src/snakeoil/compression/_util.py
@@ -22,7 +22,7 @@ def _drive_process(args, mode, data):
 
 
 def compress_data(binary, data, compresslevel=9, extra_args=()):
-    args = [binary, '-%ic' % compresslevel]
+    args = [binary, f'-{compresslevel}c']
     args.extend(extra_args)
     return _drive_process(args, 'compression', data)
 
@@ -36,9 +36,7 @@ def decompress_data(binary, data, extra_args=()):
 class _process_handle:
 
     def __init__(self, handle, args, is_read=False):
-        self.mode = 'wb'
-        if is_read:
-            self.mode = 'rb'
+        self.mode = 'rb' if is_read else 'wb'
 
         self.args = tuple(args)
         self.is_read = is_read
@@ -55,8 +53,7 @@ class _process_handle:
         elif not isinstance(handle, int):
             if not hasattr(handle, 'fileno'):
                 raise TypeError(
-                    "handle %r isn't a string, integer, and lacks a fileno "
-                    "method" % (handle,))
+                    f"handle {handle!r} isn't a string, integer, and lacks a 
fileno method")
             handle = handle.fileno()
 
         try:
@@ -108,8 +105,8 @@ class _process_handle:
         if fwd_seek < 0:
             if self._allow_reopen is None:
                 raise TypeError(
-                    "instance %s can't do negative seeks: asked for %i, "
-                    "was at %i" % (self, position, self.position))
+                    f"instance {self} can't do negative seeks: "
+                    f"asked for {position}, was at {self.position}")
             self._terminate()
             self._open_handle(self._allow_reopen)
             return self.seek(position)
@@ -142,16 +139,17 @@ class _process_handle:
     def _terminate(self):
         try:
             self._process.terminate()
-        except EnvironmentError as e:
+        except EnvironmentError as exc:
             # allow no such process only.
-            if e.errno != errno.ESRCH:
+            if exc.errno != errno.ESRCH:
                 raise
 
     def close(self):
+        if not hasattr(self, '_process'):
+            return
         if self._process.returncode is not None:
             if self._process.returncode != 0:
-                raise Exception("%s invocation had non zero exit: %i" %
-                                (self.args, self._process.returncode))
+                raise Exception(f"{self.args} invocation had non zero exit: 
{self._process.returncode}")
             return
 
         self.handle.close()
@@ -165,7 +163,7 @@ class _process_handle:
 
 
 def compress_handle(binary_path, handle, compresslevel=9, extra_args=()):
-    args = [binary_path, '-%ic' % compresslevel]
+    args = [binary_path, f'-{compresslevel}c']
     args.extend(extra_args)
     return _process_handle(handle, args, False)
 

diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py
index e69de29b..178d4b64 100644
--- a/tests/compression/__init__.py
+++ b/tests/compression/__init__.py
@@ -0,0 +1,11 @@
+from unittest.mock import patch
+
+from snakeoil.process import CommandNotFound, find_binary
+
+def hide_binary(*binaries: str):
+    def mock_find_binary(name):
+        if name in binaries:
+            raise CommandNotFound(name)
+        return find_binary(name)
+
+    return patch('snakeoil.process.find_binary', side_effect=mock_find_binary)

diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py
index f8f1d1fb..d11d61fa 100644
--- a/tests/compression/test_bzip2.py
+++ b/tests/compression/test_bzip2.py
@@ -1,21 +1,13 @@
 import importlib
 from bz2 import decompress
 from pathlib import Path
-from unittest import mock
 
 import pytest
 from snakeoil.compression import _bzip2
 from snakeoil.process import CommandNotFound, find_binary
 from snakeoil.test import hide_imports
 
-
-def hide_binary(*binaries: str):
-    def mock_find_binary(name):
-        if name in binaries:
-            raise CommandNotFound(name)
-        return find_binary(name)
-
-    return mock.patch('snakeoil.process.find_binary', 
side_effect=mock_find_binary)
+from . import hide_binary
 
 
 def test_no_native():

diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py
new file mode 100644
index 00000000..0726571b
--- /dev/null
+++ b/tests/compression/test_init.py
@@ -0,0 +1,88 @@
+import shutil
+import subprocess
+import sys
+
+import pytest
+from snakeoil.compression import ArComp, ArCompError, _TarBZ2
+from snakeoil.contexts import chdir
+
+from . import hide_binary
+
+
+@pytest.mark.skipif(sys.platform == "darwin", reason="darwin fails with bzip2")
+class TestArComp:
+
+    @pytest.fixture(scope='class')
+    def tar_file(self, tmp_path_factory):
+        data = tmp_path_factory.mktemp("data")
+        (data / 'file1').write_text('Hello world')
+        (data / 'file2').write_text('Larry the Cow')
+        path = data / 'test 1.tar'
+        subprocess.run(['tar', 'cf', str(path), 'file1', 'file2'], cwd=data, 
check=True)
+        (data / 'file1').unlink()
+        (data / 'file2').unlink()
+        return str(path)
+
+    @pytest.fixture(scope='class')
+    def tar_bz2_file(self, tar_file):
+        subprocess.run(['bzip2', '-z', '-k', tar_file], check=True)
+        return tar_file + ".bz2"
+
+    @pytest.fixture(scope='class')
+    def tbz2_file(self, tar_bz2_file):
+        new_path = tar_bz2_file.replace('.tar.bz2', '.tbz2')
+        shutil.copyfile(tar_bz2_file, new_path)
+        return new_path
+
+    @pytest.fixture(scope='class')
+    def lzma_file(self, tmp_path_factory):
+        data = (tmp_path_factory.mktemp("data") / 'test 2.lzma')
+        with data.open('wb') as f:
+            subprocess.run(['lzma'], check=True, input=b'Hello world', 
stdout=f)
+        return str(data)
+
+    def test_unknown_extenstion(self, tmp_path):
+        file = tmp_path / 'test.file'
+        with pytest.raises(ArCompError, match='unknown compression file 
extension'):
+            ArComp(file, ext='.foo')
+
+    def test_missing_tar(self, tmp_path, tar_file):
+        with hide_binary('tar'), chdir(tmp_path):
+            with pytest.raises(ArCompError, match='required binary not found'):
+                ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
+
+    def test_tar(self, tmp_path, tar_file):
+        with chdir(tmp_path):
+            ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
+        assert (tmp_path / 'file1').read_text() == 'Hello world'
+        assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+    def test_tar_bz2(self, tmp_path, tar_bz2_file):
+        with chdir(tmp_path):
+            ArComp(tar_bz2_file, ext='.tar.bz2').unpack(dest=tmp_path)
+        assert (tmp_path / 'file1').read_text() == 'Hello world'
+        assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+    def test_tbz2(self, tmp_path, tbz2_file):
+        with chdir(tmp_path):
+            ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+        assert (tmp_path / 'file1').read_text() == 'Hello world'
+        assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+    def test_fallback_tbz2(self, tmp_path, tbz2_file):
+        with hide_binary(*_TarBZ2.compress_binary[:-1]):
+            with chdir(tmp_path):
+                ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+            assert (tmp_path / 'file1').read_text() == 'Hello world'
+            assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+    def test_no_fallback_tbz2(self, tmp_path, tbz2_file):
+        with hide_binary(*_TarBZ2.compress_binary), chdir(tmp_path):
+            with pytest.raises(ArCompError, match='no compression binary'):
+                ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+
+    def test_lzma(self, tmp_path, lzma_file):
+        dest = tmp_path / 'file'
+        with chdir(tmp_path):
+            ArComp(lzma_file, ext='.lzma').unpack(dest=dest)
+        assert (dest).read_bytes() == b'Hello world'

Reply via email to