Author: Matti Picus <matti.pi...@gmail.com>
Branch: win32-stdlib
Changeset: r54314:ba500a555707
Date: 2012-04-12 17:35 +0300
http://bitbucket.org/pypy/pypy/changeset/ba500a555707/
Log: rework zipfile and test_zipfile to respect open files diff --git a/lib-python/2.7/test/test_zipfile.py b/lib-python/modified-2.7/test/test_zipfile.py copy from lib-python/2.7/test/test_zipfile.py copy to lib-python/modified-2.7/test/test_zipfile.py --- a/lib-python/2.7/test/test_zipfile.py +++ b/lib-python/modified-2.7/test/test_zipfile.py @@ -234,8 +234,9 @@ # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: - for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): - self.assertEqual(zipline, line + '\n') + with zipfp.open(TESTFN) as f: + for line, zipline in zip(self.line_gen, f): + self.assertEqual(zipline, line + '\n') def test_readline_read_stored(self): # Issue #7610: calls to readline() interleaved with calls to read(). @@ -340,7 +341,8 @@ produces the expected result.""" with zipfile.ZipFile(TESTFN2, "w") as zipfp: zipfp.write(TESTFN) - self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read()) + with open(TESTFN) as f: + self.assertEqual(zipfp.read(TESTFN), f.read()) @skipUnless(zlib, "requires zlib") def test_per_file_compression(self): @@ -382,7 +384,8 @@ self.assertEqual(writtenfile, correctfile) # make sure correct data is in correct file - self.assertEqual(fdata, open(writtenfile, "rb").read()) + with open(writtenfile, "rb") as fid: + self.assertEqual(fdata, fid.read()) os.remove(writtenfile) # remove the test file subdirectories @@ -401,24 +404,25 @@ else: outfile = os.path.join(os.getcwd(), fpath) - self.assertEqual(fdata, open(outfile, "rb").read()) + with open(outfile, "rb") as fid: + self.assertEqual(fdata, fid.read()) os.remove(outfile) # remove the test file subdirectories shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) def test_writestr_compression(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) - if zlib: - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + 
zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) + if zlib: + zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) - info = zipfp.getinfo('a.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_STORED) + info = zipfp.getinfo('a.txt') + self.assertEqual(info.compress_type, zipfile.ZIP_STORED) - if zlib: - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) + if zlib: + info = zipfp.getinfo('b.txt') + self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) def zip_test_writestr_permissions(self, f, compression): @@ -646,7 +650,8 @@ def test_write_non_pyfile(self): with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: - open(TESTFN, 'w').write('most definitely not a python file') + with open(TESTFN, 'w') as f: + f.write('most definitely not a python file') self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) os.remove(TESTFN) @@ -795,7 +800,8 @@ self.assertRaises(RuntimeError, zipf.open, "foo.txt") self.assertRaises(RuntimeError, zipf.testzip) self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") - open(TESTFN, 'w').write('zipfile test data') + with open(TESTFN, 'w') as fp: + fp.write('zipfile test data') self.assertRaises(RuntimeError, zipf.write, TESTFN) def test_bad_constructor_mode(self): @@ -803,7 +809,6 @@ self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") def test_bad_open_mode(self): - """Check that bad modes passed to ZipFile.open are caught.""" with zipfile.ZipFile(TESTFN, mode="w") as zipf: zipf.writestr("foo.txt", "O, for a Muse of Fire!") @@ -851,7 +856,6 @@ def test_comments(self): """Check that comments on the archive are handled properly.""" - # check default comment is empty with zipfile.ZipFile(TESTFN, mode="w") as zipf: self.assertEqual(zipf.comment, '') @@ -953,14 +957,16 @@ with zipfile.ZipFile(TESTFN, mode="w") as zipf: pass try: - zipf = zipfile.ZipFile(TESTFN, mode="r") + with zipfile.ZipFile(TESTFN, mode="r") as zipf: + pass 
except zipfile.BadZipfile: self.fail("Unable to create empty ZIP file in 'w' mode") with zipfile.ZipFile(TESTFN, mode="a") as zipf: pass try: - zipf = zipfile.ZipFile(TESTFN, mode="r") + with zipfile.ZipFile(TESTFN, mode="r") as zipf: + pass except: self.fail("Unable to create empty ZIP file in 'a' mode") @@ -1160,6 +1166,8 @@ data1 += zopen1.read(500) data2 += zopen2.read(500) self.assertEqual(data1, data2) + zopen1.close() + zopen2.close() def test_different_file(self): # Verify that (when the ZipFile is in control of creating file objects) @@ -1207,9 +1215,9 @@ def test_store_dir(self): os.mkdir(os.path.join(TESTFN2, "x")) - zipf = zipfile.ZipFile(TESTFN, "w") - zipf.write(os.path.join(TESTFN2, "x"), "x") - self.assertTrue(zipf.filelist[0].filename.endswith("x/")) + with zipfile.ZipFile(TESTFN, "w") as zipf: + zipf.write(os.path.join(TESTFN2, "x"), "x") + self.assertTrue(zipf.filelist[0].filename.endswith("x/")) def tearDown(self): shutil.rmtree(TESTFN2) @@ -1226,7 +1234,8 @@ for n, s in enumerate(self.seps): self.arcdata[s] = s.join(self.line_gen) + s self.arcfiles[s] = '%s-%d' % (TESTFN, n) - open(self.arcfiles[s], "wb").write(self.arcdata[s]) + with open(self.arcfiles[s], "wb") as f: + f.write(self.arcdata[s]) def make_test_archive(self, f, compression): # Create the ZIP archive @@ -1295,8 +1304,9 @@ # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): - self.assertEqual(zipline, line + '\n') + with zipfp.open(fn, "rU") as f: + for line, zipline in zip(self.line_gen, f): + self.assertEqual(zipline, line + '\n') def test_read_stored(self): for f in (TESTFN2, TemporaryFile(), StringIO()): diff --git a/lib-python/2.7/zipfile.py b/lib-python/modified-2.7/zipfile.py copy from lib-python/2.7/zipfile.py copy to lib-python/modified-2.7/zipfile.py --- a/lib-python/2.7/zipfile.py +++ b/lib-python/modified-2.7/zipfile.py @@ -648,6 +648,10 @@ return data +class 
ZipExtFileWithClose(ZipExtFile): + def close(self): + self._fileobj.close() + class ZipFile: """ Class with methods to open, read, write, close, list zip files. @@ -843,9 +847,9 @@ try: # Read by chunks, to avoid an OverflowError or a # MemoryError with very large embedded files. - f = self.open(zinfo.filename, "r") - while f.read(chunk_size): # Check CRC-32 - pass + with self.open(zinfo.filename, "r") as f: + while f.read(chunk_size): # Check CRC-32 + pass except BadZipfile: return zinfo.filename @@ -864,7 +868,9 @@ def read(self, name, pwd=None): """Return file bytes (as a string) for name.""" - return self.open(name, "r", pwd).read() + with self.open(name, "r", pwd) as f: + retval = f.read() + return retval def open(self, name, mode="r", pwd=None): """Return file-like object for 'name'.""" @@ -881,59 +887,66 @@ else: zef_file = open(self.filename, 'rb') - # Make sure we have an info object - if isinstance(name, ZipInfo): - # 'name' is already an info object - zinfo = name + try: + # Make sure we have an info object + if isinstance(name, ZipInfo): + # 'name' is already an info object + zinfo = name + else: + # Get info object for name + zinfo = self.getinfo(name) + + zef_file.seek(zinfo.header_offset, 0) + + # Skip the file header: + fheader = zef_file.read(sizeFileHeader) + if fheader[0:4] != stringFileHeader: + raise BadZipfile, "Bad magic number for file header" + + fheader = struct.unpack(structFileHeader, fheader) + fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) + if fheader[_FH_EXTRA_FIELD_LENGTH]: + zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) + + if fname != zinfo.orig_filename: + raise BadZipfile, \ + 'File name in directory "%s" and header "%s" differ.' 
% ( + zinfo.orig_filename, fname) + + # check for encrypted flag & handle password + is_encrypted = zinfo.flag_bits & 0x1 + zd = None + if is_encrypted: + if not pwd: + pwd = self.pwd + if not pwd: + raise RuntimeError, "File %s is encrypted, " \ + "password required for extraction" % name + + zd = _ZipDecrypter(pwd) + # The first 12 bytes in the cypher stream is an encryption header + # used to strengthen the algorithm. The first 11 bytes are + # completely random, while the 12th contains the MSB of the CRC, + # or the MSB of the file time depending on the header type + # and is used to check the correctness of the password. + bytes = zef_file.read(12) + h = map(zd, bytes[0:12]) + if zinfo.flag_bits & 0x8: + # compare against the file type from extended local headers + check_byte = (zinfo._raw_time >> 8) & 0xff + else: + # compare against the CRC otherwise + check_byte = (zinfo.CRC >> 24) & 0xff + if ord(h[11]) != check_byte: + raise RuntimeError("Bad password for file", name) + except: + if not self._filePassed: + zef_file.close() + raise + if self._filePassed: + return ZipExtFile(zef_file, mode, zinfo, zd) else: - # Get info object for name - zinfo = self.getinfo(name) - - zef_file.seek(zinfo.header_offset, 0) - - # Skip the file header: - fheader = zef_file.read(sizeFileHeader) - if fheader[0:4] != stringFileHeader: - raise BadZipfile, "Bad magic number for file header" - - fheader = struct.unpack(structFileHeader, fheader) - fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) - if fheader[_FH_EXTRA_FIELD_LENGTH]: - zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) - - if fname != zinfo.orig_filename: - raise BadZipfile, \ - 'File name in directory "%s" and header "%s" differ.' 
% ( - zinfo.orig_filename, fname) - - # check for encrypted flag & handle password - is_encrypted = zinfo.flag_bits & 0x1 - zd = None - if is_encrypted: - if not pwd: - pwd = self.pwd - if not pwd: - raise RuntimeError, "File %s is encrypted, " \ - "password required for extraction" % name - - zd = _ZipDecrypter(pwd) - # The first 12 bytes in the cypher stream is an encryption header - # used to strengthen the algorithm. The first 11 bytes are - # completely random, while the 12th contains the MSB of the CRC, - # or the MSB of the file time depending on the header type - # and is used to check the correctness of the password. - bytes = zef_file.read(12) - h = map(zd, bytes[0:12]) - if zinfo.flag_bits & 0x8: - # compare against the file type from extended local headers - check_byte = (zinfo._raw_time >> 8) & 0xff - else: - # compare against the CRC otherwise - check_byte = (zinfo.CRC >> 24) & 0xff - if ord(h[11]) != check_byte: - raise RuntimeError("Bad password for file", name) - - return ZipExtFile(zef_file, mode, zinfo, zd) + return ZipExtFileWithClose(zef_file, mode, zinfo, zd) def extract(self, member, path=None, pwd=None): """Extract a member from the archive to the current working directory, @@ -989,7 +1002,6 @@ if not os.path.isdir(targetpath): os.mkdir(targetpath) return targetpath - source = self.open(member, pwd=pwd) target = file(targetpath, "wb") shutil.copyfileobj(source, target) _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit