Author: Amaury Forgeot d'Arc <[email protected]>
Branch: stdlib-2.7.9
Changeset: r75523:88b9caf2a309
Date: 2015-01-25 23:13 +0100
http://bitbucket.org/pypy/pypy/changeset/88b9caf2a309/

Log:    Add SSLSocket.tls_unique_cb()

diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py
--- a/pypy/module/_ssl/interp_ssl.py
+++ b/pypy/module/_ssl/interp_ssl.py
@@ -571,6 +571,27 @@
             return space.w_None
         return space.wrap(rffi.charp2str(version))
 
+    def tls_unique_cb_w(self, space):
+        """Returns the 'tls-unique' channel binding data, as defined by RFC 5929.
+        If the TLS handshake is not yet complete, None is returned"""
+
+        # In case of 'tls-unique' it will be 12 bytes for TLS, 36
+        # bytes for older SSL, but let's be safe
+        CB_MAXLEN = 128
+
+        with lltype.scoped_alloc(rffi.CCHARP.TO, CB_MAXLEN) as buf:
+            if (libssl_SSL_session_reused(self.ssl) ^ 
+                (self.socket_type == PY_SSL_CLIENT)):
+                # if session is resumed XOR we are the client
+                length = libssl_SSL_get_finished(self.ssl, buf, CB_MAXLEN)
+            else:
+                # if a new session XOR we are the server
+                length = libssl_SSL_get_peer_finished(self.ssl, buf, CB_MAXLEN)
+            
+            if length > 0:
+                return space.wrap(rffi.charpsize2str(buf, intmask(length)))
+
+
 _SSLSocket.typedef = TypeDef(
     "_ssl._SSLSocket",
 
@@ -584,6 +605,7 @@
     selected_npn_protocol = interp2app(_SSLSocket.selected_npn_protocol),
     compression = interp2app(_SSLSocket.compression_w),
     version = interp2app(_SSLSocket.version_w),
+    tls_unique_cb = interp2app(_SSLSocket.tls_unique_cb_w),
 )
 
 
diff --git a/pypy/module/_ssl/test/test_ssl.py b/pypy/module/_ssl/test/test_ssl.py
--- a/pypy/module/_ssl/test/test_ssl.py
+++ b/pypy/module/_ssl/test/test_ssl.py
@@ -231,6 +231,22 @@
         self.s.close()
         del ss; gc.collect()
 
+    def test_tls_unique_cb(self):
+        import ssl, sys, gc
+        ss = ssl.wrap_socket(self.s)
+        ss.do_handshake()
+        assert isinstance(ss.get_channel_binding(), bytes)
+        self.s.close()
+        del ss; gc.collect()
+
+    def test_compression(self):
+        import ssl, sys, gc
+        ss = ssl.wrap_socket(self.s)
+        ss.do_handshake()
+        assert ss.compression() in [None, 'ZLIB', 'RLE']
+        self.s.close()
+        del ss; gc.collect()
+
 
 class AppTestConnectedSSL_Timeout(AppTestConnectedSSL):
     # Same tests, with a socket timeout
diff --git a/rpython/rlib/ropenssl.py b/rpython/rlib/ropenssl.py
--- a/rpython/rlib/ropenssl.py
+++ b/rpython/rlib/ropenssl.py
@@ -282,6 +282,9 @@
 ssl_external('SSL_get_shutdown', [SSL], rffi.INT)
 ssl_external('SSL_set_read_ahead', [SSL, rffi.INT], lltype.Void)
 ssl_external('SSL_set_tlsext_host_name', [SSL, rffi.CCHARP], rffi.INT, macro=True)
+ssl_external('SSL_session_reused', [SSL], rffi.INT, macro=True)
+ssl_external('SSL_get_finished', [SSL, rffi.CCHARP, rffi.SIZE_T], rffi.SIZE_T)
+ssl_external('SSL_get_peer_finished', [SSL, rffi.CCHARP, rffi.SIZE_T], rffi.SIZE_T)
 ssl_external('SSL_get_current_compression', [SSL], COMP_METHOD)
 ssl_external('SSL_get_version', [SSL], rffi.CCHARP)
 
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to