Author: Amaury Forgeot d'Arc <amaur...@gmail.com>
Branch: py3k
Changeset: r48039:3f9d7e9aa6b6
Date: 2011-10-14 00:12 +0200
http://bitbucket.org/pypy/pypy/changeset/3f9d7e9aa6b6/

Log:    The utf-8 codec now disallows lone surrogates; the surrogateescape
        error handler can be used for round-tripping unicode strings with
        lone surrogates.

diff --git a/pypy/module/_codecs/interp_codecs.py b/pypy/module/_codecs/interp_codecs.py
--- a/pypy/module/_codecs/interp_codecs.py
+++ b/pypy/module/_codecs/interp_codecs.py
@@ -33,11 +33,14 @@
                 space.wrap(endpos),
                 space.wrap(reason))
             w_res = space.call_function(w_errorhandler, w_exc)
-            if (not space.is_true(space.isinstance(w_res, space.w_tuple))
+            if (not space.isinstance_w(w_res, space.w_tuple)
                 or space.len_w(w_res) != 2
-                or not space.is_true(space.isinstance(
-                                 space.getitem(w_res, space.wrap(0)),
-                                 space.w_unicode))):
+                or not (space.isinstance_w(
+                            space.getitem(w_res, space.wrap(0)),
+                            space.w_unicode) or
+                        (not decode and space.isinstance_w(
+                            space.getitem(w_res, space.wrap(0)),
+                            space.w_bytes)))):
                 if decode:
                     msg = ("decoding error handler must return "
                            "(unicode, int) tuple, not %s")
@@ -60,8 +63,9 @@
                 return replace, newpos
             else:
                 from pypy.objspace.std.unicodetype import encode_object
-                w_str = encode_object(space, w_replace, encoding, None)
-                replace = space.bytes_w(w_str)
+                if space.isinstance_w(w_replace, space.w_unicode):
+                    w_replace = encode_object(space, w_replace, encoding, None)
+                replace = space.bytes_w(w_replace)
                 return replace, newpos
         return unicode_call_errorhandler
 
@@ -246,11 +250,51 @@
         raise operationerrfmt(space.w_TypeError,
             "don't know how to handle %s in error callback", typename)
 
+def surrogateescape_errors(space, w_exc):
+    check_exception(space, w_exc)
+    if space.isinstance_w(w_exc, space.w_UnicodeEncodeError):
+        obj = space.realunicode_w(space.getattr(w_exc, space.wrap('object')))
+        start = space.int_w(space.getattr(w_exc, space.wrap('start')))
+        w_end = space.getattr(w_exc, space.wrap('end'))
+        end = space.int_w(w_end)
+        res = ''
+        pos = start
+        while pos < end:
+            ch = ord(obj[pos])
+            pos += 1
+            if ch < 0xdc80 or ch > 0xdcff:
+                # Not a UTF-8b surrogate, fail with original exception
+                raise OperationError(space.type(w_exc), w_exc)
+            res += chr(ch - 0xdc00)
+        return space.newtuple([space.wrapbytes(res), w_end])
+    elif space.isinstance_w(w_exc, space.w_UnicodeDecodeError):
+        consumed = 0
+        start = space.int_w(space.getattr(w_exc, space.wrap('start')))
+        end = space.int_w(space.getattr(w_exc, space.wrap('end')))
+        obj = space.bytes_w(space.getattr(w_exc, space.wrap('object')))
+        replace = u''
+        while consumed < 4 and consumed < end - start:
+            c = ord(obj[start+consumed])
+            if c < 128:
+                # Refuse to escape ASCII bytes.
+                break
+            replace += unichr(0xdc00 + c)
+            consumed += 1
+        if not consumed:
+            # codec complained about ASCII byte.
+            raise OperationError(space.type(w_exc), w_exc)
+        return space.newtuple([space.wrap(replace),
+                               space.wrap(start + consumed)])
+    else:
+        typename = space.type(w_exc).getname(space)
+        raise operationerrfmt(space.w_TypeError,
+            "don't know how to handle %s in error callback", typename)
+
 def register_builtin_error_handlers(space):
     "NOT_RPYTHON"
     state = space.fromcache(CodecState)
     for error in ("strict", "ignore", "replace", "xmlcharrefreplace",
-                  "backslashreplace"):
+                  "backslashreplace", "surrogateescape"):
         name = error + "_errors"
         state.codec_error_registry[error] = 
space.wrap(interp2app(globals()[name]))
 
diff --git a/pypy/module/_codecs/test/test_codecs.py b/pypy/module/_codecs/test/test_codecs.py
--- a/pypy/module/_codecs/test/test_codecs.py
+++ b/pypy/module/_codecs/test/test_codecs.py
@@ -483,6 +483,10 @@
     def test_backslahreplace(self):
         assert u'a\xac\u1234\u20ac\u8000'.encode('ascii', 'backslashreplace') 
== 'a\\xac\u1234\u20ac\u8000'
 
+    def test_surrogateescape(self):
+        assert b'a\x80b'.decode('utf-8', 'surrogateescape') == 'a\udc80b'
+        assert 'a\udc80b'.encode('utf-8', 'surrogateescape') == b'a\x80b'
+
     def test_badhandler(self):
         import codecs
         results = ( 42, u"foo", (1,2,3), (u"foo", 1, 3), (u"foo", None), 
(u"foo",), ("foo", 1, 3), ("foo", None), ("foo",) )
diff --git a/pypy/rlib/runicode.py b/pypy/rlib/runicode.py
--- a/pypy/rlib/runicode.py
+++ b/pypy/rlib/runicode.py
@@ -255,10 +255,10 @@
 def unicode_encode_utf_8(s, size, errors, errorhandler=None):
     assert(size >= 0)
     result = StringBuilder(size)
-    i = 0
-    while i < size:
-        ch = ord(s[i])
-        i += 1
+    pos = 0
+    while pos < size:
+        ch = ord(s[pos])
+        pos += 1
         if ch < 0x80:
             # Encode ASCII
             result.append(chr(ch))
@@ -270,19 +270,19 @@
             # Encode UCS2 Unicode ordinals
             if ch < 0x10000:
                 # Special case: check for high surrogate
-                if 0xD800 <= ch <= 0xDBFF and i != size:
-                    ch2 = ord(s[i])
+                if 0xD800 <= ch <= 0xDBFF and pos != size:
+                    ch2 = ord(s[pos])
                     # Check for low surrogate and combine the two to
                     # form a UCS4 value
                     if 0xDC00 <= ch2 <= 0xDFFF:
                         ch3 = ((ch - 0xD800) << 10 | (ch2 - 0xDC00)) + 0x10000
-                        i += 1
+                        pos += 1
                         _encodeUCS4(result, ch3)
                         continue
-                # Fall through: handles isolated high surrogates
-                result.append((chr((0xe0 | (ch >> 12)))))
-                result.append((chr((0x80 | ((ch >> 6) & 0x3f)))))
-                result.append((chr((0x80 | (ch & 0x3f)))))
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'surrogates not allowed',
+                                      s, pos-1, pos)
+                result.append(r)
                 continue
             else:
                 _encodeUCS4(result, ch)
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to