Raghuram Devarakonda added the comment:

I am attaching another patch (io2.diff). Please review. I am not sure
whether _adjust_chunk() should also adjust "readahead".

BTW, PEP 3116 says:

"If universal newlines without translation are requested on input (i.e.
newline=''), if a system read operation returns a buffer ending in '\r',
another system read operation is done to determine whether it is
followed by '\n' or not. In universal newlines mode with translation,
the second system read operation may be postponed until the next read
request, and if the following system read operation returns a buffer
starting with '\n', that character is simply discarded."

I suppose this issue is mainly about the latter case (newline is
None). I don't understand what is meant by "enabling universal newline
mode without translation". Isn't the purpose of enabling universal
newline mode to translate line endings? I may be missing something
basic, of course.

Added file: http://bugs.python.org/file8706/io2.diff

__________________________________
Tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue1395>
__________________________________
Index: Lib/io.py
===================================================================
--- Lib/io.py	(revision 58902)
+++ Lib/io.py	(working copy)
@@ -1075,7 +1075,8 @@
         self._pending = ""
         self._snapshot = None
         self._seekable = self._telling = self.buffer.seekable()
-
+        self._nl_straddle = False
+        
     @property
     def encoding(self):
         return self._encoding
@@ -1136,16 +1137,31 @@
         decoder = self._decoder = make_decoder()  # XXX: errors
         return decoder
 
+    def _adjust_chunk(self, readahead, pending):
+        if self._readtranslate:
+            if self._nl_straddle and pending and pending[0] == "\n":
+                pending = pending[1:]
+                # readahead = readahead[1:]
+                self._nl_straddle = False
+            if pending and pending[-1] == "\r":
+                self._nl_straddle = True
+            else:
+                self._nl_straddle = False
+
+        return readahead, pending
+        
     def _read_chunk(self):
         if self._decoder is None:
             raise ValueError("no decoder")
         if not self._telling:
             readahead = self.buffer.read1(self._CHUNK_SIZE)
             pending = self._decoder.decode(readahead, not readahead)
-            return readahead, pending
+            return self._adjust_chunk(readahead, pending)
+        
         decoder_buffer, decoder_state = self._decoder.getstate()
         readahead = self.buffer.read1(self._CHUNK_SIZE)
         pending = self._decoder.decode(readahead, not readahead)
+        readahead, pending = self._adjust_chunk(readahead, pending)
         self._snapshot = (decoder_state, decoder_buffer + readahead, pending)
         return readahead, pending
 
@@ -1244,6 +1260,10 @@
         res = self._pending
         if n < 0:
             res += decoder.decode(self.buffer.read(), True)
+            if self._readtranslate:
+                if self._nl_straddle and res and res[0] == "\n":
+                    res = res[1:]
+                    self._nl_straddle = False
             self._pending = ""
             self._snapshot = None
             return self._replacenl(res)
@@ -1253,8 +1273,9 @@
                 res += pending
                 if not readahead:
                     break
+            res = self._replacenl(res)
             self._pending = res[n:]
-            return self._replacenl(res[:n])
+            return res[:n]
 
     def __next__(self):
         self._telling = False
Index: Lib/test/test_io.py
===================================================================
--- Lib/test/test_io.py	(revision 58902)
+++ Lib/test/test_io.py	(working copy)
@@ -485,6 +485,10 @@
 
 class TextIOWrapperTest(unittest.TestCase):
 
+    def setUp(self):
+        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ASCII")
+
     def tearDown(self):
         test_support.unlink(test_support.TESTFN)
 
@@ -741,7 +745,59 @@
                 print("Reading using readline(): %6.3f seconds" % (t3-t2))
                 print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
 
+    def test_issue1395_1(self):
+        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
 
+        # read one char at a time
+        reads = ""
+        while True:
+            c = txt.read(1)
+            if not c:
+                break
+            reads += c
+        self.assertEquals(reads, self.normalized)
+
+    def test_issue1395_2(self):
+        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+        txt._CHUNK_SIZE = 4
+
+        reads = ""
+        while True:
+            c = txt.read(4)
+            if not c:
+                break
+            reads += c
+        self.assertEquals(reads, self.normalized)
+
+    def test_issue1395_3(self):
+        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        reads += txt.read(4)
+        reads += txt.readline()
+        reads += txt.readline()
+        reads += txt.readline()
+        self.assertEquals(reads, self.normalized)
+
+    def test_issue1395_4(self):
+        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        reads += txt.read()
+        self.assertEquals(reads, self.normalized)
+
+    def test_issue1395_5(self):
+        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        pos = txt.tell()
+        txt.seek(0)
+        txt.seek(pos)
+        self.assertEquals(txt.read(4), "BBB\n")
+
 # XXX Tests for open()
 
 class MiscIOTest(unittest.TestCase):
_______________________________________________
Python-bugs-list mailing list 
Unsubscribe: 
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to