Raghuram Devarakonda added the comment:
I am attaching another patch (io2.diff). Please review. I am not sure
whether _adjust_chunk() should also adjust "readahead".
BTW, PEP 3116 says:
"If universal newlines without translation are requested on input (i.e.
newline=''), if a system read operation returns a buffer ending in '\r',
another system read operation is done to determine whether it is
followed by '\n' or not. In universal newlines mode with translation,
the second system read operation may be postponed until the next read
request, and if the following system read operation returns a buffer
starting with '\n', that character is simply discarded."
I suppose this issue is mainly talking about the latter (newline is
None). I don't understand what is meant by "enabling universal new line
mode without translation". Isn't the purpose of enabling universal new
line mode to translate line endings? I may be missing something
basic, of course.
Added file: http://bugs.python.org/file8706/io2.diff
__________________________________
Tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue1395>
__________________________________
Index: Lib/io.py
===================================================================
--- Lib/io.py (revision 58902)
+++ Lib/io.py (working copy)
@@ -1075,7 +1075,8 @@
self._pending = ""
self._snapshot = None
self._seekable = self._telling = self.buffer.seekable()
-
+ self._nl_straddle = False
+
@property
def encoding(self):
return self._encoding
@@ -1136,16 +1137,31 @@
decoder = self._decoder = make_decoder() # XXX: errors
return decoder
+ def _adjust_chunk(self, readahead, pending):
+ if self._readtranslate:
+ if self._nl_straddle and pending and pending[0] == "\n":
+ pending = pending[1:]
+ # readahead = readahead[1:]
+ self._nl_straddle = False
+ if pending and pending[-1] == "\r":
+ self._nl_straddle = True
+ else:
+ self._nl_straddle = False
+
+ return readahead, pending
+
def _read_chunk(self):
if self._decoder is None:
raise ValueError("no decoder")
if not self._telling:
readahead = self.buffer.read1(self._CHUNK_SIZE)
pending = self._decoder.decode(readahead, not readahead)
- return readahead, pending
+ return self._adjust_chunk(readahead, pending)
+
decoder_buffer, decoder_state = self._decoder.getstate()
readahead = self.buffer.read1(self._CHUNK_SIZE)
pending = self._decoder.decode(readahead, not readahead)
+ readahead, pending = self._adjust_chunk(readahead, pending)
self._snapshot = (decoder_state, decoder_buffer + readahead, pending)
return readahead, pending
@@ -1244,6 +1260,10 @@
res = self._pending
if n < 0:
res += decoder.decode(self.buffer.read(), True)
+ if self._readtranslate:
+ if self._nl_straddle and res and res[0] == "\n":
+ res = res[1:]
+ self._nl_straddle = False
self._pending = ""
self._snapshot = None
return self._replacenl(res)
@@ -1253,8 +1273,9 @@
res += pending
if not readahead:
break
+ res = self._replacenl(res)
self._pending = res[n:]
- return self._replacenl(res[:n])
+ return res[:n]
def __next__(self):
self._telling = False
Index: Lib/test/test_io.py
===================================================================
--- Lib/test/test_io.py (revision 58902)
+++ Lib/test/test_io.py (working copy)
@@ -485,6 +485,10 @@
class TextIOWrapperTest(unittest.TestCase):
+ def setUp(self):
+ self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+ self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ASCII")
+
def tearDown(self):
test_support.unlink(test_support.TESTFN)
@@ -741,7 +745,59 @@
print("Reading using readline(): %6.3f seconds" % (t3-t2))
print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
+ def test_issue1395_1(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+ # read one char at a time
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_2(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+ txt._CHUNK_SIZE = 4
+
+ reads = ""
+ while True:
+ c = txt.read(4)
+ if not c:
+ break
+ reads += c
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_3(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read(4)
+ reads += txt.readline()
+ reads += txt.readline()
+ reads += txt.readline()
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_4(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read()
+ self.assertEquals(reads, self.normalized)
+
+ def test_issue1395_5(self):
+ txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ASCII")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ pos = txt.tell()
+ txt.seek(0)
+ txt.seek(pos)
+ self.assertEquals(txt.read(4), "BBB\n")
+
# XXX Tests for open()
class MiscIOTest(unittest.TestCase):
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com