Author: Ronan Lamy <ronan.l...@gmail.com>
Branch: py3.5
Changeset: r93183:8125ba2d1fc1
Date: 2017-11-26 03:02 +0000
http://bitbucket.org/pypy/pypy/changeset/8125ba2d1fc1/

Log:    hg merge default

diff --git a/extra_tests/test_textio.py b/extra_tests/test_textio.py
new file mode 100644
--- /dev/null
+++ b/extra_tests/test_textio.py
@@ -0,0 +1,28 @@
from hypothesis import given, strategies as st, assume

from io import BytesIO, TextIOWrapper

# Line separators exercised by the strategy below; '' produces inputs
# that contain no separator at all.
LINESEP = ['', '\r', '\n', '\r\n']

@st.composite
def text_with_newlines(draw):
    """Strategy: short text chunks joined by a single kind of separator."""
    sep = draw(st.sampled_from(LINESEP))
    lines = draw(st.lists(st.text(max_size=10), max_size=10))
    return sep.join(lines)

@given(txt=text_with_newlines(),
       mode=st.sampled_from(['\r', '\n', '\r\n', '']),
       limit=st.integers(min_value=-1))
def test_readline(txt, mode, limit):
    """readline(limit) must return at most `limit` chars per call (when
    limit > 0) and, over repeated calls, reproduce the input exactly."""
    # limit == 0 makes readline() return '' immediately, which would end
    # the loop below before consuming anything; skip that degenerate case.
    assume(limit != 0)
    textio = TextIOWrapper(
        BytesIO(txt.encode('utf-8')), encoding='utf-8', newline=mode)
    lines = []
    while True:
        line = textio.readline(limit)
        if limit > 0:
            # readline may legitimately return exactly `limit` characters,
            # so the bound is <=, not <.
            assert len(line) <= limit
        if line:
            lines.append(line)
        else:
            break
    assert u''.join(lines) == txt
diff --git a/pypy/module/_io/interp_stringio.py 
b/pypy/module/_io/interp_stringio.py
--- a/pypy/module/_io/interp_stringio.py
+++ b/pypy/module/_io/interp_stringio.py
@@ -2,21 +2,115 @@
 from pypy.interpreter.typedef import (
     TypeDef, generic_new_descr, GetSetProperty)
 from pypy.interpreter.gateway import interp2app, unwrap_spec, WrappedDefault
-from pypy.module._io.interp_textio import W_TextIOBase, 
W_IncrementalNewlineDecoder
+from pypy.module._io.interp_textio import (
+        W_TextIOBase, W_IncrementalNewlineDecoder)
 from pypy.module._io.interp_iobase import convert_size
 
class UnicodeIO(object):
    """An in-memory unicode buffer with a file-like read/write cursor.

    The characters are stored as a list of one-character unicode strings so
    the buffer can be overwritten in place; ``pos`` is the offset of the
    next character to read or write.
    """
    def __init__(self, data=None, pos=0):
        if data is None:
            data = []
        self.data = data
        self.pos = pos

    def resize(self, newlength):
        # Shrink, or grow zero-filled, to exactly `newlength` characters.
        if len(self.data) > newlength:
            self.data = self.data[:newlength]
        if len(self.data) < newlength:
            self.data.extend([u'\0'] * (newlength - len(self.data)))

    def read(self, size):
        # Consume and return up to `size` characters from the current
        # position; a negative (or too large) `size` means "read the rest".
        start = self.pos
        available = len(self.data) - start
        if available <= 0:
            return u''
        if size >= 0 and size <= available:
            end = start + size
        else:
            end = len(self.data)
        assert 0 <= start <= end
        self.pos = end
        return u''.join(self.data[start:end])

    def _convert_limit(self, limit):
        # Clamp a readline() limit to what is left in the buffer;
        # limit < 0 means "no limit".
        if limit < 0 or limit > len(self.data) - self.pos:
            limit = len(self.data) - self.pos
        assert limit >= 0
        return limit

    def readline_universal(self, limit):
        # Universal newline search. Find any of \r, \r\n, \n
        # Returns the line including its terminator, scanning at most
        # `limit` characters.
        limit = self._convert_limit(limit)
        start = self.pos
        end = start + limit
        pos = start
        while pos < end:
            ch = self.data[pos]
            pos += 1
            if ch == '\n':
                break
            if ch == '\r':
                # A lone \r at the end of the window still ends the line.
                if pos >= end:
                    break
                if self.data[pos] == '\n':
                    # \r\n pair: swallow the \n as part of the terminator.
                    pos += 1
                    break
                else:
                    break
        self.pos = pos
        result = u''.join(self.data[start:pos])
        return result

    def readline(self, marker, limit):
        # Read one line terminated by the exact string `marker` (included
        # in the result); if not found within `limit` characters, return
        # everything scanned.
        start = self.pos
        limit = self._convert_limit(limit)
        end = start + limit
        found = False
        for pos in range(start, end - len(marker) + 1):
            ch = self.data[pos]
            if ch == marker[0]:
                for j in range(1, len(marker)):
                    if self.data[pos + j] != marker[j]:
                        break  # from inner loop
                else:
                    # Full marker matched: consume it as part of the line.
                    pos += len(marker)
                    found = True
                    break
        if not found:
            pos = end
        self.pos = pos
        result = u''.join(self.data[start:pos])
        return result

    def write(self, string):
        # Overwrite in place at the current position, growing as needed.
        length = len(string)
        if self.pos + length > len(self.data):
            self.resize(self.pos + length)

        for i in range(length):
            self.data[self.pos + i] = string[i]
        self.pos += length

    def seek(self, pos):
        self.pos = pos

    def truncate(self, size):
        # Only ever shrinks; does not move the cursor (matches io semantics).
        if size < len(self.data):
            self.resize(size)

    def getvalue(self):
        return u''.join(self.data)
+
 
 class W_StringIO(W_TextIOBase):
     def __init__(self, space):
         W_TextIOBase.__init__(self, space)
-        self.buf = []
-        self.pos = 0
+        self.buf = UnicodeIO()
 
     @unwrap_spec(w_newline = WrappedDefault(u"\n"))
     def descr_init(self, space, w_initvalue=None, w_newline=None):
         # In case __init__ is called multiple times
-        self.buf = []
-        self.pos = 0
+        self.buf = UnicodeIO()
         self.w_decoder = None
         self.readnl = None
         self.writenl = None
@@ -27,7 +121,7 @@
             newline = space.unicode_w(w_newline)
 
         if (newline is not None and newline != u"" and newline != u"\n" and
-            newline != u"\r" and newline != u"\r\n"):
+                newline != u"\r" and newline != u"\r\n"):
             # Not using oefmt() because I don't know how to use it
             # with unicode
             raise OperationError(space.w_ValueError,
@@ -50,7 +144,7 @@
 
         if not space.is_none(w_initvalue):
             self.write_w(space, w_initvalue)
-            self.pos = 0
+            self.buf.pos = 0
 
     def descr_getstate(self, space):
         w_initialval = self.getvalue_w(space)
@@ -58,9 +152,9 @@
         if self.readnl is None:
             w_readnl = space.w_None
         else:
-            w_readnl = space.str(space.newunicode(self.readnl)) # YYY
+            w_readnl = space.str(space.newunicode(self.readnl))  # YYY
         return space.newtuple([
-            w_initialval, w_readnl, space.newint(self.pos), w_dict
+            w_initialval, w_readnl, space.newint(self.buf.pos), w_dict
         ])
 
     def descr_setstate(self, space, w_state):
@@ -69,34 +163,33 @@
         # We allow the state tuple to be longer than 4, because we may need
         # someday to extend the object's state without breaking
         # backwards-compatibility
-        if not space.isinstance_w(w_state, space.w_tuple) or 
space.len_w(w_state) < 4:
+        if (not space.isinstance_w(w_state, space.w_tuple)
+                or space.len_w(w_state) < 4):
             raise oefmt(space.w_TypeError,
                         "%T.__setstate__ argument should be a 4-tuple, got %T",
                         self, w_state)
         w_initval, w_readnl, w_pos, w_dict = space.unpackiterable(w_state, 4)
+        if not space.isinstance_w(w_initval, space.w_unicode):
+            raise oefmt(space.w_TypeError,
+                        "unicode argument expected, got '%T'", w_initval)
         # Initialize state
-        self.descr_init(space, w_initval, w_readnl)
+        self.descr_init(space, None, w_readnl)
 
-        # Restore the buffer state. Even if __init__ did initialize the buffer,
-        # we have to initialize it again since __init__ may translates the
-        # newlines in the inital_value string. We clearly do not want that
+        # Restore the buffer state. We're not doing it via __init__
         # because the string value in the state tuple has already been
         # translated once by __init__. So we do not take any chance and replace
         # object's buffer completely
         initval = space.unicode_w(w_initval)
-        size = len(initval)
-        self.resize_buffer(size)
-        self.buf = list(initval)
         pos = space.getindex_w(w_pos, space.w_TypeError)
         if pos < 0:
             raise oefmt(space.w_ValueError,
                         "position value cannot be negative")
-        self.pos = pos
+        self.buf = UnicodeIO(list(initval), pos)
         if not space.is_w(w_dict, space.w_None):
             if not space.isinstance_w(w_dict, space.w_dict):
-                raise oefmt(space.w_TypeError,
-                            "fourth item of state should be a dict, got a %T",
-                            w_dict)
+                raise oefmt(
+                    space.w_TypeError,
+                    "fourth item of state should be a dict, got a %T", w_dict)
             # Alternatively, we could replace the internal dictionary
             # completely. However, it seems more practical to just update it.
             space.call_method(self.w_dict, "update", w_dict)
@@ -107,88 +200,47 @@
                 message = "I/O operation on closed file"
             raise OperationError(space.w_ValueError, space.newtext(message))
 
-    def resize_buffer(self, newlength):
-        if len(self.buf) > newlength:
-            self.buf = self.buf[:newlength]
-        if len(self.buf) < newlength:
-            self.buf.extend([u'\0'] * (newlength - len(self.buf)))
-
-    def write(self, string):
-        length = len(string)
-        if self.pos + length > len(self.buf):
-            self.resize_buffer(self.pos + length)
-
-        for i in range(length):
-            self.buf[self.pos + i] = string[i]
-        self.pos += length
-
     def write_w(self, space, w_obj):
         if not space.isinstance_w(w_obj, space.w_unicode):
             raise oefmt(space.w_TypeError,
                         "unicode argument expected, got '%T'", w_obj)
         self._check_closed(space)
-
         orig_size = space.len_w(w_obj)
 
         if self.w_decoder is not None:
             w_decoded = space.call_method(
-                self.w_decoder, "decode", w_obj, space.w_True
-            )
+                self.w_decoder, "decode", w_obj, space.w_True)
         else:
             w_decoded = w_obj
-
         if self.writenl:
             w_decoded = space.call_method(
-                w_decoded, "replace", space.newtext("\n"), 
space.newunicode(self.writenl)
-            )
+                w_decoded, "replace",
+                space.newtext("\n"), space.newunicode(self.writenl))
+        string = space.unicode_w(w_decoded)
+        if string:
+            self.buf.write(string)
 
-        string = space.unicode_w(w_decoded)
-        size = len(string)
-
-        if size:
-            self.write(string)
         return space.newint(orig_size)
 
     def read_w(self, space, w_size=None):
         self._check_closed(space)
         size = convert_size(space, w_size)
-        start = self.pos
-        available = len(self.buf) - start
-        if available <= 0:
-            return space.newunicode(u"")
-        if size >= 0 and size <= available:
-            end = start + size
-        else:
-            end = len(self.buf)
-        assert 0 <= start <= end
-        self.pos = end
-        return space.newunicode(u''.join(self.buf[start:end]))
+        return space.newunicode(self.buf.read(size))
 
     def readline_w(self, space, w_limit=None):
         self._check_closed(space)
         limit = convert_size(space, w_limit)
+        if self.readuniversal:
+            result = self.buf.readline_universal(limit)
+        else:
+            if self.readtranslate:
+                # Newlines are already translated, only search for \n
+                newline = u'\n'
+            else:
+                newline = self.readnl
+            result = self.buf.readline(newline, limit)
+        return space.newunicode(result)
 
-        if self.pos >= len(self.buf):
-            return space.newunicode(u"")
-
-        start = self.pos
-        if limit < 0 or limit > len(self.buf) - self.pos:
-            limit = len(self.buf) - self.pos
-
-        assert limit >= 0
-        end = start + limit
-
-        endpos, consumed = self._find_line_ending(
-            # XXX: super inefficient, makes a copy of the entire contents.
-            u"".join(self.buf),
-            start,
-            end
-        )
-        if endpos < 0:
-            endpos = end
-        assert endpos >= 0
-        self.pos = endpos
-        return space.newunicode(u"".join(self.buf[start:endpos]))
 
     @unwrap_spec(pos=int, mode=int)
     def seek_w(self, space, pos, mode=0):
@@ -204,32 +256,27 @@
 
         # XXX: this makes almost no sense, but its how CPython does it.
         if mode == 1:
-            pos = self.pos
+            pos = self.buf.pos
         elif mode == 2:
-            pos = len(self.buf)
-
+            pos = len(self.buf.data)
         assert pos >= 0
-        self.pos = pos
+        self.buf.seek(pos)
         return space.newint(pos)
 
     def truncate_w(self, space, w_size=None):
         self._check_closed(space)
         if space.is_none(w_size):
-            size = self.pos
+            size = self.buf.pos
         else:
             size = space.int_w(w_size)
-
         if size < 0:
             raise oefmt(space.w_ValueError, "Negative size value %d", size)
-
-        if size < len(self.buf):
-            self.resize_buffer(size)
-
+        self.buf.truncate(size)
         return space.newint(size)
 
     def getvalue_w(self, space):
         self._check_closed(space)
-        return space.newunicode(u''.join(self.buf))
+        return space.newunicode(self.buf.getvalue())
 
     def readable_w(self, space):
         self._check_closed(space)
diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -214,45 +214,6 @@
     def newlines_get_w(self, space):
         return space.w_None
 
-    def _find_line_ending(self, line, start, end):
-        size = end - start
-        if self.readuniversal:
-            # Universal newline search. Find any of \r, \r\n, \n
-            # The decoder ensures that \r\n are not split in two pieces
-            i = start
-            while True:
-                # Fast path for non-control chars.
-                while i < end and line[i] > '\r':
-                    i += 1
-                if i >= end:
-                    return -1, size
-                ch = line[i]
-                i += 1
-                if ch == '\n':
-                    return i, 0
-                if ch == '\r':
-                    if line[i] == '\n':
-                        return i + 1, 0
-                    else:
-                        return i, 0
-        if self.readtranslate:
-            # Newlines are already translated, only search for \n
-            newline = u'\n'
-        else:
-            # Non-universal mode.
-            newline = self.readnl
-        end_scan = end - len(newline) + 1
-        for i in range(start, end_scan):
-            ch = line[i]
-            if ch == newline[0]:
-                for j in range(1, len(newline)):
-                    if line[i + j] != newline[j]:
-                        break
-                else:
-                    return i + len(newline), 0
-        return -1, end_scan
-
-
 W_TextIOBase.typedef = TypeDef(
     '_io._TextIOBase', W_IOBase.typedef,
     __new__ = generic_new_descr(W_TextIOBase),
@@ -343,6 +304,126 @@
         self.input = input
 
 
class DecodeBuffer(object):
    """A window of decoded text with a cursor, fed by the incremental decoder.

    ``text`` holds the most recently decoded chunk (None when unset) and
    ``pos`` is the offset of the next unread character.
    """
    def __init__(self, text=None):
        self.text = text
        self.pos = 0

    def set(self, space, w_decoded):
        # Replace the buffered text with freshly decoded data.
        check_decoded(space, w_decoded)
        self.text = space.unicode_w(w_decoded)
        self.pos = 0

    def reset(self):
        # Drop the buffered text entirely.
        self.text = None
        self.pos = 0

    def get_chars(self, size):
        # Consume and return up to `size` chars; size < 0 means "the rest".
        if self.text is None:
            return u""

        available = len(self.text) - self.pos
        if size < 0 or size > available:
            size = available
        assert size >= 0

        if self.pos > 0 or size < available:
            start = self.pos
            end = self.pos + size
            assert start >= 0
            assert end >= 0
            chars = self.text[start:end]
        else:
            # Fast path: the whole, untouched buffer is requested.
            chars = self.text

        self.pos += size
        return chars

    def has_data(self):
        return (self.text is not None and not self.exhausted())

    def exhausted(self):
        return self.pos >= len(self.text)

    def next_char(self):
        if self.exhausted():
            raise StopIteration
        ch = self.text[self.pos]
        self.pos += 1
        return ch

    def peek_char(self):
        # like next_char, but doesn't advance pos
        if self.exhausted():
            raise StopIteration
        ch = self.text[self.pos]
        return ch

    def find_newline_universal(self, limit):
        # Universal newline search. Find any of \r, \r\n, \n
        # The decoder ensures that \r\n are not split in two pieces
        # Returns True if a complete line ending was consumed, False if the
        # buffer (or the limit) ran out first; at most `limit` chars are
        # consumed.
        if limit < 0:
            limit = sys.maxint
        scanned = 0
        while scanned < limit:
            try:
                ch = self.next_char()
                # BUGFIX: count every consumed character; previously
                # `scanned` was never incremented, so a positive limit was
                # never enforced and readline() could overshoot it.
                scanned += 1
            except StopIteration:
                return False
            if ch == u'\n':
                return True
            if ch == u'\r':
                if scanned >= limit:
                    # Limit reached right after the \r; the caller stops on
                    # the limit anyway, so report "not found".
                    return False
                try:
                    ch = self.peek_char()
                except StopIteration:
                    return False
                if ch == u'\n':
                    self.next_char()
                    return True
                else:
                    return True
        return False

    def find_crlf(self, limit):
        # Search for the exact two-char sequence \r\n, consuming at most
        # `limit` characters.  A trailing \r with no following data is put
        # back so it can pair with the first char of the next chunk.
        if limit < 0:
            limit = sys.maxint
        scanned = 0
        while scanned < limit:
            try:
                ch = self.next_char()
            except StopIteration:
                return False
            scanned += 1
            if ch == u'\r':
                if scanned >= limit:
                    return False
                try:
                    if self.peek_char() == u'\n':
                        self.next_char()
                        return True
                except StopIteration:
                    # This is the tricky case: we found a \r right at the end
                    self.pos -= 1
                    return False
        return False

    def find_char(self, marker, limit):
        # Search for a single-character line terminator, consuming at most
        # `limit` characters.
        if limit < 0:
            limit = sys.maxint
        scanned = 0
        while scanned < limit:
            try:
                ch = self.next_char()
            except StopIteration:
                return False
            if ch == marker:
                return True
            scanned += 1
        return False
+
+
 def check_decoded(space, w_decoded):
     if not space.isinstance_w(w_decoded, space.w_unicode):
         msg = "decoder should return a string result, not '%T'"
@@ -356,8 +437,7 @@
         self.w_encoder = None
         self.w_decoder = None
 
-        self.decoded_chars = None   # buffer for text returned from decoder
-        self.decoded_chars_used = 0 # offset into _decoded_chars for read()
+        self.decoded = DecodeBuffer()
         self.pending_bytes = None   # list of bytes objects waiting to be
                                     # written, or NULL
         self.chunk_size = 8192
@@ -546,44 +626,10 @@
     # _____________________________________________________________
     # read methods
 
-    def _unset_decoded(self):
-        self.decoded_chars = None
-        self.decoded_chars_used = 0
-
-    def _set_decoded(self, space, w_decoded):
-        check_decoded(space, w_decoded)
-        self.decoded_chars = space.unicode_w(w_decoded)
-        self.decoded_chars_used = 0
-
-    def _get_decoded_chars(self, size):
-        if self.decoded_chars is None:
-            return u""
-
-        available = len(self.decoded_chars) - self.decoded_chars_used
-        if size < 0 or size > available:
-            size = available
-        assert size >= 0
-
-        if self.decoded_chars_used > 0 or size < available:
-            start = self.decoded_chars_used
-            end = self.decoded_chars_used + size
-            assert start >= 0
-            assert end >= 0
-            chars = self.decoded_chars[start:end]
-        else:
-            chars = self.decoded_chars
-
-        self.decoded_chars_used += size
-        return chars
-
-    def _has_data(self):
-        return (self.decoded_chars is not None and
-            self.decoded_chars_used < len(self.decoded_chars))
-
     def _read_chunk(self, space):
         """Read and decode the next chunk of data from the BufferedReader.
         The return value is True unless EOF was reached.  The decoded string
-        is placed in self._decoded_chars (replacing its previous value).
+        is placed in self.decoded (replacing its previous value).
         The entire input chunk is sent to the decoder, though some of it may
         remain buffered in the decoder, yet to be converted."""
 
@@ -607,7 +653,7 @@
             dec_buffer = None
             dec_flags = 0
 
-        # Read a chunk, decode it, and put the result in self._decoded_chars
+        # Read a chunk, decode it, and put the result in self.decoded
         func_name = "read1" if self.has_read1 else "read"
         w_input = space.call_method(self.w_buffer, func_name,
                                     space.newint(self.chunk_size))
@@ -622,7 +668,7 @@
         eof = input_buf.getlength() == 0
         w_decoded = space.call_method(self.w_decoder, "decode",
                                       w_input, space.newbool(eof))
-        self._set_decoded(space, w_decoded)
+        self.decoded.set(space, w_decoded)
         if space.len_w(w_decoded) > 0:
             eof = False
 
@@ -635,10 +681,10 @@
         return not eof
 
     def _ensure_data(self, space):
-        while not self._has_data():
+        while not self.decoded.has_data():
             try:
                 if not self._read_chunk(space):
-                    self._unset_decoded()
+                    self.decoded.reset()
                     self.snapshot = None
                     return False
             except OperationError as e:
@@ -671,7 +717,7 @@
             w_bytes = space.call_method(self.w_buffer, "read")
             w_decoded = space.call_method(self.w_decoder, "decode", w_bytes, 
space.w_True)
             check_decoded(space, w_decoded)
-            w_result = space.newunicode(self._get_decoded_chars(-1))
+            w_result = space.newunicode(self.decoded.get_chars(-1))
             w_final = space.add(w_result, w_decoded)
             self.snapshot = None
             return w_final
@@ -683,83 +729,79 @@
         while remaining > 0:
             if not self._ensure_data(space):
                 break
-            data = self._get_decoded_chars(remaining)
+            data = self.decoded.get_chars(remaining)
             builder.append(data)
             remaining -= len(data)
 
         return space.newunicode(builder.build())
 
+    def _scan_line_ending(self, limit):
+        if self.readuniversal:
+            return self.decoded.find_newline_universal(limit)
+        else:
+            if self.readtranslate:
+                # Newlines are already translated, only search for \n
+                newline = u'\n'
+            else:
+                # Non-universal mode.
+                newline = self.readnl
+            if newline == u'\r\n':
+                return self.decoded.find_crlf(limit)
+            else:
+                return self.decoded.find_char(newline[0], limit)
+
     def readline_w(self, space, w_limit=None):
         self._check_attached(space)
         self._check_closed(space)
         self._writeflush(space)
 
         limit = convert_size(space, w_limit)
-
-        line = None
-        remaining = None
+        remnant = None
         builder = UnicodeBuilder()
-
         while True:
             # First, get some data if necessary
             has_data = self._ensure_data(space)
             if not has_data:
                 # end of file
-                start = endpos = offset_to_buffer = 0
+                if remnant:
+                    builder.append(remnant)
                 break
 
-            if not remaining:
-                line = self.decoded_chars
-                start = self.decoded_chars_used
-                offset_to_buffer = 0
+            if remnant:
+                assert not self.readtranslate and self.readnl == u'\r\n'
+                assert self.decoded.pos == 0
+                if remnant == u'\r' and self.decoded.text[0] == u'\n':
+                    builder.append(u'\r\n')
+                    self.decoded.pos = 1
+                    remnant = None
+                    break
+                else:
+                    builder.append(remnant)
+                    remnant = None
+                    continue
+
+            if limit > 0:
+                remaining = limit - builder.getlength()
+                assert remaining >= 0
             else:
-                assert self.decoded_chars_used == 0
-                line = remaining + self.decoded_chars
-                start = 0
-                offset_to_buffer = len(remaining)
-                remaining = None
+                remaining = -1
+            start = self.decoded.pos
+            assert start >= 0
+            found = self._scan_line_ending(remaining)
+            end_scan = self.decoded.pos
+            if end_scan > start:
+                s = self.decoded.text[start:end_scan]
+                builder.append(s)
 
-            line_len = len(line)
-            endpos, consumed = self._find_line_ending(line, start, line_len)
-            chunked = builder.getlength()
-            if endpos >= 0:
-                if limit >= 0 and endpos >= start + limit - chunked:
-                    endpos = start + limit - chunked
-                    assert endpos >= 0
-                break
-            assert consumed >= 0
-
-            # We can put aside up to `endpos`
-            endpos = consumed + start
-            if limit >= 0 and endpos >= start + limit - chunked:
-                # Didn't find line ending, but reached length limit
-                endpos = start + limit - chunked
-                assert endpos >= 0
+            if found or (limit >= 0 and builder.getlength() >= limit):
                 break
 
-            # No line ending seen yet - put aside current data
-            if endpos > start:
-                s = line[start:endpos]
-                builder.append(s)
-
-            # There may be some remaining bytes we'll have to prepend to the
+            # There may be some remaining chars we'll have to prepend to the
             # next chunk of data
-            if endpos < line_len:
-                remaining = line[endpos:]
-            line = None
+            if not self.decoded.exhausted():
+                remnant = self.decoded.get_chars(-1)
             # We have consumed the buffer
-            self._unset_decoded()
-
-        if line:
-            # Our line ends in the current buffer
-            decoded_chars_used = endpos - offset_to_buffer
-            assert decoded_chars_used >= 0
-            self.decoded_chars_used = decoded_chars_used
-            if start > 0 or endpos < len(line):
-                line = line[start:endpos]
-            builder.append(line)
-        elif remaining:
-            builder.append(remaining)
+            self.decoded.reset()
 
         result = builder.build()
         return space.newunicode(result)
@@ -903,7 +945,7 @@
                 self._unsupportedoperation(
                     space, "can't do nonzero end-relative seeks")
             space.call_method(self, "flush")
-            self._unset_decoded()
+            self.decoded.reset()
             self.snapshot = None
             if self.w_decoder:
                 space.call_method(self.w_decoder, "reset")
@@ -933,7 +975,7 @@
         # Seek back to the safe start point
         space.call_method(self.w_buffer, "seek", 
space.newint(cookie.start_pos))
 
-        self._unset_decoded()
+        self.decoded.reset()
         self.snapshot = None
 
         # Restore the decoder to its state from the safe start point.
@@ -954,13 +996,13 @@
 
             w_decoded = space.call_method(self.w_decoder, "decode",
                                           w_chunk, 
space.newbool(bool(cookie.need_eof)))
-            self._set_decoded(space, w_decoded)
+            self.decoded.set(space, w_decoded)
 
             # Skip chars_to_skip of the decoded characters
-            if len(self.decoded_chars) < cookie.chars_to_skip:
+            if len(self.decoded.text) < cookie.chars_to_skip:
                 raise oefmt(space.w_IOError,
                             "can't restore logical file position")
-            self.decoded_chars_used = cookie.chars_to_skip
+            self.decoded.pos = cookie.chars_to_skip
         else:
             self.snapshot = PositionSnapshot(cookie.dec_flags, "")
 
@@ -987,7 +1029,7 @@
         w_pos = space.call_method(self.w_buffer, "tell")
 
         if self.w_decoder is None or self.snapshot is None:
-            assert not self.decoded_chars
+            assert not self.decoded.text
             return w_pos
 
         cookie = PositionCookie(space.bigint_w(w_pos))
@@ -998,11 +1040,11 @@
         cookie.start_pos -= len(input)
 
         # How many decoded characters have been used up since the snapshot?
-        if not self.decoded_chars_used:
+        if not self.decoded.pos:
             # We haven't moved from the snapshot point.
             return space.newlong_from_rbigint(cookie.pack())
 
-        chars_to_skip = self.decoded_chars_used
+        chars_to_skip = self.decoded.pos
 
         # Starting from the snapshot position, we will walk the decoder
         # forward until it gives us enough decoded characters.
diff --git a/pypy/module/_io/test/test_interp_textio.py 
b/pypy/module/_io/test/test_interp_textio.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_io/test/test_interp_textio.py
@@ -0,0 +1,68 @@
import pytest
try:
    from hypothesis import given, strategies as st, assume
except ImportError:
    pytest.skip("hypothesis required")
from pypy.module._io.interp_bytesio import W_BytesIO
from pypy.module._io.interp_textio import W_TextIOWrapper, DecodeBuffer

# Line separators used to build test inputs; '' yields text without newlines.
LINESEP = ['', '\r', '\n', '\r\n']

@st.composite
def text_with_newlines(draw):
    # Strategy: short text chunks joined by a single kind of separator.
    sep = draw(st.sampled_from(LINESEP))
    lines = draw(st.lists(st.text(max_size=10), max_size=10))
    return sep.join(lines)

@given(txt=text_with_newlines(),
       mode=st.sampled_from(['\r', '\n', '\r\n', '']),
       limit=st.integers(min_value=-1))
def test_readline(space, txt, mode, limit):
    # readline_w(limit) must return at most `limit` chars per call and,
    # over repeated calls, reproduce the input text exactly.
    # limit == 0 would make readline return '' immediately, so skip it.
    assume(limit != 0)
    w_stream = W_BytesIO(space)
    w_stream.descr_init(space, space.newbytes(txt.encode('utf-8')))
    w_textio = W_TextIOWrapper(space)
    w_textio.descr_init(
        space, w_stream, encoding='utf-8',
        w_newline=space.newtext(mode))
    lines = []
    while True:
        line = space.unicode_w(w_textio.readline_w(space, space.newint(limit)))
        if limit > 0:
            assert len(line) <= limit
        if line:
            lines.append(line)
        else:
            break
    assert u''.join(lines) == txt

@given(st.text())
def test_read_buffer(text):
    # get_chars(-1) drains the whole buffer in one call.
    buf = DecodeBuffer(text)
    assert buf.get_chars(-1) == text
    assert buf.exhausted()

@given(st.text(), st.lists(st.integers(min_value=0)))
def test_readn_buffer(text, sizes):
    # Reading in arbitrary chunk sizes concatenates back to a prefix of
    # the original text; only the final (exhausting) read may come short.
    buf = DecodeBuffer(text)
    strings = []
    for n in sizes:
        s = buf.get_chars(n)
        if not buf.exhausted():
            assert len(s) == n
        else:
            assert len(s) <= n
        strings.append(s)
    assert ''.join(strings) == text[:sum(sizes)]

@given(st.text())
def test_next_char(text):
    # next_char() yields every character in order and signals the end of
    # the buffer by raising StopIteration.
    buf = DecodeBuffer(text)
    chars = []
    try:
        while True:
            chars.append(buf.next_char())
    except StopIteration:
        pass
    assert buf.exhausted()
    assert u''.join(chars) == text
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to