Author: Ronan Lamy <ronan.l...@gmail.com>
Branch: py3.5
Changeset: r93183:8125ba2d1fc1
Date: 2017-11-26 03:02 +0000
http://bitbucket.org/pypy/pypy/changeset/8125ba2d1fc1/
Log: hg merge default diff --git a/extra_tests/test_textio.py b/extra_tests/test_textio.py new file mode 100644 --- /dev/null +++ b/extra_tests/test_textio.py @@ -0,0 +1,28 @@ +from hypothesis import given, strategies as st + +from io import BytesIO, TextIOWrapper + +LINESEP = ['', '\r', '\n', '\r\n'] + +@st.composite +def text_with_newlines(draw): + sep = draw(st.sampled_from(LINESEP)) + lines = draw(st.lists(st.text(max_size=10), max_size=10)) + return sep.join(lines) + +@given(txt=text_with_newlines(), + mode=st.sampled_from(['\r', '\n', '\r\n', '']), + limit=st.integers(min_value=-1)) +def test_readline(txt, mode, limit): + textio = TextIOWrapper( + BytesIO(txt.encode('utf-8')), encoding='utf-8', newline=mode) + lines = [] + while True: + line = textio.readline(limit) + if limit > 0: + assert len(line) < limit + if line: + lines.append(line) + else: + break + assert u''.join(lines) == txt diff --git a/pypy/module/_io/interp_stringio.py b/pypy/module/_io/interp_stringio.py --- a/pypy/module/_io/interp_stringio.py +++ b/pypy/module/_io/interp_stringio.py @@ -2,21 +2,115 @@ from pypy.interpreter.typedef import ( TypeDef, generic_new_descr, GetSetProperty) from pypy.interpreter.gateway import interp2app, unwrap_spec, WrappedDefault -from pypy.module._io.interp_textio import W_TextIOBase, W_IncrementalNewlineDecoder +from pypy.module._io.interp_textio import ( + W_TextIOBase, W_IncrementalNewlineDecoder) from pypy.module._io.interp_iobase import convert_size +class UnicodeIO(object): + def __init__(self, data=None, pos=0): + if data is None: + data = [] + self.data = data + self.pos = pos + + def resize(self, newlength): + if len(self.data) > newlength: + self.data = self.data[:newlength] + if len(self.data) < newlength: + self.data.extend([u'\0'] * (newlength - len(self.data))) + + def read(self, size): + start = self.pos + available = len(self.data) - start + if available <= 0: + return u'' + if size >= 0 and size <= available: + end = start + size + else: + end = 
len(self.data) + assert 0 <= start <= end + self.pos = end + return u''.join(self.data[start:end]) + + def _convert_limit(self, limit): + if limit < 0 or limit > len(self.data) - self.pos: + limit = len(self.data) - self.pos + assert limit >= 0 + return limit + + def readline_universal(self, limit): + # Universal newline search. Find any of \r, \r\n, \n + limit = self._convert_limit(limit) + start = self.pos + end = start + limit + pos = start + while pos < end: + ch = self.data[pos] + pos += 1 + if ch == '\n': + break + if ch == '\r': + if pos >= end: + break + if self.data[pos] == '\n': + pos += 1 + break + else: + break + self.pos = pos + result = u''.join(self.data[start:pos]) + return result + + def readline(self, marker, limit): + start = self.pos + limit = self._convert_limit(limit) + end = start + limit + found = False + for pos in range(start, end - len(marker) + 1): + ch = self.data[pos] + if ch == marker[0]: + for j in range(1, len(marker)): + if self.data[pos + j] != marker[j]: + break # from inner loop + else: + pos += len(marker) + found = True + break + if not found: + pos = end + self.pos = pos + result = u''.join(self.data[start:pos]) + return result + + def write(self, string): + length = len(string) + if self.pos + length > len(self.data): + self.resize(self.pos + length) + + for i in range(length): + self.data[self.pos + i] = string[i] + self.pos += length + + def seek(self, pos): + self.pos = pos + + def truncate(self, size): + if size < len(self.data): + self.resize(size) + + def getvalue(self): + return u''.join(self.data) + class W_StringIO(W_TextIOBase): def __init__(self, space): W_TextIOBase.__init__(self, space) - self.buf = [] - self.pos = 0 + self.buf = UnicodeIO() @unwrap_spec(w_newline = WrappedDefault(u"\n")) def descr_init(self, space, w_initvalue=None, w_newline=None): # In case __init__ is called multiple times - self.buf = [] - self.pos = 0 + self.buf = UnicodeIO() self.w_decoder = None self.readnl = None self.writenl = None @@ 
-27,7 +121,7 @@ newline = space.unicode_w(w_newline) if (newline is not None and newline != u"" and newline != u"\n" and - newline != u"\r" and newline != u"\r\n"): + newline != u"\r" and newline != u"\r\n"): # Not using oefmt() because I don't know how to use it # with unicode raise OperationError(space.w_ValueError, @@ -50,7 +144,7 @@ if not space.is_none(w_initvalue): self.write_w(space, w_initvalue) - self.pos = 0 + self.buf.pos = 0 def descr_getstate(self, space): w_initialval = self.getvalue_w(space) @@ -58,9 +152,9 @@ if self.readnl is None: w_readnl = space.w_None else: - w_readnl = space.str(space.newunicode(self.readnl)) # YYY + w_readnl = space.str(space.newunicode(self.readnl)) # YYY return space.newtuple([ - w_initialval, w_readnl, space.newint(self.pos), w_dict + w_initialval, w_readnl, space.newint(self.buf.pos), w_dict ]) def descr_setstate(self, space, w_state): @@ -69,34 +163,33 @@ # We allow the state tuple to be longer than 4, because we may need # someday to extend the object's state without breaking # backwards-compatibility - if not space.isinstance_w(w_state, space.w_tuple) or space.len_w(w_state) < 4: + if (not space.isinstance_w(w_state, space.w_tuple) + or space.len_w(w_state) < 4): raise oefmt(space.w_TypeError, "%T.__setstate__ argument should be a 4-tuple, got %T", self, w_state) w_initval, w_readnl, w_pos, w_dict = space.unpackiterable(w_state, 4) + if not space.isinstance_w(w_initval, space.w_unicode): + raise oefmt(space.w_TypeError, + "unicode argument expected, got '%T'", w_initval) # Initialize state - self.descr_init(space, w_initval, w_readnl) + self.descr_init(space, None, w_readnl) - # Restore the buffer state. Even if __init__ did initialize the buffer, - # we have to initialize it again since __init__ may translates the - # newlines in the inital_value string. We clearly do not want that + # Restore the buffer state. 
We're not doing it via __init__ # because the string value in the state tuple has already been # translated once by __init__. So we do not take any chance and replace # object's buffer completely initval = space.unicode_w(w_initval) - size = len(initval) - self.resize_buffer(size) - self.buf = list(initval) pos = space.getindex_w(w_pos, space.w_TypeError) if pos < 0: raise oefmt(space.w_ValueError, "position value cannot be negative") - self.pos = pos + self.buf = UnicodeIO(list(initval), pos) if not space.is_w(w_dict, space.w_None): if not space.isinstance_w(w_dict, space.w_dict): - raise oefmt(space.w_TypeError, - "fourth item of state should be a dict, got a %T", - w_dict) + raise oefmt( + space.w_TypeError, + "fourth item of state should be a dict, got a %T", w_dict) # Alternatively, we could replace the internal dictionary # completely. However, it seems more practical to just update it. space.call_method(self.w_dict, "update", w_dict) @@ -107,88 +200,47 @@ message = "I/O operation on closed file" raise OperationError(space.w_ValueError, space.newtext(message)) - def resize_buffer(self, newlength): - if len(self.buf) > newlength: - self.buf = self.buf[:newlength] - if len(self.buf) < newlength: - self.buf.extend([u'\0'] * (newlength - len(self.buf))) - - def write(self, string): - length = len(string) - if self.pos + length > len(self.buf): - self.resize_buffer(self.pos + length) - - for i in range(length): - self.buf[self.pos + i] = string[i] - self.pos += length - def write_w(self, space, w_obj): if not space.isinstance_w(w_obj, space.w_unicode): raise oefmt(space.w_TypeError, "unicode argument expected, got '%T'", w_obj) self._check_closed(space) - orig_size = space.len_w(w_obj) if self.w_decoder is not None: w_decoded = space.call_method( - self.w_decoder, "decode", w_obj, space.w_True - ) + self.w_decoder, "decode", w_obj, space.w_True) else: w_decoded = w_obj - if self.writenl: w_decoded = space.call_method( - w_decoded, "replace", space.newtext("\n"), 
space.newunicode(self.writenl) - ) + w_decoded, "replace", + space.newtext("\n"), space.newunicode(self.writenl)) + string = space.unicode_w(w_decoded) + if string: + self.buf.write(string) - string = space.unicode_w(w_decoded) - size = len(string) - - if size: - self.write(string) return space.newint(orig_size) def read_w(self, space, w_size=None): self._check_closed(space) size = convert_size(space, w_size) - start = self.pos - available = len(self.buf) - start - if available <= 0: - return space.newunicode(u"") - if size >= 0 and size <= available: - end = start + size - else: - end = len(self.buf) - assert 0 <= start <= end - self.pos = end - return space.newunicode(u''.join(self.buf[start:end])) + return space.newunicode(self.buf.read(size)) def readline_w(self, space, w_limit=None): self._check_closed(space) limit = convert_size(space, w_limit) + if self.readuniversal: + result = self.buf.readline_universal(limit) + else: + if self.readtranslate: + # Newlines are already translated, only search for \n + newline = u'\n' + else: + newline = self.readnl + result = self.buf.readline(newline, limit) + return space.newunicode(result) - if self.pos >= len(self.buf): - return space.newunicode(u"") - - start = self.pos - if limit < 0 or limit > len(self.buf) - self.pos: - limit = len(self.buf) - self.pos - - assert limit >= 0 - end = start + limit - - endpos, consumed = self._find_line_ending( - # XXX: super inefficient, makes a copy of the entire contents. - u"".join(self.buf), - start, - end - ) - if endpos < 0: - endpos = end - assert endpos >= 0 - self.pos = endpos - return space.newunicode(u"".join(self.buf[start:endpos])) @unwrap_spec(pos=int, mode=int) def seek_w(self, space, pos, mode=0): @@ -204,32 +256,27 @@ # XXX: this makes almost no sense, but its how CPython does it. 
if mode == 1: - pos = self.pos + pos = self.buf.pos elif mode == 2: - pos = len(self.buf) - + pos = len(self.buf.data) assert pos >= 0 - self.pos = pos + self.buf.seek(pos) return space.newint(pos) def truncate_w(self, space, w_size=None): self._check_closed(space) if space.is_none(w_size): - size = self.pos + size = self.buf.pos else: size = space.int_w(w_size) - if size < 0: raise oefmt(space.w_ValueError, "Negative size value %d", size) - - if size < len(self.buf): - self.resize_buffer(size) - + self.buf.truncate(size) return space.newint(size) def getvalue_w(self, space): self._check_closed(space) - return space.newunicode(u''.join(self.buf)) + return space.newunicode(self.buf.getvalue()) def readable_w(self, space): self._check_closed(space) diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py --- a/pypy/module/_io/interp_textio.py +++ b/pypy/module/_io/interp_textio.py @@ -214,45 +214,6 @@ def newlines_get_w(self, space): return space.w_None - def _find_line_ending(self, line, start, end): - size = end - start - if self.readuniversal: - # Universal newline search. Find any of \r, \r\n, \n - # The decoder ensures that \r\n are not split in two pieces - i = start - while True: - # Fast path for non-control chars. - while i < end and line[i] > '\r': - i += 1 - if i >= end: - return -1, size - ch = line[i] - i += 1 - if ch == '\n': - return i, 0 - if ch == '\r': - if line[i] == '\n': - return i + 1, 0 - else: - return i, 0 - if self.readtranslate: - # Newlines are already translated, only search for \n - newline = u'\n' - else: - # Non-universal mode. 
- newline = self.readnl - end_scan = end - len(newline) + 1 - for i in range(start, end_scan): - ch = line[i] - if ch == newline[0]: - for j in range(1, len(newline)): - if line[i + j] != newline[j]: - break - else: - return i + len(newline), 0 - return -1, end_scan - - W_TextIOBase.typedef = TypeDef( '_io._TextIOBase', W_IOBase.typedef, __new__ = generic_new_descr(W_TextIOBase), @@ -343,6 +304,126 @@ self.input = input +class DecodeBuffer(object): + def __init__(self, text=None): + self.text = text + self.pos = 0 + + def set(self, space, w_decoded): + check_decoded(space, w_decoded) + self.text = space.unicode_w(w_decoded) + self.pos = 0 + + def reset(self): + self.text = None + self.pos = 0 + + def get_chars(self, size): + if self.text is None: + return u"" + + available = len(self.text) - self.pos + if size < 0 or size > available: + size = available + assert size >= 0 + + if self.pos > 0 or size < available: + start = self.pos + end = self.pos + size + assert start >= 0 + assert end >= 0 + chars = self.text[start:end] + else: + chars = self.text + + self.pos += size + return chars + + def has_data(self): + return (self.text is not None and not self.exhausted()) + + def exhausted(self): + return self.pos >= len(self.text) + + def next_char(self): + if self.exhausted(): + raise StopIteration + ch = self.text[self.pos] + self.pos += 1 + return ch + + def peek_char(self): + # like next_char, but doesn't advance pos + if self.exhausted(): + raise StopIteration + ch = self.text[self.pos] + return ch + + def find_newline_universal(self, limit): + # Universal newline search. 
Find any of \r, \r\n, \n + # The decoder ensures that \r\n are not split in two pieces + if limit < 0: + limit = sys.maxint + scanned = 0 + while scanned < limit: + try: + ch = self.next_char() + except StopIteration: + return False + if ch == u'\n': + return True + if ch == u'\r': + if scanned >= limit: + return False + try: + ch = self.peek_char() + except StopIteration: + return False + if ch == u'\n': + self.next_char() + return True + else: + return True + return False + + def find_crlf(self, limit): + if limit < 0: + limit = sys.maxint + scanned = 0 + while scanned < limit: + try: + ch = self.next_char() + except StopIteration: + return False + scanned += 1 + if ch == u'\r': + if scanned >= limit: + return False + try: + if self.peek_char() == u'\n': + self.next_char() + return True + except StopIteration: + # This is the tricky case: we found a \r right at the end + self.pos -= 1 + return False + return False + + def find_char(self, marker, limit): + if limit < 0: + limit = sys.maxint + scanned = 0 + while scanned < limit: + try: + ch = self.next_char() + except StopIteration: + return False + if ch == marker: + return True + scanned += 1 + return False + + def check_decoded(space, w_decoded): if not space.isinstance_w(w_decoded, space.w_unicode): msg = "decoder should return a string result, not '%T'" @@ -356,8 +437,7 @@ self.w_encoder = None self.w_decoder = None - self.decoded_chars = None # buffer for text returned from decoder - self.decoded_chars_used = 0 # offset into _decoded_chars for read() + self.decoded = DecodeBuffer() self.pending_bytes = None # list of bytes objects waiting to be # written, or NULL self.chunk_size = 8192 @@ -546,44 +626,10 @@ # _____________________________________________________________ # read methods - def _unset_decoded(self): - self.decoded_chars = None - self.decoded_chars_used = 0 - - def _set_decoded(self, space, w_decoded): - check_decoded(space, w_decoded) - self.decoded_chars = space.unicode_w(w_decoded) - 
self.decoded_chars_used = 0 - - def _get_decoded_chars(self, size): - if self.decoded_chars is None: - return u"" - - available = len(self.decoded_chars) - self.decoded_chars_used - if size < 0 or size > available: - size = available - assert size >= 0 - - if self.decoded_chars_used > 0 or size < available: - start = self.decoded_chars_used - end = self.decoded_chars_used + size - assert start >= 0 - assert end >= 0 - chars = self.decoded_chars[start:end] - else: - chars = self.decoded_chars - - self.decoded_chars_used += size - return chars - - def _has_data(self): - return (self.decoded_chars is not None and - self.decoded_chars_used < len(self.decoded_chars)) - def _read_chunk(self, space): """Read and decode the next chunk of data from the BufferedReader. The return value is True unless EOF was reached. The decoded string - is placed in self._decoded_chars (replacing its previous value). + is placed in self.decoded (replacing its previous value). The entire input chunk is sent to the decoder, though some of it may remain buffered in the decoder, yet to be converted.""" @@ -607,7 +653,7 @@ dec_buffer = None dec_flags = 0 - # Read a chunk, decode it, and put the result in self._decoded_chars + # Read a chunk, decode it, and put the result in self.decoded func_name = "read1" if self.has_read1 else "read" w_input = space.call_method(self.w_buffer, func_name, space.newint(self.chunk_size)) @@ -622,7 +668,7 @@ eof = input_buf.getlength() == 0 w_decoded = space.call_method(self.w_decoder, "decode", w_input, space.newbool(eof)) - self._set_decoded(space, w_decoded) + self.decoded.set(space, w_decoded) if space.len_w(w_decoded) > 0: eof = False @@ -635,10 +681,10 @@ return not eof def _ensure_data(self, space): - while not self._has_data(): + while not self.decoded.has_data(): try: if not self._read_chunk(space): - self._unset_decoded() + self.decoded.reset() self.snapshot = None return False except OperationError as e: @@ -671,7 +717,7 @@ w_bytes = 
space.call_method(self.w_buffer, "read") w_decoded = space.call_method(self.w_decoder, "decode", w_bytes, space.w_True) check_decoded(space, w_decoded) - w_result = space.newunicode(self._get_decoded_chars(-1)) + w_result = space.newunicode(self.decoded.get_chars(-1)) w_final = space.add(w_result, w_decoded) self.snapshot = None return w_final @@ -683,83 +729,79 @@ while remaining > 0: if not self._ensure_data(space): break - data = self._get_decoded_chars(remaining) + data = self.decoded.get_chars(remaining) builder.append(data) remaining -= len(data) return space.newunicode(builder.build()) + def _scan_line_ending(self, limit): + if self.readuniversal: + return self.decoded.find_newline_universal(limit) + else: + if self.readtranslate: + # Newlines are already translated, only search for \n + newline = u'\n' + else: + # Non-universal mode. + newline = self.readnl + if newline == u'\r\n': + return self.decoded.find_crlf(limit) + else: + return self.decoded.find_char(newline[0], limit) + def readline_w(self, space, w_limit=None): self._check_attached(space) self._check_closed(space) self._writeflush(space) limit = convert_size(space, w_limit) - - line = None - remaining = None + remnant = None builder = UnicodeBuilder() - while True: # First, get some data if necessary has_data = self._ensure_data(space) if not has_data: # end of file - start = endpos = offset_to_buffer = 0 + if remnant: + builder.append(remnant) break - if not remaining: - line = self.decoded_chars - start = self.decoded_chars_used - offset_to_buffer = 0 + if remnant: + assert not self.readtranslate and self.readnl == u'\r\n' + assert self.decoded.pos == 0 + if remnant == u'\r' and self.decoded.text[0] == u'\n': + builder.append(u'\r\n') + self.decoded.pos = 1 + remnant = None + break + else: + builder.append(remnant) + remnant = None + continue + + if limit > 0: + remaining = limit - builder.getlength() + assert remaining >= 0 else: - assert self.decoded_chars_used == 0 - line = remaining + 
self.decoded_chars - start = 0 - offset_to_buffer = len(remaining) - remaining = None + remaining = -1 + start = self.decoded.pos + assert start >= 0 + found = self._scan_line_ending(remaining) + end_scan = self.decoded.pos + if end_scan > start: + s = self.decoded.text[start:end_scan] + builder.append(s) - line_len = len(line) - endpos, consumed = self._find_line_ending(line, start, line_len) - chunked = builder.getlength() - if endpos >= 0: - if limit >= 0 and endpos >= start + limit - chunked: - endpos = start + limit - chunked - assert endpos >= 0 - break - assert consumed >= 0 - - # We can put aside up to `endpos` - endpos = consumed + start - if limit >= 0 and endpos >= start + limit - chunked: - # Didn't find line ending, but reached length limit - endpos = start + limit - chunked - assert endpos >= 0 + if found or (limit >= 0 and builder.getlength() >= limit): break - # No line ending seen yet - put aside current data - if endpos > start: - s = line[start:endpos] - builder.append(s) - - # There may be some remaining bytes we'll have to prepend to the + # There may be some remaining chars we'll have to prepend to the # next chunk of data - if endpos < line_len: - remaining = line[endpos:] - line = None + if not self.decoded.exhausted(): + remnant = self.decoded.get_chars(-1) # We have consumed the buffer - self._unset_decoded() - - if line: - # Our line ends in the current buffer - decoded_chars_used = endpos - offset_to_buffer - assert decoded_chars_used >= 0 - self.decoded_chars_used = decoded_chars_used - if start > 0 or endpos < len(line): - line = line[start:endpos] - builder.append(line) - elif remaining: - builder.append(remaining) + self.decoded.reset() result = builder.build() return space.newunicode(result) @@ -903,7 +945,7 @@ self._unsupportedoperation( space, "can't do nonzero end-relative seeks") space.call_method(self, "flush") - self._unset_decoded() + self.decoded.reset() self.snapshot = None if self.w_decoder: 
space.call_method(self.w_decoder, "reset") @@ -933,7 +975,7 @@ # Seek back to the safe start point space.call_method(self.w_buffer, "seek", space.newint(cookie.start_pos)) - self._unset_decoded() + self.decoded.reset() self.snapshot = None # Restore the decoder to its state from the safe start point. @@ -954,13 +996,13 @@ w_decoded = space.call_method(self.w_decoder, "decode", w_chunk, space.newbool(bool(cookie.need_eof))) - self._set_decoded(space, w_decoded) + self.decoded.set(space, w_decoded) # Skip chars_to_skip of the decoded characters - if len(self.decoded_chars) < cookie.chars_to_skip: + if len(self.decoded.text) < cookie.chars_to_skip: raise oefmt(space.w_IOError, "can't restore logical file position") - self.decoded_chars_used = cookie.chars_to_skip + self.decoded.pos = cookie.chars_to_skip else: self.snapshot = PositionSnapshot(cookie.dec_flags, "") @@ -987,7 +1029,7 @@ w_pos = space.call_method(self.w_buffer, "tell") if self.w_decoder is None or self.snapshot is None: - assert not self.decoded_chars + assert not self.decoded.text return w_pos cookie = PositionCookie(space.bigint_w(w_pos)) @@ -998,11 +1040,11 @@ cookie.start_pos -= len(input) # How many decoded characters have been used up since the snapshot? - if not self.decoded_chars_used: + if not self.decoded.pos: # We haven't moved from the snapshot point. return space.newlong_from_rbigint(cookie.pack()) - chars_to_skip = self.decoded_chars_used + chars_to_skip = self.decoded.pos # Starting from the snapshot position, we will walk the decoder # forward until it gives us enough decoded characters. 
diff --git a/pypy/module/_io/test/test_interp_textio.py b/pypy/module/_io/test/test_interp_textio.py new file mode 100644 --- /dev/null +++ b/pypy/module/_io/test/test_interp_textio.py @@ -0,0 +1,68 @@ +import pytest +try: + from hypothesis import given, strategies as st, assume +except ImportError: + pytest.skip("hypothesis required") +from pypy.module._io.interp_bytesio import W_BytesIO +from pypy.module._io.interp_textio import W_TextIOWrapper, DecodeBuffer + +LINESEP = ['', '\r', '\n', '\r\n'] + +@st.composite +def text_with_newlines(draw): + sep = draw(st.sampled_from(LINESEP)) + lines = draw(st.lists(st.text(max_size=10), max_size=10)) + return sep.join(lines) + +@given(txt=text_with_newlines(), + mode=st.sampled_from(['\r', '\n', '\r\n', '']), + limit=st.integers(min_value=-1)) +def test_readline(space, txt, mode, limit): + assume(limit != 0) + w_stream = W_BytesIO(space) + w_stream.descr_init(space, space.newbytes(txt.encode('utf-8'))) + w_textio = W_TextIOWrapper(space) + w_textio.descr_init( + space, w_stream, encoding='utf-8', + w_newline=space.newtext(mode)) + lines = [] + while True: + line = space.unicode_w(w_textio.readline_w(space, space.newint(limit))) + if limit > 0: + assert len(line) <= limit + if line: + lines.append(line) + else: + break + assert u''.join(lines) == txt + +@given(st.text()) +def test_read_buffer(text): + buf = DecodeBuffer(text) + assert buf.get_chars(-1) == text + assert buf.exhausted() + +@given(st.text(), st.lists(st.integers(min_value=0))) +def test_readn_buffer(text, sizes): + buf = DecodeBuffer(text) + strings = [] + for n in sizes: + s = buf.get_chars(n) + if not buf.exhausted(): + assert len(s) == n + else: + assert len(s) <= n + strings.append(s) + assert ''.join(strings) == text[:sum(sizes)] + +@given(st.text()) +def test_next_char(text): + buf = DecodeBuffer(text) + chars = [] + try: + while True: + chars.append(buf.next_char()) + except StopIteration: + pass + assert buf.exhausted() + assert u''.join(chars) == text 
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit