https://github.com/python/cpython/commit/8ed20bc8bbf2e41ae2e2dada4279e9a5aa1b6d9a
commit: 8ed20bc8bbf2e41ae2e2dada4279e9a5aa1b6d9a
branch: 3.12
author: Miss Islington (bot) <[email protected]>
committer: serhiy-storchaka <[email protected]>
date: 2024-02-03T18:39:01+02:00
summary:

[3.12] gh-114959: tarfile: do not ignore errors when extract a directory on top 
of a file (GH-114960) (GH-114963)

Also, add tests common to tarfile and zipfile.
(cherry picked from commit 96bce033c4a4da7112792ba335ef3eb9a3eb0da0)

Co-authored-by: Serhiy Storchaka <[email protected]>

files:
A Lib/test/archiver_tests.py
A Misc/NEWS.d/next/Library/2024-02-03-16-59-25.gh-issue-114959.dCfAG2.rst
M Lib/tarfile.py
M Lib/test/test_tarfile.py
M Lib/test/test_zipfile/test_core.py

diff --git a/Lib/tarfile.py b/Lib/tarfile.py
index 027a19b9ed5994..3bbbcaa6211191 100755
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -2449,7 +2449,8 @@ def makedir(self, tarinfo, targetpath):
                 # later in _extract_member().
                 os.mkdir(targetpath, 0o700)
         except FileExistsError:
-            pass
+            if not os.path.isdir(targetpath):
+                raise
 
     def makefile(self, tarinfo, targetpath):
         """Make a file called targetpath.
diff --git a/Lib/test/archiver_tests.py b/Lib/test/archiver_tests.py
new file mode 100644
index 00000000000000..1a4bbb9e5706c5
--- /dev/null
+++ b/Lib/test/archiver_tests.py
@@ -0,0 +1,155 @@
+"""Tests common to tarfile and zipfile."""
+
+import os
+import sys
+
+from test.support import os_helper
+
+class OverwriteTests:
+
+    def setUp(self):
+        os.makedirs(self.testdir)
+        self.addCleanup(os_helper.rmtree, self.testdir)
+
+    def create_file(self, path, content=b''):
+        with open(path, 'wb') as f:
+            f.write(content)
+
+    def open(self, path):
+        raise NotImplementedError
+
+    def extractall(self, ar):
+        raise NotImplementedError
+
+
+    def test_overwrite_file_as_file(self):
+        target = os.path.join(self.testdir, 'test')
+        self.create_file(target, b'content')
+        with self.open(self.ar_with_file) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.isfile(target))
+        with open(target, 'rb') as f:
+            self.assertEqual(f.read(), b'newcontent')
+
+    def test_overwrite_dir_as_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        os.mkdir(target)
+        with self.open(self.ar_with_dir) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.isdir(target))
+
+    def test_overwrite_dir_as_implicit_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        os.mkdir(target)
+        with self.open(self.ar_with_implicit_dir) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.isdir(target))
+        self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
+        with open(os.path.join(target, 'file'), 'rb') as f:
+            self.assertEqual(f.read(), b'newcontent')
+
+    def test_overwrite_dir_as_file(self):
+        target = os.path.join(self.testdir, 'test')
+        os.mkdir(target)
+        with self.open(self.ar_with_file) as ar:
+            with self.assertRaises(PermissionError if sys.platform == 'win32'
+                                   else IsADirectoryError):
+                self.extractall(ar)
+        self.assertTrue(os.path.isdir(target))
+
+    def test_overwrite_file_as_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        self.create_file(target, b'content')
+        with self.open(self.ar_with_dir) as ar:
+            with self.assertRaises(FileExistsError):
+                self.extractall(ar)
+        self.assertTrue(os.path.isfile(target))
+        with open(target, 'rb') as f:
+            self.assertEqual(f.read(), b'content')
+
+    def test_overwrite_file_as_implicit_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        self.create_file(target, b'content')
+        with self.open(self.ar_with_implicit_dir) as ar:
+            with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
+                                   else NotADirectoryError):
+                self.extractall(ar)
+        self.assertTrue(os.path.isfile(target))
+        with open(target, 'rb') as f:
+            self.assertEqual(f.read(), b'content')
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_file_symlink_as_file(self):
+        # XXX: It is potential security vulnerability.
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        self.create_file(target2, b'content')
+        os.symlink('test2', target)
+        with self.open(self.ar_with_file) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertTrue(os.path.isfile(target2))
+        with open(target2, 'rb') as f:
+            self.assertEqual(f.read(), b'newcontent')
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_broken_file_symlink_as_file(self):
+        # XXX: It is potential security vulnerability.
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        os.symlink('test2', target)
+        with self.open(self.ar_with_file) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertTrue(os.path.isfile(target2))
+        with open(target2, 'rb') as f:
+            self.assertEqual(f.read(), b'newcontent')
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_dir_symlink_as_dir(self):
+        # XXX: It is potential security vulnerability.
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        os.mkdir(target2)
+        os.symlink('test2', target, target_is_directory=True)
+        with self.open(self.ar_with_dir) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertTrue(os.path.isdir(target2))
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_dir_symlink_as_implicit_dir(self):
+        # XXX: It is potential security vulnerability.
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        os.mkdir(target2)
+        os.symlink('test2', target, target_is_directory=True)
+        with self.open(self.ar_with_implicit_dir) as ar:
+            self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertTrue(os.path.isdir(target2))
+        self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
+        with open(os.path.join(target2, 'file'), 'rb') as f:
+            self.assertEqual(f.read(), b'newcontent')
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_broken_dir_symlink_as_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        os.symlink('test2', target, target_is_directory=True)
+        with self.open(self.ar_with_dir) as ar:
+            with self.assertRaises(FileExistsError):
+                self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertFalse(os.path.exists(target2))
+
+    @os_helper.skip_unless_symlink
+    def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
+        target = os.path.join(self.testdir, 'test')
+        target2 = os.path.join(self.testdir, 'test2')
+        os.symlink('test2', target, target_is_directory=True)
+        with self.open(self.ar_with_implicit_dir) as ar:
+            with self.assertRaises(FileExistsError):
+                self.extractall(ar)
+        self.assertTrue(os.path.islink(target))
+        self.assertFalse(os.path.exists(target2))
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index 208ceaf6674c95..71489ea493f94a 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -15,6 +15,7 @@
 import unittest.mock
 import tarfile
 
+from test import archiver_tests
 from test import support
 from test.support import os_helper
 from test.support import script_helper
@@ -4089,6 +4090,38 @@ def valueerror_filter(tarinfo, path):
             self.expect_exception(TypeError)  # errorlevel is not int
 
 
+class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
+    testdir = os.path.join(TEMPDIR, "testoverwrite")
+
+    @classmethod
+    def setUpClass(cls):
+        p = cls.ar_with_file = os.path.join(TEMPDIR, 'tar-with-file.tar')
+        cls.addClassCleanup(os_helper.unlink, p)
+        with tarfile.open(p, 'w') as tar:
+            t = tarfile.TarInfo('test')
+            t.size = 10
+            tar.addfile(t, io.BytesIO(b'newcontent'))
+
+        p = cls.ar_with_dir = os.path.join(TEMPDIR, 'tar-with-dir.tar')
+        cls.addClassCleanup(os_helper.unlink, p)
+        with tarfile.open(p, 'w') as tar:
+            tar.addfile(tar.gettarinfo(os.curdir, 'test'))
+
+        p = os.path.join(TEMPDIR, 'tar-with-implicit-dir.tar')
+        cls.ar_with_implicit_dir = p
+        cls.addClassCleanup(os_helper.unlink, p)
+        with tarfile.open(p, 'w') as tar:
+            t = tarfile.TarInfo('test/file')
+            t.size = 10
+            tar.addfile(t, io.BytesIO(b'newcontent'))
+
+    def open(self, path):
+        return tarfile.open(path, 'r')
+
+    def extractall(self, ar):
+        ar.extractall(self.testdir, filter='fully_trusted')
+
+
 def setUpModule():
     os_helper.unlink(TEMPDIR)
     os.makedirs(TEMPDIR)
diff --git a/Lib/test/test_zipfile/test_core.py 
b/Lib/test/test_zipfile/test_core.py
index a6efcc5f4c20d5..ecc3dd6d0b5aa5 100644
--- a/Lib/test/test_zipfile/test_core.py
+++ b/Lib/test/test_zipfile/test_core.py
@@ -18,6 +18,7 @@
 from tempfile import TemporaryFile
 from random import randint, random, randbytes
 
+from test import archiver_tests
 from test.support import script_helper
 from test.support import (
     findfile, requires_zlib, requires_bz2, requires_lzma,
@@ -1687,6 +1688,33 @@ def _test_extract_hackers_arcnames(self, hacknames):
             unlink(TESTFN2)
 
 
+class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
+    testdir = TESTFN
+
+    @classmethod
+    def setUpClass(cls):
+        p = cls.ar_with_file = TESTFN + '-with-file.zip'
+        cls.addClassCleanup(unlink, p)
+        with zipfile.ZipFile(p, 'w') as zipfp:
+            zipfp.writestr('test', b'newcontent')
+
+        p = cls.ar_with_dir = TESTFN + '-with-dir.zip'
+        cls.addClassCleanup(unlink, p)
+        with zipfile.ZipFile(p, 'w') as zipfp:
+            zipfp.mkdir('test')
+
+        p = cls.ar_with_implicit_dir = TESTFN + '-with-implicit-dir.zip'
+        cls.addClassCleanup(unlink, p)
+        with zipfile.ZipFile(p, 'w') as zipfp:
+            zipfp.writestr('test/file', b'newcontent')
+
+    def open(self, path):
+        return zipfile.ZipFile(path, 'r')
+
+    def extractall(self, ar):
+        ar.extractall(self.testdir)
+
+
 class OtherTests(unittest.TestCase):
     def test_open_via_zip_info(self):
         # Create the ZIP archive
diff --git 
a/Misc/NEWS.d/next/Library/2024-02-03-16-59-25.gh-issue-114959.dCfAG2.rst 
b/Misc/NEWS.d/next/Library/2024-02-03-16-59-25.gh-issue-114959.dCfAG2.rst
new file mode 100644
index 00000000000000..5c6eaa7525e3b0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-02-03-16-59-25.gh-issue-114959.dCfAG2.rst
@@ -0,0 +1,2 @@
+:mod:`tarfile` no longer ignores errors when trying to extract a directory on
+top of a file.

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]

Reply via email to