Author: Amaury Forgeot d'Arc <amaur...@gmail.com> Branch: py3k Changeset: r48039:3f9d7e9aa6b6 Date: 2011-10-14 00:12 +0200 http://bitbucket.org/pypy/pypy/changeset/3f9d7e9aa6b6/
Log: utf-8 codec now disallows lone surrogates; the surrogateescape error handler can be used for round-tripping unicode strings with lone surrogates. diff --git a/pypy/module/_codecs/interp_codecs.py b/pypy/module/_codecs/interp_codecs.py --- a/pypy/module/_codecs/interp_codecs.py +++ b/pypy/module/_codecs/interp_codecs.py @@ -33,11 +33,14 @@ space.wrap(endpos), space.wrap(reason)) w_res = space.call_function(w_errorhandler, w_exc) - if (not space.is_true(space.isinstance(w_res, space.w_tuple)) + if (not space.isinstance_w(w_res, space.w_tuple) or space.len_w(w_res) != 2 - or not space.is_true(space.isinstance( - space.getitem(w_res, space.wrap(0)), - space.w_unicode))): + or not (space.isinstance_w( + space.getitem(w_res, space.wrap(0)), + space.w_unicode) or + (not decode and space.isinstance_w( + space.getitem(w_res, space.wrap(0)), + space.w_bytes)))): if decode: msg = ("decoding error handler must return " "(unicode, int) tuple, not %s") @@ -60,8 +63,9 @@ return replace, newpos else: from pypy.objspace.std.unicodetype import encode_object - w_str = encode_object(space, w_replace, encoding, None) - replace = space.bytes_w(w_str) + if space.isinstance_w(w_replace, space.w_unicode): + w_replace = encode_object(space, w_replace, encoding, None) + replace = space.bytes_w(w_replace) return replace, newpos return unicode_call_errorhandler @@ -246,11 +250,51 @@ raise operationerrfmt(space.w_TypeError, "don't know how to handle %s in error callback", typename) +def surrogateescape_errors(space, w_exc): + check_exception(space, w_exc) + if space.isinstance_w(w_exc, space.w_UnicodeEncodeError): + obj = space.realunicode_w(space.getattr(w_exc, space.wrap('object'))) + start = space.int_w(space.getattr(w_exc, space.wrap('start'))) + w_end = space.getattr(w_exc, space.wrap('end')) + end = space.int_w(w_end) + res = '' + pos = start + while pos < end: + ch = ord(obj[pos]) + pos += 1 + if ch < 0xdc80 or ch > 0xdcff: + # Not a UTF-8b surrogate, fail with original exception + 
raise OperationError(space.type(w_exc), w_exc) + res += chr(ch - 0xdc00) + return space.newtuple([space.wrapbytes(res), w_end]) + elif space.isinstance_w(w_exc, space.w_UnicodeDecodeError): + consumed = 0 + start = space.int_w(space.getattr(w_exc, space.wrap('start'))) + end = space.int_w(space.getattr(w_exc, space.wrap('end'))) + obj = space.bytes_w(space.getattr(w_exc, space.wrap('object'))) + replace = u'' + while consumed < 4 and consumed < end - start: + c = ord(obj[start+consumed]) + if c < 128: + # Refuse to escape ASCII bytes. + break + replace += unichr(0xdc00 + c) + consumed += 1 + if not consumed: + # codec complained about ASCII byte. + raise OperationError(space.type(w_exc), w_exc) + return space.newtuple([space.wrap(replace), + space.wrap(start + consumed)]) + else: + typename = space.type(w_exc).getname(space) + raise operationerrfmt(space.w_TypeError, + "don't know how to handle %s in error callback", typename) + def register_builtin_error_handlers(space): "NOT_RPYTHON" state = space.fromcache(CodecState) for error in ("strict", "ignore", "replace", "xmlcharrefreplace", - "backslashreplace"): + "backslashreplace", "surrogateescape"): name = error + "_errors" state.codec_error_registry[error] = space.wrap(interp2app(globals()[name])) diff --git a/pypy/module/_codecs/test/test_codecs.py b/pypy/module/_codecs/test/test_codecs.py --- a/pypy/module/_codecs/test/test_codecs.py +++ b/pypy/module/_codecs/test/test_codecs.py @@ -483,6 +483,10 @@ def test_backslahreplace(self): assert u'a\xac\u1234\u20ac\u8000'.encode('ascii', 'backslashreplace') == 'a\\xac\u1234\u20ac\u8000' + def test_surrogateescape(self): + assert b'a\x80b'.decode('utf-8', 'surrogateescape') == 'a\udc80b' + assert 'a\udc80b'.encode('utf-8', 'surrogateescape') == b'a\x80b' + def test_badhandler(self): import codecs results = ( 42, u"foo", (1,2,3), (u"foo", 1, 3), (u"foo", None), (u"foo",), ("foo", 1, 3), ("foo", None), ("foo",) ) diff --git a/pypy/rlib/runicode.py b/pypy/rlib/runicode.py 
--- a/pypy/rlib/runicode.py +++ b/pypy/rlib/runicode.py @@ -255,10 +255,10 @@ def unicode_encode_utf_8(s, size, errors, errorhandler=None): assert(size >= 0) result = StringBuilder(size) - i = 0 - while i < size: - ch = ord(s[i]) - i += 1 + pos = 0 + while pos < size: + ch = ord(s[pos]) + pos += 1 if ch < 0x80: # Encode ASCII result.append(chr(ch)) @@ -270,19 +270,19 @@ # Encode UCS2 Unicode ordinals if ch < 0x10000: # Special case: check for high surrogate - if 0xD800 <= ch <= 0xDBFF and i != size: - ch2 = ord(s[i]) + if 0xD800 <= ch <= 0xDBFF and pos != size: + ch2 = ord(s[pos]) # Check for low surrogate and combine the two to # form a UCS4 value if 0xDC00 <= ch2 <= 0xDFFF: ch3 = ((ch - 0xD800) << 10 | (ch2 - 0xDC00)) + 0x10000 - i += 1 + pos += 1 _encodeUCS4(result, ch3) continue - # Fall through: handles isolated high surrogates - result.append((chr((0xe0 | (ch >> 12))))) - result.append((chr((0x80 | ((ch >> 6) & 0x3f))))) - result.append((chr((0x80 | (ch & 0x3f))))) + r, pos = errorhandler(errors, 'utf-8', + 'surrogates not allowed', + s, pos-1, pos) + result.append(r) continue else: _encodeUCS4(result, ch) _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit