Author: Richard Plangger <planri...@gmail.com>
Branch: py3.5
Changeset: r87440:43ca4b14e86e
Date: 2016-09-29 11:00 +0200
http://bitbucket.org/pypy/pypy/changeset/43ca4b14e86e/
Log: merge bz2 and lzma changes diff --git a/lib_pypy/_lzma.py b/lib_pypy/_lzma.py --- a/lib_pypy/_lzma.py +++ b/lib_pypy/_lzma.py @@ -445,6 +445,9 @@ self.eof = False self.lzs = _new_lzma_stream() self._bufsiz = max(8192, io.DEFAULT_BUFFER_SIZE) + self.needs_input = True + self._input_buffer = ffi.NULL + self._input_buffer_size = 0 if format == FORMAT_AUTO: catch_lzma_error(m.lzma_auto_decoder, self.lzs, memlimit, decoder_flags) @@ -473,9 +476,67 @@ else: raise ValueError("invalid...") - def decompress(self, data): + def pre_decompress_left_data(self, buf, buf_size): + # in this case there is data left that needs to be processed before the first + # argument can be processed + + lzs = self.lzs + + addr_input_buffer = int(ffi.cast('uintptr_t', self._input_buffer)) + addr_next_in = int(ffi.cast('uintptr_t', lzs.next_in)) + avail_now = (addr_input_buffer + self._input_buffer_size) - \ + (addr_next_in + lzs.avail_in) + avail_total = self._input_buffer_size - lzs.avail_in + if avail_total < buf_size: + # resize the buffer, it is too small! + offset = addr_next_in - addr_input_buffer + new_size = self._input_buffer_size + buf_size - avail_now + # there is no realloc? + tmp = ffi.cast("uint8_t*",m.malloc(new_size)) + if tmp == ffi.NULL: + raise MemoryError + ffi.memmove(tmp, lzs.next_in, lzs.avail_in) + lzs.next_in = tmp + m.free(self._input_buffer) + self._input_buffer = tmp + self._input_buffer_size = new_size + elif avail_now < buf_size: + # the buffer is not too small, but we cannot append it! 
+ # move all data to the front + ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in) + lzs.next_in = self._input_buffer + ffi.memmove(lzs.next_in+lzs.avail_in, buf, buf_size) + lzs.avail_in += buf_size + return lzs.next_in, lzs.avail_in + + def post_decompress_avail_data(self): + lzs = self.lzs + # free buffer it is to small + if self._input_buffer is not ffi.NULL and \ + self._input_buffer_size < lzs.avail_in: + m.free(self._input_buffer) + self._input_buffer = ffi.NONE + + # allocate if necessary + if self._input_buffer is ffi.NULL: + self._input_buffer = ffi.cast("uint8_t*",m.malloc(lzs.avail_in)) + if self._input_buffer == ffi.NULL: + raise MemoryError + self._input_buffer_size = lzs.avail_in + + ffi.memmove(self._input_buffer, lzs.next_in, lzs.avail_in) + lzs.next_in = self._input_buffer + + def clear_input_buffer(self): + # clean the buffer + if self._input_buffer is not ffi.NULL: + m.free(self._input_buffer) + self._input_buffer = ffi.NULL + self._input_buffer_size = 0 + + def decompress(self, data, max_length=-1): """ - decompress(data) -> bytes + decompress(data, max_length=-1) -> bytes Provide data to the decompressor object. Returns a chunk of decompressed data if possible, or b"" otherwise. @@ -484,21 +545,53 @@ reached raises an EOFError. Any data found after the end of the stream is ignored, and saved in the unused_data attribute. 
""" + if not isinstance(max_length, int): + raise TypeError("max_length parameter object cannot be interpreted as an integer") with self.lock: if self.eof: raise EOFError("Already...") - return self._decompress(data) + lzs = self.lzs + data = to_bytes(data) + buf = ffi.new('uint8_t[]', data) + buf_size = len(data) - def _decompress(self, data): + if lzs.next_in: + buf, buf_size = self.pre_decompress_left_data(buf, buf_size) + used__input_buffer = True + else: + lzs.avail_in = buf_size + lzs.next_in = ffi.cast("uint8_t*",buf) + used__input_buffer = False + + # actual decompression + result = self._decompress(buf, buf_size, max_length) + + if self.eof: + self.needs_input = False + if lzs.avail_in > 0: + self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:] + self.clear_input_buffer() + elif lzs.avail_in == 0: + # completed successfully! + self.needs_input = True + lzs.next_in = ffi.NULL + self.clear_input_buffer() + else: + self.needs_input = False + if not used__input_buffer: + self.post_decompress_avail_data() + + return result + + def _decompress(self, buf, buf_len, max_length): lzs = self.lzs - # we need in_ so that lzs.next_in doesn't get garbage collected until - # in_ goes out of scope - data = to_bytes(data) - lzs.next_in = in_ = ffi.new('char[]', data) - lzs.avail_in = len(data) + lzs.next_in = buf + lzs.avail_in = buf_len bufsiz = self._bufsiz + if not (max_length < 0 or max_length > io.DEFAULT_BUFFER_SIZE): + bufsiz = max_length lzs.next_out = orig_out = m.malloc(bufsiz) if orig_out == ffi.NULL: @@ -519,13 +612,13 @@ if ret == m.LZMA_STREAM_END: self.eof = True - if lzs.avail_in > 0: - self.unused_data = ffi.buffer(lzs.next_in, lzs.avail_in)[:] break elif lzs.avail_in == 0: # it ate everything break elif lzs.avail_out == 0: + if data_size == max_length: + break # ran out of space in the output buffer, let's grow it bufsiz += (bufsiz >> 3) + 6 next_out = m.realloc(orig_out, bufsiz) @@ -639,9 +732,9 @@ lzs = self.lzs - lzs.next_in = input_ = 
ffi.new('char[]', to_bytes(data)) + lzs.next_in = input_ = ffi.new('uint8_t[]', to_bytes(data)) lzs.avail_in = len(data) - outs = [ffi.new('char[]', BUFSIZ)] + outs = [ffi.new('uint8_t[]', BUFSIZ)] lzs.next_out, = outs lzs.avail_out = BUFSIZ @@ -658,7 +751,7 @@ # ran out of space in the output buffer #siz = (BUFSIZ << 1) + 6 siz = 512 - outs.append(ffi.new('char[]', siz)) + outs.append(ffi.new('uint8_t[]', siz)) lzs.next_out = outs[-1] lzs.avail_out = siz last_out = outs.pop() diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py --- a/pypy/module/bz2/interp_bz2.py +++ b/pypy/module/bz2/interp_bz2.py @@ -96,9 +96,11 @@ BZ_SEQUENCE_ERROR = cConfig.BZ_SEQUENCE_ERROR if BUFSIZ < 8192: - SMALLCHUNK = 8192 + INITIAL_BUFFER_SIZE = 8192 else: - SMALLCHUNK = BUFSIZ + INITIAL_BUFFER_SIZE = 8192 + +UINT_MAX = 2**32-1 if rffi.sizeof(rffi.INT) > 4: BIGCHUNK = 512 * 32 @@ -187,12 +189,21 @@ encapsulate the logic of setting up the fields of 'bzs' and allocating raw memory as needed. """ - def __init__(self, bzs, initial_size=SMALLCHUNK): + def __init__(self, bzs, initial_size=INITIAL_BUFFER_SIZE, max_length=-1): # when the constructor is called, allocate a piece of memory # of length 'piece_size' and make bzs ready to dump there. 
self.temp = [] self.bzs = bzs - self._allocate_chunk(initial_size) + self.max_length = max_length + if max_length < 0 or max_length >= initial_size: + size = initial_size + else: + size = max_length + self._allocate_chunk(size) + self.left = 0 + + def get_data_size(self): + return self.current_size - rffi.getintfield(self.bzs, 'c_avail_out') def _allocate_chunk(self, size): self.raw_buf, self.gc_buf, self.case_num = rffi.alloc_buffer(size) @@ -214,7 +225,10 @@ def prepare_next_chunk(self): size = self.current_size self.temp.append(self._get_chunk(size)) - self._allocate_chunk(_new_buffer_size(size)) + newsize = size + if self.max_length == -1: + newsize = _new_buffer_size(size) + self._allocate_chunk(newsize) def make_result_string(self): count_unoccupied = rffi.getintfield(self.bzs, 'c_avail_out') @@ -357,7 +371,6 @@ W_BZ2Decompressor.__init__(x, space) return space.wrap(x) - class W_BZ2Decompressor(W_Root): """BZ2Decompressor() -> decompressor object @@ -372,6 +385,9 @@ try: self.running = False self.unused_data = "" + self.needs_input = True + self.input_buffer = "" + self.left_to_process = 0 self._init_bz2decomp() except: @@ -397,15 +413,56 @@ def descr_getstate(self): raise oefmt(self.space.w_TypeError, "cannot serialize '%T' object", self) + def needs_input_w(self, space): + """ True if more input is needed before more decompressed + data can be produced. 
""" + return space.wrap(self.needs_input) + def eof_w(self, space): if self.running: return space.w_False else: return space.w_True - @unwrap_spec(data='bufferstr') - def decompress(self, data): - """decompress(data) -> string + def _decompress_buf(self, data, max_length): + in_bufsize = len(data) + with rffi.scoped_nonmovingbuffer(data) as in_buf: + # setup the input and the size it can consume + self.bzs.c_next_in = in_buf + rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize) + + with OutBuffer(self.bzs, max_length=max_length) as out: + while True: + bzreturn = BZ2_bzDecompress(self.bzs) + # add up the size that has not been processed + avail_in = rffi.getintfield(self.bzs, 'c_avail_in') + self.left_to_process = avail_in + if bzreturn == BZ_STREAM_END: + self.running = False + break + if bzreturn != BZ_OK: + _catch_bz2_error(self.space, bzreturn) + + if self.left_to_process == 0: + break + elif rffi.getintfield(self.bzs, 'c_avail_out') == 0: + if out.get_data_size() == max_length: + break + out.prepare_next_chunk() + + if not self.running: + self.needs_input = False + if self.left_to_process != 0: + end = len(data) + start = end - self.left_to_process + assert start > 0 + self.unused_data = data[start:] + res = out.make_result_string() + return self.space.newbytes(res) + + @unwrap_spec(data='bufferstr', max_length=int) + def decompress(self, data, max_length=-1): + """decompress(data, max_length=-1) -> bytes Provide more data to the decompressor object. It will return chunks of decompressed data whenever possible. 
If you try to decompress data @@ -416,37 +473,30 @@ if not self.running: raise oefmt(self.space.w_EOFError, "end of stream was already found") - if data == '': - return self.space.newbytes('') + datalen = len(data) + if len(self.input_buffer) > 0: + input_buffer_in_use = True + data = self.input_buffer + data + datalen = len(data) + result = self._decompress_buf(data, max_length) + else: + input_buffer_in_use = False + result = self._decompress_buf(data, max_length) - in_bufsize = len(data) + if self.left_to_process == 0: + self.input_buffer = "" + self.needs_input = True + else: + self.needs_input = False + if not input_buffer_in_use: + start = datalen-self.left_to_process + assert start > 0 + self.input_buffer = data[start:] - with rffi.scoped_nonmovingbuffer(data) as in_buf: - self.bzs.c_next_in = in_buf - rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize) + return result - with OutBuffer(self.bzs) as out: - while True: - bzerror = BZ2_bzDecompress(self.bzs) - if bzerror == BZ_STREAM_END: - if rffi.getintfield(self.bzs, 'c_avail_in') != 0: - unused = [self.bzs.c_next_in[i] - for i in range( - rffi.getintfield(self.bzs, - 'c_avail_in'))] - self.unused_data = "".join(unused) - self.running = False - break - if bzerror != BZ_OK: - _catch_bz2_error(self.space, bzerror) - if rffi.getintfield(self.bzs, 'c_avail_in') == 0: - break - elif rffi.getintfield(self.bzs, 'c_avail_out') == 0: - out.prepare_next_chunk() - res = out.make_result_string() - return self.space.newbytes(res) W_BZ2Decompressor.typedef = TypeDef("_bz2.BZ2Decompressor", @@ -456,5 +506,6 @@ unused_data = interp_attrproperty_bytes("unused_data", W_BZ2Decompressor), eof = GetSetProperty(W_BZ2Decompressor.eof_w), decompress = interp2app(W_BZ2Decompressor.decompress), + needs_input = GetSetProperty(W_BZ2Decompressor.needs_input_w), ) W_BZ2Decompressor.typedef.acceptable_as_base_class = False diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py b/pypy/module/bz2/test/test_bz2_compdecomp.py --- 
a/pypy/module/bz2/test/test_bz2_compdecomp.py +++ b/pypy/module/bz2/test/test_bz2_compdecomp.py @@ -1,6 +1,8 @@ import os import py +import glob +import bz2 from pypy.module.bz2.test.support import CheckAllocation from pypy.module.bz2 import interp_bz2 @@ -34,11 +36,11 @@ mod.decompress = decompress # # For tests, patch the value of SMALLCHUNK - mod.OLD_SMALLCHUNK = interp_bz2.SMALLCHUNK - interp_bz2.SMALLCHUNK = 32 + mod.OLD_SMALLCHUNK = interp_bz2.INITIAL_BUFFER_SIZE + interp_bz2.INITIAL_BUFFER_SIZE = 32 def teardown_module(mod): - interp_bz2.SMALLCHUNK = mod.OLD_SMALLCHUNK + interp_bz2.INITIAL_BUFFER_SIZE = mod.OLD_SMALLCHUNK class AppTestBZ2Compressor(CheckAllocation): spaceconfig = dict(usemodules=('bz2', 'time', 'struct')) @@ -200,6 +202,22 @@ exc = raises(TypeError, pickle.dumps, BZ2Decompressor()) assert exc.value.args[0] == "cannot serialize '_bz2.BZ2Decompressor' object" + def test_decompress_max_length(self): + from bz2 import BZ2Decompressor + + bz2d = BZ2Decompressor() + decomp= [] + + length = len(self.DATA) + decomp.append(bz2d.decompress(self.DATA, max_length=100)) + assert len(decomp[-1]) == 100 + + while not bz2d.eof: + decomp.append(bz2d.decompress(b"", max_length=50)) + assert len(decomp[-1]) <= 50 + + assert b''.join(decomp) == self.TEXT + class AppTestBZ2ModuleFunctions(CheckAllocation): spaceconfig = dict(usemodules=('bz2', 'time')) diff --git a/pypy/module/bz2/test/test_bz2_file.py b/pypy/module/bz2/test/test_bz2_file.py --- a/pypy/module/bz2/test/test_bz2_file.py +++ b/pypy/module/bz2/test/test_bz2_file.py @@ -256,7 +256,6 @@ self.create_temp_file() bz2f = BZ2File(self.temppath) - raises(TypeError, bz2f.read, None) text_read = bz2f.read() assert text_read == self.TEXT bz2f.close() diff --git a/pypy/tool/build_cffi_imports.py b/pypy/tool/build_cffi_imports.py --- a/pypy/tool/build_cffi_imports.py +++ b/pypy/tool/build_cffi_imports.py @@ -44,7 +44,10 @@ return failures if __name__ == '__main__': - import py, os + # NOTE: it does not work to 
execute this file to rebuild the cffi backends + # for pypy3. This script is python 2! Thus you can specify + # exefile as an argument to still be able to run this script with a pypy2 vm + import py, os, argparse if '__pypy__' not in sys.builtin_module_names: print 'Call with a pypy interpreter' sys.exit(-1) @@ -52,8 +55,15 @@ class Options(object): pass - exename = py.path.local(sys.executable) + parser = argparse.ArgumentParser(description='Build all cffi backends in lib_pypy') + parser.add_argument('--exefile', dest='exefile', default=sys.executable, + help='instead of executing sys.executable' \ + ' you can specify an alternative pypy vm here') + args = parser.parse_args() + + exename = py.path.local(args.exefile) basedir = exename + while not basedir.join('include').exists(): _basedir = basedir.dirpath() if _basedir == basedir: diff --git a/rpython/tool/runsubprocess.py b/rpython/tool/runsubprocess.py --- a/rpython/tool/runsubprocess.py +++ b/rpython/tool/runsubprocess.py @@ -8,10 +8,15 @@ import os from subprocess import PIPE, Popen +PY3 = sys.version_info[0] >= 3 + def run_subprocess(executable, args, env=None, cwd=None): if isinstance(args, list): - args = [a.encode('latin1') if isinstance(a, unicode) else a - for a in args] + if PY3: + args = [a for a in args] + else: + args = [a.encode('latin1') if isinstance(a, unicode) else a + for a in args] return _run(executable, args, env, cwd) shell_default = False @@ -83,7 +88,7 @@ def _run(*args): try: - _child.stdin.write('%r\n' % (args,)) + _child.stdin.write(b'%r\n' % (args,)) except (OSError, IOError): # lost the child. Try again... spawn_subprocess() _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit