Author: Richard Plangger <[email protected]>
Branch: py3.5-ssl
Changeset: r88373:5cfd2ef1d12f
Date: 2016-11-14 14:27 +0100
http://bitbucket.org/pypy/pypy/changeset/5cfd2ef1d12f/
Log: Check in more changes to the cffi ssl module; expose the raw
address of the RawBuffer.buf field (as is done for
bytearray objects); allow only one cffi module to be built (added
the --only CLI parameter).
diff too long, truncating to 2000 out of 2604 lines
diff --git a/lib-python/3/test/test_ssl.py b/lib-python/3/test/test_ssl.py
--- a/lib-python/3/test/test_ssl.py
+++ b/lib-python/3/test/test_ssl.py
@@ -23,6 +23,9 @@
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
+IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
+IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
+
def data_file(*name):
return os.path.join(os.path.dirname(__file__), *name)
@@ -54,12 +57,15 @@
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")
-
-SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
+# cert with all kinds of subject alt names
+ALLSANFILE = data_file("allsans.pem")
+
+REMOTE_HOST = "self-signed.pythontest.net"
+REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
-WRONGCERT = data_file("XXXnonexisting.pem")
+NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
@@ -142,8 +148,8 @@
def test_str_for_enums(self):
# Make sure that the PROTOCOL_* constants have enum-like string
# reprs.
- proto = ssl.PROTOCOL_SSLv23
- self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv23')
+ proto = ssl.PROTOCOL_TLS
+ self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
ctx = ssl.SSLContext(proto)
self.assertIs(ctx.protocol, proto)
@@ -275,8 +281,29 @@
self.assertEqual(p['subjectAltName'], san)
+ def test_parse_all_sans(self):
+ p = ssl._ssl._test_decode_cert(ALLSANFILE)
+ self.assertEqual(p['subjectAltName'],
+ (
+ ('DNS', 'allsans'),
+ ('othername', '<unsupported>'),
+ ('othername', '<unsupported>'),
+ ('email', '[email protected]'),
+ ('DNS', 'www.example.org'),
+ ('DirName',
+ ((('countryName', 'XY'),),
+ (('localityName', 'Castle Anthrax'),),
+ (('organizationName', 'Python Software Foundation'),),
+ (('commonName', 'dirname example'),))),
+ ('URI', 'https://www.python.org/'),
+ ('IP Address', '127.0.0.1'),
+ ('IP Address', '0:0:0:0:0:0:0:1\n'),
+ ('Registered ID', '1.2.3.4.5')
+ )
+ )
+
def test_DER_to_PEM(self):
- with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
+ with open(CAFILE_CACERT, 'r') as f:
pem = f.read()
d1 = ssl.PEM_cert_to_DER_cert(pem)
p2 = ssl.DER_cert_to_PEM_cert(d1)
@@ -311,8 +338,8 @@
self.assertGreaterEqual(status, 0)
self.assertLessEqual(status, 15)
# Version string as returned by {Open,Libre}SSL, the format might
change
- if "LibreSSL" in s:
- self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major,
minor)),
+ if IS_LIBRESSL:
+ self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
(s, t, hex(n)))
else:
self.assertTrue(s.startswith("OpenSSL
{:d}.{:d}.{:d}".format(major, minor, fix)),
@@ -366,17 +393,42 @@
s.connect, (HOST, 8080))
with self.assertRaises(OSError) as cm:
with socket.socket() as sock:
- ssl.wrap_socket(sock, certfile=WRONGCERT)
+ ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaises(OSError) as cm:
with socket.socket() as sock:
- ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
+ ssl.wrap_socket(sock,
+ certfile=CERTFILE, keyfile=NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaises(OSError) as cm:
with socket.socket() as sock:
- ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
+ ssl.wrap_socket(sock,
+ certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
+ def bad_cert_test(self, certfile):
+ """Check that trying to use the given client certificate fails"""
+ certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
+ certfile)
+ sock = socket.socket()
+ self.addCleanup(sock.close)
+ with self.assertRaises(ssl.SSLError):
+ ssl.wrap_socket(sock,
+ certfile=certfile,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+
+ def test_empty_cert(self):
+ """Wrapping with an empty cert file"""
+ self.bad_cert_test("nullcert.pem")
+
+ def test_malformed_cert(self):
+ """Wrapping with a badly formatted certificate (syntax error)"""
+ self.bad_cert_test("badcert.pem")
+
+ def test_malformed_key(self):
+ """Wrapping with a badly formatted key (syntax error)"""
+ self.bad_cert_test("badkey.pem")
+
def test_match_hostname(self):
def ok(cert, hostname):
ssl.match_hostname(cert, hostname)
@@ -764,7 +816,8 @@
def test_constructor(self):
for protocol in PROTOCOLS:
ssl.SSLContext(protocol)
- self.assertRaises(TypeError, ssl.SSLContext)
+ ctx = ssl.SSLContext()
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
self.assertRaises(ValueError, ssl.SSLContext, -1)
self.assertRaises(ValueError, ssl.SSLContext, 42)
@@ -785,17 +838,18 @@
def test_options(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
# OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
- self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
- ctx.options)
+ default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
+ if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
+ default |= ssl.OP_NO_COMPRESSION
+ self.assertEqual(default, ctx.options)
ctx.options |= ssl.OP_NO_TLSv1
- self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 |
ssl.OP_NO_TLSv1,
- ctx.options)
+ self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
if can_clear_options():
- ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
- self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
- ctx.options)
+ ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
+ self.assertEqual(default, ctx.options)
ctx.options = 0
- self.assertEqual(0, ctx.options)
+ # Ubuntu has OP_NO_SSLv3 forced on by default
+ self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
else:
with self.assertRaises(ValueError):
ctx.options = 0
@@ -842,7 +896,7 @@
ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
with self.assertRaises(OSError) as cm:
- ctx.load_cert_chain(WRONGCERT)
+ ctx.load_cert_chain(NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
ctx.load_cert_chain(BADCERT)
@@ -862,7 +916,7 @@
# Mismatching key and cert
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
- ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
+ ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
# Password protected key and cert
ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
@@ -927,7 +981,7 @@
self.assertRaises(TypeError, ctx.load_verify_locations)
self.assertRaises(TypeError, ctx.load_verify_locations, None, None,
None)
with self.assertRaises(OSError) as cm:
- ctx.load_verify_locations(WRONGCERT)
+ ctx.load_verify_locations(NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
ctx.load_verify_locations(BADCERT)
@@ -1003,7 +1057,7 @@
self.assertRaises(TypeError, ctx.load_dh_params)
self.assertRaises(TypeError, ctx.load_dh_params, None)
with self.assertRaises(FileNotFoundError) as cm:
- ctx.load_dh_params(WRONGCERT)
+ ctx.load_dh_params(NONEXISTINGCERT)
self.assertEqual(cm.exception.errno, errno.ENOENT)
with self.assertRaises(ssl.SSLError) as cm:
ctx.load_dh_params(CERTFILE)
@@ -1080,7 +1134,7 @@
ctx.load_verify_locations(CERTFILE)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 0, 'crl': 0, 'x509': 1})
- ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+ ctx.load_verify_locations(CAFILE_CACERT)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 1, 'crl': 0, 'x509': 2})
@@ -1090,8 +1144,8 @@
# CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
ctx.load_verify_locations(CERTFILE)
self.assertEqual(ctx.get_ca_certs(), [])
- # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
- ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+ # but CAFILE_CACERT is a CA cert
+ ctx.load_verify_locations(CAFILE_CACERT)
self.assertEqual(ctx.get_ca_certs(),
[{'issuer': ((('organizationName', 'Root CA'),),
(('organizationalUnitName',
'http://www.cacert.org'),),
@@ -1107,7 +1161,7 @@
(('emailAddress', '[email protected]'),)),
'version': 3}])
- with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
+ with open(CAFILE_CACERT) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
self.assertEqual(ctx.get_ca_certs(True), [der])
@@ -1128,6 +1182,7 @@
self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
+ @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
with support.EnvironmentVarGuard() as env:
@@ -1345,11 +1400,11 @@
class NetworkedTests(unittest.TestCase):
def test_connect(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
try:
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
self.assertEqual({}, s.getpeercert())
finally:
s.close()
@@ -1358,27 +1413,27 @@
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, ("svn.python.org", 443))
+ s.connect, (REMOTE_HOST, 443))
s.close()
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
+ ca_certs=REMOTE_ROOT_CERT)
try:
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
self.assertTrue(s.getpeercert())
finally:
s.close()
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
+ ca_certs=REMOTE_ROOT_CERT)
try:
- self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
+ self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
self.assertTrue(s.getpeercert())
finally:
s.close()
@@ -1386,14 +1441,14 @@
def test_non_blocking_connect_ex(self):
# Issue #11326: non-blocking connect_ex() should allow handshake
# to proceed after the socket gets ready.
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
+ ca_certs=REMOTE_ROOT_CERT,
do_handshake_on_connect=False)
try:
s.setblocking(False)
- rc = s.connect_ex(('svn.python.org', 443))
+ rc = s.connect_ex((REMOTE_HOST, 443))
# EWOULDBLOCK under Windows, EINPROGRESS elsewhere
self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
# Wait for connect to finish
@@ -1415,58 +1470,62 @@
def test_timeout_connect_ex(self):
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
+ ca_certs=REMOTE_ROOT_CERT,
do_handshake_on_connect=False)
try:
s.settimeout(0.0000001)
- rc = s.connect_ex(('svn.python.org', 443))
+ rc = s.connect_ex((REMOTE_HOST, 443))
if rc == 0:
- self.skipTest("svn.python.org responded too quickly")
+ self.skipTest("REMOTE_HOST responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
finally:
s.close()
def test_connect_ex_error(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
+ ca_certs=REMOTE_ROOT_CERT)
try:
- rc = s.connect_ex(("svn.python.org", 444))
+ rc = s.connect_ex((REMOTE_HOST, 444))
# Issue #19919: Windows machines or VMs hosted on Windows
# machines sometimes return EWOULDBLOCK.
- self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
+ errors = (
+ errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
+ errno.EWOULDBLOCK,
+ )
+ self.assertIn(rc, errors)
finally:
s.close()
def test_connect_with_context(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
# Same as test_connect, but with a separately created context
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
try:
self.assertEqual({}, s.getpeercert())
finally:
s.close()
# Same with a server hostname
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
- server_hostname="svn.python.org")
- s.connect(("svn.python.org", 443))
+ server_hostname=REMOTE_HOST)
+ s.connect((REMOTE_HOST, 443))
s.close()
# This should fail because we have no verification certs
ctx.verify_mode = ssl.CERT_REQUIRED
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, ("svn.python.org", 443))
+ s.connect, (REMOTE_HOST, 443))
s.close()
# This should succeed because we specify the root cert
- ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+ ctx.load_verify_locations(REMOTE_ROOT_CERT)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1479,12 +1538,12 @@
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
# contain both versions of each certificate (same content, different
# filename) for this test to be portable across OpenSSL releases.
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1495,7 +1554,7 @@
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=BYTES_CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1503,15 +1562,15 @@
s.close()
def test_connect_cadata(self):
- with open(CAFILE_CACERT) as f:
+ with open(REMOTE_ROOT_CERT) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=pem)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1520,7 +1579,7 @@
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=der)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1529,9 +1588,9 @@
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
- ss.connect(("svn.python.org", 443))
+ ss.connect((REMOTE_HOST, 443))
fd = ss.fileno()
f = ss.makefile()
f.close()
@@ -1545,9 +1604,9 @@
self.assertEqual(e.exception.errno, errno.EBADF)
def test_non_blocking_handshake(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
s = socket.socket(socket.AF_INET)
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
s.setblocking(False)
s = ssl.wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
@@ -1590,12 +1649,12 @@
if support.verbose:
sys.stdout.write("\nVerified certificate for %s:%s
is\n%s\n" % (host, port ,pem))
- _test_get_server_certificate('svn.python.org', 443,
SVN_PYTHON_ORG_ROOT_CERT)
+ _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
if support.IPV6_ENABLED:
_test_get_server_certificate('ipv6.google.com', 443)
def test_ciphers(self):
- remote = ("svn.python.org", 443)
+ remote = (REMOTE_HOST, 443)
with support.transient_internet(remote[0]):
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
@@ -1640,13 +1699,13 @@
def test_get_ca_certs_capath(self):
# capath certs are loaded on request
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
self.assertEqual(ctx.get_ca_certs(), [])
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect(("svn.python.org", 443))
+ s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
@@ -1657,12 +1716,12 @@
@needs_sni
def test_context_setget(self):
# Check that the context of a connected socket can be replaced.
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = socket.socket(socket.AF_INET)
with ctx1.wrap_socket(s) as ss:
- ss.connect(("svn.python.org", 443))
+ ss.connect((REMOTE_HOST, 443))
self.assertIs(ss.context, ctx1)
self.assertIs(ss._sslobj.context, ctx1)
ss.context = ctx2
@@ -1683,13 +1742,8 @@
try:
ret = func(*args)
except ssl.SSLError as e:
- # Note that we get a spurious -1/SSL_ERROR_SYSCALL for
- # non-blocking IO. The SSL_shutdown manpage hints at this.
- # It *should* be safe to just ignore SYS_ERROR_SYSCALL because
- # with a Memory BIO there's no syscalls (for IO at least).
if e.errno not in (ssl.SSL_ERROR_WANT_READ,
- ssl.SSL_ERROR_WANT_WRITE,
- ssl.SSL_ERROR_SYSCALL):
+ ssl.SSL_ERROR_WANT_WRITE):
raise
errno = e.errno
# Get any data from the outgoing BIO irrespective of any error, and
@@ -1712,36 +1766,42 @@
return ret
def test_handshake(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
sock = socket.socket(socket.AF_INET)
- sock.connect(("svn.python.org", 443))
+ sock.connect((REMOTE_HOST, 443))
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+ ctx.load_verify_locations(REMOTE_ROOT_CERT)
ctx.check_hostname = True
- sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
+ sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST)
self.assertIs(sslobj._sslobj.owner, sslobj)
self.assertIsNone(sslobj.cipher())
- self.assertIsNone(sslobj.shared_ciphers())
+ self.assertIsNotNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher())
- self.assertIsNone(sslobj.shared_ciphers())
+ self.assertIsNotNone(sslobj.shared_ciphers())
self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique'))
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+ try:
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+ except ssl.SSLSyscallError:
+ # self-signed.pythontest.net probably shuts down the TCP
+ # connection without sending a secure shutdown message, and
+ # this is reported as SSL_ERROR_SYSCALL
+ pass
self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
sock.close()
def test_read_write_data(self):
- with support.transient_internet("svn.python.org"):
+ with support.transient_internet(REMOTE_HOST):
sock = socket.socket(socket.AF_INET)
- sock.connect(("svn.python.org", 443))
+ sock.connect((REMOTE_HOST, 443))
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
@@ -2084,36 +2144,6 @@
self.active = False
self.server.close()
- def bad_cert_test(certfile):
- """
- Launch a server with CERT_REQUIRED, and check that trying to
- connect to it with the given client certificate fails.
- """
- server = ThreadedEchoServer(CERTFILE,
- certreqs=ssl.CERT_REQUIRED,
- cacerts=CERTFILE, chatty=False,
- connectionchatty=False)
- with server:
- try:
- with socket.socket() as sock:
- s = ssl.wrap_socket(sock,
- certfile=certfile,
- ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect((HOST, server.port))
- except ssl.SSLError as x:
- if support.verbose:
- sys.stdout.write("\nSSLError is %s\n" % x.args[1])
- except OSError as x:
- if support.verbose:
- sys.stdout.write("\nOSError is %s\n" % x.args[1])
- except OSError as x:
- if x.errno != errno.ENOENT:
- raise
- if support.verbose:
- sys.stdout.write("\OSError is %s\n" % str(x))
- else:
- raise AssertionError("Use of invalid cert should have failed!")
-
def server_params_test(client_context, server_context, indata=b"FOO\n",
chatty=True, connectionchatty=False, sni_name=None):
"""
@@ -2354,22 +2384,38 @@
"check_hostname requires
server_hostname"):
context.wrap_socket(s)
- def test_empty_cert(self):
- """Connecting with an empty cert file"""
- bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
- "nullcert.pem"))
- def test_malformed_cert(self):
- """Connecting with a badly formatted certificate (syntax error)"""
- bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
- "badcert.pem"))
- def test_nonexisting_cert(self):
- """Connecting with a non-existing cert file"""
- bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
- "wrongcert.pem"))
- def test_malformed_key(self):
- """Connecting with a badly formatted key (syntax error)"""
- bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
- "badkey.pem"))
+ def test_wrong_cert(self):
+ """Connecting when the server rejects the client's certificate
+
+ Launch a server with CERT_REQUIRED, and check that trying to
+ connect to it with a wrong client certificate fails.
+ """
+ certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "wrongcert.pem")
+ server = ThreadedEchoServer(CERTFILE,
+ certreqs=ssl.CERT_REQUIRED,
+ cacerts=CERTFILE, chatty=False,
+ connectionchatty=False)
+ with server, \
+ socket.socket() as sock, \
+ ssl.wrap_socket(sock,
+ certfile=certfile,
+ ssl_version=ssl.PROTOCOL_TLSv1) as s:
+ try:
+ # Expect either an SSL error about the server rejecting
+ # the connection, or a low-level connection reset (which
+ # sometimes happens on Windows)
+ s.connect((HOST, server.port))
+ except ssl.SSLError as e:
+ if support.verbose:
+ sys.stdout.write("\nSSLError is %r\n" % e)
+ except OSError as e:
+ if e.errno != errno.ECONNRESET:
+ raise
+ if support.verbose:
+ sys.stdout.write("\nsocket.error is %r\n" % e)
+ else:
+ self.fail("Use of invalid cert should have failed!")
def test_rude_shutdown(self):
"""A brutal shutdown of an SSL server should raise an OSError
@@ -2615,7 +2661,7 @@
s.close()
def test_socketserver(self):
- """Using a SocketServer to create and manage SSL connections."""
+ """Using socketserver to create and manage SSL connections."""
server = make_https_server(self, certfile=CERTFILE)
# try to connect
if support.verbose:
@@ -2642,8 +2688,6 @@
def test_asyncore_server(self):
"""Check the example asyncore integration."""
- indata = "TEST MESSAGE of mixed case\n"
-
if support.verbose:
sys.stdout.write("\n")
@@ -2775,6 +2819,13 @@
# consume data
s.read()
+ # read(-1, buffer) is supported, even though read(-1) is not
+ data = b"data"
+ s.send(data)
+ buffer = bytearray(len(data))
+ self.assertEqual(s.read(-1, buffer), len(data))
+ self.assertEqual(buffer, data)
+
# Make sure sendmsg et al are disallowed to avoid
# inadvertent disclosure of data and/or corruption
# of the encrypted data stream
@@ -2784,8 +2835,32 @@
s.recvmsg_into, bytearray(100))
s.write(b"over\n")
+
+ self.assertRaises(ValueError, s.recv, -1)
+ self.assertRaises(ValueError, s.read, -1)
+
s.close()
+ def test_recv_zero(self):
+ server = ThreadedEchoServer(CERTFILE)
+ server.__enter__()
+ self.addCleanup(server.__exit__, None, None)
+ s = socket.create_connection((HOST, server.port))
+ self.addCleanup(s.close)
+ s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
+ self.addCleanup(s.close)
+
+ # recv/read(0) should return no data
+ s.send(b"data")
+ self.assertEqual(s.recv(0), b"")
+ self.assertEqual(s.read(0), b"")
+ self.assertEqual(s.read(), b"data")
+
+ # Should not block if the other end sends no data
+ s.setblocking(False)
+ self.assertEqual(s.recv(0), b"")
+ self.assertEqual(s.recv_into(bytearray()), 0)
+
def test_nonblocking_send(self):
server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,
@@ -2944,7 +3019,7 @@
with context.wrap_socket(socket.socket()) as s:
self.assertIs(s.version(), None)
s.connect((HOST, server.port))
- self.assertEqual(s.version(), "TLSv1")
+ self.assertEqual(s.version(), 'TLSv1')
self.assertIs(s.version(), None)
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled
OpenSSL")
@@ -3086,24 +3161,36 @@
(['http/3.0', 'http/4.0'], None)
]
for client_protocols, expected in protocol_tests:
- server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
server_context.load_cert_chain(CERTFILE)
server_context.set_alpn_protocols(server_protocols)
- client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
client_context.load_cert_chain(CERTFILE)
client_context.set_alpn_protocols(client_protocols)
- stats = server_params_test(client_context, server_context,
- chatty=True, connectionchatty=True)
-
- msg = "failed trying %s (s) and %s (c).\n" \
- "was expecting %s, but got %%s from the %%s" \
- % (str(server_protocols), str(client_protocols),
- str(expected))
- client_result = stats['client_alpn_protocol']
- self.assertEqual(client_result, expected, msg %
(client_result, "client"))
- server_result = stats['server_alpn_protocols'][-1] \
- if len(stats['server_alpn_protocols']) else 'nothing'
- self.assertEqual(server_result, expected, msg %
(server_result, "server"))
+
+ try:
+ stats = server_params_test(client_context,
+ server_context,
+ chatty=True,
+ connectionchatty=True)
+ except ssl.SSLError as e:
+ stats = e
+
+ if expected is None and IS_OPENSSL_1_1:
+ # OpenSSL 1.1.0 raises handshake error
+ self.assertIsInstance(stats, ssl.SSLError)
+ else:
+ msg = "failed trying %s (s) and %s (c).\n" \
+ "was expecting %s, but got %%s from the %%s" \
+ % (str(server_protocols), str(client_protocols),
+ str(expected))
+ client_result = stats['client_alpn_protocol']
+ self.assertEqual(client_result, expected,
+ msg % (client_result, "client"))
+ server_result = stats['server_alpn_protocols'][-1] \
+ if len(stats['server_alpn_protocols']) else 'nothing'
+ self.assertEqual(server_result, expected,
+ msg % (server_result, "server"))
def test_selected_npn_protocol(self):
# selected_npn_protocol() is None unless NPN is used
@@ -3251,13 +3338,23 @@
client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
client_context.verify_mode = ssl.CERT_REQUIRED
client_context.load_verify_locations(SIGNING_CA)
- client_context.set_ciphers("RC4")
- server_context.set_ciphers("AES:RC4")
+ if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
+ client_context.set_ciphers("AES128:AES256")
+ server_context.set_ciphers("AES256")
+ alg1 = "AES256"
+ alg2 = "AES-256"
+ else:
+ client_context.set_ciphers("AES:3DES")
+ server_context.set_ciphers("3DES")
+ alg1 = "3DES"
+ alg2 = "DES-CBC3"
+
stats = server_params_test(client_context, server_context)
ciphers = stats['server_shared_ciphers'][0]
self.assertGreater(len(ciphers), 0)
for name, tls_version, bits in ciphers:
- self.assertIn("RC4", name.split("-"))
+ if not alg1 in name.split("-") and alg2 not in name:
+ self.fail(name)
def test_read_write_after_close_raises_valuerror(self):
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
@@ -3325,7 +3422,7 @@
pass
for filename in [
- CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
+ CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
BADCERT, BADKEY, EMPTYCERT]:
diff --git a/lib_pypy/openssl/_cffi_src/build_openssl.py
b/lib_pypy/openssl/_cffi_src/build_openssl.py
--- a/lib_pypy/openssl/_cffi_src/build_openssl.py
+++ b/lib_pypy/openssl/_cffi_src/build_openssl.py
@@ -73,6 +73,7 @@
"rand",
"rsa",
"ssl",
+ "tls1",
"x509",
"x509name",
"x509v3",
diff --git a/lib_pypy/openssl/_cffi_src/openssl/bio.py
b/lib_pypy/openssl/_cffi_src/openssl/bio.py
--- a/lib_pypy/openssl/_cffi_src/openssl/bio.py
+++ b/lib_pypy/openssl/_cffi_src/openssl/bio.py
@@ -130,6 +130,8 @@
long BIO_set_buffer_size(BIO *, long);
long BIO_set_buffer_read_data(BIO *, void *, long);
long BIO_set_nbio(BIO *, long);
+void BIO_set_retry_read(BIO *);
+void BIO_clear_retry_flags(BIO *);
"""
CUSTOMIZATIONS = """
diff --git a/lib_pypy/openssl/_cffi_src/openssl/ssl.py
b/lib_pypy/openssl/_cffi_src/openssl/ssl.py
--- a/lib_pypy/openssl/_cffi_src/openssl/ssl.py
+++ b/lib_pypy/openssl/_cffi_src/openssl/ssl.py
@@ -133,15 +133,16 @@
typedef ... SSL_METHOD;
typedef ... SSL_CTX;
+
typedef ... SSL_SESSION;
+typedef ... SSL;
-typedef ... SSL;
+typedef ... Cryptography_STACK_OF_SSL_CIPHER;
static const long TLSEXT_NAMETYPE_host_name;
+typedef ... COMP_METHOD;
typedef ... SSL_CIPHER;
-typedef ... Cryptography_STACK_OF_SSL_CIPHER;
-typedef ... COMP_METHOD;
"""
FUNCTIONS = """
@@ -193,6 +194,7 @@
int SSL_renegotiate_pending(SSL *);
const char *SSL_get_cipher_list(const SSL *, int);
Cryptography_STACK_OF_SSL_CIPHER *SSL_get_ciphers(const SSL *);
+Cryptography_STACK_OF_SSL_CIPHER * Cryptography_get_ssl_session_ciphers(const
SSL_SESSION *);
/* context */
void SSL_CTX_free(SSL_CTX *);
@@ -242,6 +244,8 @@
Cryptography_STACK_OF_X509_NAME *SSL_load_client_CA_file(const char *);
const char *SSL_get_servername(const SSL *, const int);
+//long SSL_CTX_callback_ctrl(SSL_CTX *, int, void (*)(void));
+//long SSL_CTX_ctrl(SSL_CTX *, int, long, void *);
"""
MACROS = """
@@ -703,4 +707,8 @@
static const long TLS_ST_BEFORE = 0;
static const long TLS_ST_OK = 0;
#endif
+
+Cryptography_STACK_OF_SSL_CIPHER * Cryptography_get_ssl_session_ciphers(const
SSL_SESSION *s) {
+ return s->ciphers;
+}
"""
diff --git a/lib_pypy/openssl/_cffi_src/openssl/x509.py
b/lib_pypy/openssl/_cffi_src/openssl/x509.py
--- a/lib_pypy/openssl/_cffi_src/openssl/x509.py
+++ b/lib_pypy/openssl/_cffi_src/openssl/x509.py
@@ -18,6 +18,7 @@
typedef STACK_OF(X509_CRL) Cryptography_STACK_OF_X509_CRL;
typedef STACK_OF(X509_REVOKED) Cryptography_STACK_OF_X509_REVOKED;
typedef STACK_OF(ACCESS_DESCRIPTION) Cryptography_AUTHORITY_INFO_ACCESS;
+typedef STACK_OF(X509_OBJECT) Cryptography_STACK_OF_X509_OBJECT;
"""
TYPES = """
@@ -340,6 +341,9 @@
const X509_ALGOR **);
void AUTHORITY_INFO_ACCESS_free(Cryptography_AUTHORITY_INFO_ACCESS*);
+
+#define X509_R_CERT_ALREADY_IN_HASH_TABLE ...
+#define ASN1_R_HEADER_TOO_LONG ...
"""
CUSTOMIZATIONS = """
diff --git a/lib_pypy/openssl/_cffi_src/openssl/x509_vfy.py
b/lib_pypy/openssl/_cffi_src/openssl/x509_vfy.py
--- a/lib_pypy/openssl/_cffi_src/openssl/x509_vfy.py
+++ b/lib_pypy/openssl/_cffi_src/openssl/x509_vfy.py
@@ -15,6 +15,7 @@
* Note that the result is an opaque type.
*/
typedef STACK_OF(ASN1_OBJECT) Cryptography_STACK_OF_ASN1_OBJECT;
+typedef STACK_OF(X509_OBJECT) Cryptography_STACK_OF_X509_OBJECT;
"""
TYPES = """
@@ -22,10 +23,17 @@
static const long Cryptography_HAS_102_VERIFICATION_PARAMS;
static const long Cryptography_HAS_X509_V_FLAG_TRUSTED_FIRST;
static const long Cryptography_HAS_X509_V_FLAG_PARTIAL_CHAIN;
+static const long Cryptography_X509_LU_X509;
+static const long Cryptography_X509_LU_CLR;
typedef ... Cryptography_STACK_OF_ASN1_OBJECT;
+typedef ... Cryptography_STACK_OF_X509_OBJECT;
-typedef ... X509_STORE;
+typedef ... X509_OBJECT;
+typedef struct {
+ Cryptography_STACK_OF_X509_OBJECT * objs;
+ ...;
+} X509_STORE;
typedef ... X509_VERIFY_PARAM;
typedef ... X509_STORE_CTX;
@@ -119,6 +127,7 @@
static const long X509_V_FLAG_SUITEB_192_LOS;
static const long X509_V_FLAG_SUITEB_128_LOS;
static const long X509_V_FLAG_PARTIAL_CHAIN;
+
"""
FUNCTIONS = """
@@ -172,6 +181,9 @@
Cryptography_STACK_OF_ASN1_OBJECT *);
void X509_VERIFY_PARAM_set_depth(X509_VERIFY_PARAM *, int);
int X509_VERIFY_PARAM_get_depth(const X509_VERIFY_PARAM *);
+
+int Cryptography_X509_OBJECT_get_type(const X509_OBJECT *);
+X509 * Cryptography_X509_OBJECT_data_x509(X509_OBJECT*);
"""
MACROS = """
@@ -192,6 +204,11 @@
int X509_VERIFY_PARAM_set1_ip(X509_VERIFY_PARAM *, const unsigned char *,
size_t);
int X509_VERIFY_PARAM_set1_ip_asc(X509_VERIFY_PARAM *, const char *);
+
+/* STACK_OF(X509_OBJECT) */
+int sk_X509_OBJECT_num(Cryptography_STACK_OF_X509_OBJECT *);
+X509_OBJECT *sk_X509_OBJECT_value(Cryptography_STACK_OF_X509_OBJECT *, int);
+
"""
CUSTOMIZATIONS = """
@@ -254,4 +271,15 @@
X509_VERIFY_PARAM *_X509_STORE_get0_param(X509_STORE *store) {
return store->param;
}
+
+int Cryptography_X509_OBJECT_get_type(const X509_OBJECT * x) {
+ return x->type;
+}
+
+X509 * Cryptography_X509_OBJECT_data_x509(X509_OBJECT * x) {
+ return x->data.x509;
+}
+
+static const long Cryptography_X509_LU_X509 = X509_LU_X509;
+static const long Cryptography_X509_LU_CLR = X509_LU_CRL;
"""
diff --git a/lib_pypy/openssl/_cffi_src/openssl/x509v3.py b/lib_pypy/openssl/_cffi_src/openssl/x509v3.py
--- a/lib_pypy/openssl/_cffi_src/openssl/x509v3.py
+++ b/lib_pypy/openssl/_cffi_src/openssl/x509v3.py
@@ -185,7 +185,7 @@
void *X509V3_EXT_d2i(X509_EXTENSION *);
/* X509_get_ext_d2i is already defined, there might be a better solution
to expose it to the lib object? */
-void * _X509_get_ext_d2i(const X509 *, int, int *, int *);
+void * _X509_get_ext_d2i(X509 *, int, int *, int *);
/* X509 is private, there is no way to access the field crldp other than
adding it to the typedef or expose a function like this: */
Cryptography_STACK_OF_DIST_POINT * _X509_get_crldp(const X509 *);
@@ -305,7 +305,7 @@
"""
CUSTOMIZATIONS = """
-void * _X509_get_ext_d2i(const X509 * x, int nid, int * a, int * b) {
+void * _X509_get_ext_d2i(X509 * x, int nid, int * a, int * b) {
return X509_get_ext_d2i(x, nid, a, b);
}
#if OPENSSL_VERSION_NUMBER >= 0x10001000L
diff --git a/lib_pypy/openssl/_stdssl/__init__.py b/lib_pypy/openssl/_stdssl/__init__.py
--- a/lib_pypy/openssl/_stdssl/__init__.py
+++ b/lib_pypy/openssl/_stdssl/__init__.py
@@ -1,16 +1,22 @@
import sys
-import errno
import time
import _thread
+import socket
import weakref
from _openssl import ffi
from _openssl import lib
-from openssl._stdssl.certificate import _test_decode_cert
-from openssl._stdssl.utility import _str_with_len
-from openssl._stdssl.error import (ssl_error, ssl_lib_error,
+from openssl._stdssl.certificate import (_test_decode_cert,
+ _decode_certificate, _certificate_to_der)
+from openssl._stdssl.utility import _str_with_len, _bytes_with_len, _str_to_ffi_buffer
+from openssl._stdssl.error import (ssl_error, ssl_lib_error, ssl_socket_error,
SSLError, SSLZeroReturnError, SSLWantReadError,
SSLWantWriteError, SSLSyscallError,
SSLEOFError)
+from openssl._stdssl.error import (SSL_ERROR_NONE,
+ SSL_ERROR_SSL, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE,
+ SSL_ERROR_WANT_X509_LOOKUP, SSL_ERROR_SYSCALL,
+ SSL_ERROR_ZERO_RETURN, SSL_ERROR_WANT_CONNECT,
+ SSL_ERROR_EOF, SSL_ERROR_NO_SOCKET, SSL_ERROR_INVALID_ERROR_CODE)
OPENSSL_VERSION = ffi.string(lib.OPENSSL_VERSION_TEXT).decode('utf-8')
@@ -121,22 +127,74 @@
else:
Cryptography_pem_password_cb = ffi.callback("int(char*,int,int,void*)")(_Cryptography_pem_password_cb)
-def _ssl_select(sock, write, timeout):
- pass
- raise NotImplementedError
+from select import poll, POLLIN, POLLOUT, select
+
+HAVE_POLL = True
+
+def _ssl_select(sock, writing, timeout):
+ if HAVE_POLL:
+ p = poll()
+
+ # Nothing to do unless we're in timeout mode (not non-blocking)
+ if sock is None or timeout == 0:
+ return SOCKET_IS_NONBLOCKING
+ elif timeout < 0:
+ t = sock.gettimeout() or 0
+ if t > 0:
+ return SOCKET_HAS_TIMED_OUT
+ else:
+ return SOCKET_IS_BLOCKING
+
+ # Guard against closed socket
+ if sock.fileno() < 0:
+ return SOCKET_HAS_BEEN_CLOSED
+
+ # Prefer poll, if available, since you can poll() any fd
+ # which can't be done with select().
+ if HAVE_POLL:
+ p.register(sock.fileno(), POLLOUT | POLLIN)
+
+ #PySSL_BEGIN_ALLOW_THREADS
+ rc = len(p.poll(timeout * 1000.0))
+ #PySSL_END_ALLOW_THREADS
+ else:
+ fd = sock.fileno()
+ #if (!_PyIsSelectable_fd(s->sock_fd))
+ # return SOCKET_TOO_LARGE_FOR_SELECT;
+ if writing:
+ rr, wr, xr = select([],[fd],[], timeout)
+ else:
+ rr, wr, xr = select([fd],[],[], timeout)
+ rc = len(rr) + len(wr)
+ if rc != 0:
+ return SOCKET_OPERATION_OK
+ return SOCKET_HAS_TIMED_OUT
+
+SOCKET_IS_NONBLOCKING = 0
+SOCKET_IS_BLOCKING = 1
+SOCKET_HAS_TIMED_OUT = 2
+SOCKET_HAS_BEEN_CLOSED = 3
+SOCKET_TOO_LARGE_FOR_SELECT = 4
+SOCKET_OPERATION_OK = 5
+
+def _buffer_new(length):
+ return ffi.new("char[%d]"%length)
class _SSLSocket(object):
@staticmethod
- def _new__ssl_socket(sslctx, sock, socket_type, hostname, inbio, outbio):
- self = _SSLSocket(sslctx, sock, socket_type, hostname)
+ def _new__ssl_socket(sslctx, sock, socket_type, server_hostname, inbio, outbio):
+ self = _SSLSocket(sslctx)
ctx = sslctx.ctx
+ if server_hostname:
+ self.server_hostname = server_hostname.decode('idna', 'strict')
+
lib.ERR_get_state()
lib.ERR_clear_error()
self.ssl = ssl = lib.SSL_new(ctx)
- # TODO _server_name_callback self.SSL_set_app_data(self.ssl, self);
+ lib.SSL_set_app_data(ssl, b"")
if sock:
lib.SSL_set_fd(ssl, sock.fileno())
else:
@@ -157,11 +215,11 @@
name = _str_to_ffi_buffer(self.server_hostname)
lib.SSL_set_tlsext_host_name(ssl, name)
- timeout = sock.gettimeout() or -1
# If the socket is in non-blocking mode or timeout mode, set the BIO
# to non-blocking mode (blocking is the default)
#
+ timeout = sock.gettimeout() or 0
if sock and timeout >= 0:
lib.BIO_set_nbio(lib.SSL_get_rbio(ssl), 1)
lib.BIO_set_nbio(lib.SSL_get_wbio(ssl), 1)
@@ -171,38 +229,43 @@
lib.SSL_set_connect_state(ssl)
else:
lib.SSL_set_accept_state(ssl)
+ self.socket_type = socket_type
#PySSL_END_ALLOW_THREADS
if sock:
- self.Socket = weakref.ref(sock)
+ self.socket = weakref.ref(sock)
return self
-
- def __init__(self, sslctx, sock, sockettype, hostname):
- self.sock = sock
+ def __init__(self, sslctx):
+ self.ctx = sslctx
+ self.peer_cert = ffi.NULL
self.ssl = ffi.NULL
- self.sockettype = sockettype
- self.ctx = sslctx
self.shutdown_seen_zero = 0
self.handshake_done = 0
self.owner = None
- if hostname:
- self.server_hostname = hostname.decode('idna', 'strict')
- else:
- self.server_hostname = None
+ self.server_hostname = None
+ self.socket = None
+
+ @property
+ def context(self):
+ return self.ctx
+
+ @context.setter
+ def context(self, value):
+ self.ctx = value
def do_handshake(self):
-
- sock = self.sock
+ sock = self.get_socket_or_None()
if sock is None:
- _setSSLError("Underlying socket connection gone", lib.SSL_ERROR_NO_SOCKET)
-
+ raise ssl_error("Underlying socket connection gone", SSL_ERROR_NO_SOCKET)
ssl = self.ssl
-
- nonblocking = timeout >= 0
- lib.BIO_set_nbio(lib.SSL_getrbio(ssl), nonblocking)
- lib.BIO_set_nbio(lib.SSL_getwbio(ssl), nonblocking)
+ timeout = 0
+ if sock:
+ timeout = sock.gettimeout() or 0
+ nonblocking = timeout >= 0
+ lib.BIO_set_nbio(lib.SSL_get_rbio(ssl), nonblocking)
+ lib.BIO_set_nbio(lib.SSL_get_wbio(ssl), nonblocking)
has_timeout = timeout > 0
has_timeout = (timeout > 0);
@@ -210,7 +273,6 @@
if has_timeout:
# REVIEW, cpython uses a monotonic clock here
deadline = time.time() + timeout;
-
# Actually negotiate SSL connection
# XXX If SSL_do_handshake() returns 0, it's also a failure.
while True:
@@ -226,9 +288,9 @@
# REIVIEW monotonic clock?
timeout = deadline - time.time()
- if err == lib.SSL_ERROR_WANT_READ:
+ if err == SSL_ERROR_WANT_READ:
sockstate = _ssl_select(sock, 0, timeout)
- elif err == lib.SSL_ERROR_WANT_WRITE:
+ elif err == SSL_ERROR_WANT_WRITE:
sockstate = _ssl_select(sock, 1, timeout)
else:
sockstate = SOCKET_OPERATION_OK
@@ -243,10 +305,10 @@
break
if not (err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE):
break
- #if (ret < 1:
- # return PySSL_SetError(self, ret, __FILE__, __LINE__);
+ if ret < 1:
+ raise ssl_lib_error()
- if self.peer_cert:
+ if self.peer_cert != ffi.NULL:
lib.X509_free(self.peer_cert)
#PySSL_BEGIN_ALLOW_THREADS
self.peer_cert = lib.SSL_get_peer_certificate(ssl)
@@ -254,10 +316,329 @@
self.handshake_done = 1
return None
+ def peer_certificate(self, binary_mode):
+ if not self.handshake_done:
+ raise ValueError("handshake not done yet")
+ if self.peer_cert == ffi.NULL:
+ return None
+ if binary_mode:
+ # return cert in DER-encoded format
+ return _certificate_to_der(self.peer_cert)
+ else:
+ verification = lib.SSL_CTX_get_verify_mode(lib.SSL_get_SSL_CTX(self.ssl))
+ if (verification & lib.SSL_VERIFY_PEER) == 0:
+ return {}
+ else:
+ return _decode_certificate(self.peer_cert)
+
+ def write(self, bytestring):
+ deadline = 0
+ b = _str_to_ffi_buffer(bytestring)
+ sock = self.get_socket_or_None()
+ ssl = self.ssl
+ if sock:
+ timeout = sock.gettimeout() or 0
+ nonblocking = timeout >= 0
+ lib.BIO_set_nbio(lib.SSL_get_rbio(ssl), nonblocking)
+ lib.BIO_set_nbio(lib.SSL_get_wbio(ssl), nonblocking)
+
+ timeout = sock.gettimeout() or 0
+ has_timeout = timeout > 0
+ if has_timeout:
+ # TODO monotonic clock?
+ deadline = time.time() + timeout
+
+ sockstate = _ssl_select(sock, 1, timeout)
+ if sockstate == SOCKET_HAS_TIMED_OUT:
+ raise socket.TimeoutError("The write operation timed out")
+ elif sockstate == SOCKET_HAS_BEEN_CLOSED:
+ raise ssl_error("Underlying socket has been closed.")
+ elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT:
+ raise ssl_error("Underlying socket too large for select().")
+
+ while True:
+ #PySSL_START_ALLOW_THREADS
+ length = lib.SSL_write(self.ssl, b, len(b))
+ err = lib.SSL_get_error(self.ssl, length)
+ #PySSL_END_ALLOW_THREADS
+
+ # TODO if (PyErr_CheckSignals())
+ # TODO goto error;
+
+ if has_timeout:
+ # TODO monotonic clock
+ timeout = deadline - time.time()
+
+ if err == SSL_ERROR_WANT_READ:
+ sockstate = _ssl_select(sock, 0, timeout)
+ elif err == SSL_ERROR_WANT_WRITE:
+ sockstate = _ssl_select(sock, 1, timeout)
+ else:
+ sockstate = SOCKET_OPERATION_OK
+
+ if sockstate == SOCKET_HAS_TIMED_OUT:
+ raise socket.TimeoutError("The write operation timed out")
+ elif sockstate == SOCKET_HAS_BEEN_CLOSED:
+ raise ssl_error("Underlying socket has been closed.")
+ elif sockstate == SOCKET_IS_NONBLOCKING:
+ break
+ if not (err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE):
+ break
+
+ if length > 0:
+ return length
+ else:
+ raise ssl_lib_error()
+ # return PySSL_SetError(self, len, __FILE__, __LINE__);
+
+ def read(self, length, buffer_into=None):
+ sock = self.get_socket_or_None()
+ ssl = self.ssl
+
+ if sock is None:
+ raise ssl_error("Underlying socket connection gone", SSL_ERROR_NO_SOCKET)
+
+ if not buffer_into:
+ dest = _buffer_new(length)
+ mem = dest
+ else:
+ import pdb; pdb.set_trace()
+ mem = ffi.from_buffer(buffer_into)
+ if length <= 0 or length > len(buffer_into):
+ if len(buffer_into) != length:
+ raise OverflowError("maximum length can't fit in a C 'int'")
+
+ if sock:
+ timeout = sock.gettimeout() or 0
+ nonblocking = timeout >= 0
+ lib.BIO_set_nbio(lib.SSL_get_rbio(ssl), nonblocking)
+ lib.BIO_set_nbio(lib.SSL_get_wbio(ssl), nonblocking)
+
+ deadline = 0
+ timeout = sock.gettimeout() or 0
+ has_timeout = timeout > 0
+ if has_timeout:
+ # TODO monotonic clock?
+ deadline = time.time() + timeout
+
+ shutdown = False
+ while True:
+ #PySSL_BEGIN_ALLOW_THREADS
+ count = lib.SSL_read(self.ssl, mem, length);
+ err = lib.SSL_get_error(self.ssl, count);
+ #PySSL_END_ALLOW_THREADS
+
+ # TODO
+ #if (PyErr_CheckSignals())
+ # goto error;
+
+ if has_timeout:
+ timeout = deadline - time.time() # TODO ? _PyTime_GetMonotonicClock();
+
+ if err == SSL_ERROR_WANT_READ:
+ sockstate = _ssl_select(sock, 0, timeout)
+ elif err == SSL_ERROR_WANT_WRITE:
+ sockstate = _ssl_select(sock, 1, timeout)
+ elif err == SSL_ERROR_ZERO_RETURN and \
+ lib.SSL_get_shutdown(self.ssl) == lib.SSL_RECEIVED_SHUTDOWN:
+ shutdown = True
+ break;
+ else:
+ sockstate = SOCKET_OPERATION_OK
+
+ if sockstate == SOCKET_HAS_TIMED_OUT:
+ raise socket.TimeoutError("The read operation timed out")
+ elif sockstate == SOCKET_IS_NONBLOCKING:
+ break
+ if not (err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE):
+ break
+
+ if count <= 0:
+ raise ssl_socket_error(self, err)
+
+ if not buffer_into:
+ return _bytes_with_len(dest, count)
+
+ return count
+
+ def selected_alpn_protocol(self):
+ out = ffi.new("const unsigned char **")
+ outlen = ffi.new("unsigned int*")
+
+ lib.SSL_get0_alpn_selected(self.ssl, out, outlen);
+ if out == ffi.NULL:
+ return None
+ return _str_with_len(ffi.cast("char*",out[0]), outlen[0]);
+
+ def shared_ciphers(self):
+ sess = lib.SSL_get_session(self.ssl)
+
+ ciphers = lib.Cryptography_get_ssl_session_ciphers(sess)
+ if sess is None or ciphers == ffi.NULL:
+ return None
+ res = []
+ count = lib.sk_SSL_CIPHER_num(ciphers)
+ for i in range(count):
+ tup = cipher_to_tuple(lib.sk_SSL_CIPHER_value(ciphers, i))
+ if not tup:
+ return None
+ res.append(tup)
+ return res
+
+ def cipher(self):
+ if self.ssl == ffi.NULL:
+ return None
+ current = lib.SSL_get_current_cipher(self.ssl)
+ if current == ffi.NULL:
+ return None
+ return cipher_to_tuple(current)
+
+ def compression(self):
+ if not lib.Cryptography_HAS_COMPRESSION or self.ssl == ffi.NULL:
+ return None
+
+ comp_method = lib.SSL_get_current_compression(self.ssl);
+ if comp_method == ffi.NULL: # or comp_method.type == lib.NID_undef:
+ return None
+ short_name = lib.SSL_COMP_get_name(comp_method)
+ if short_name == ffi.NULL:
+ return None
+ return _fs_decode(_str_from_buf(short_name))
+
+ def version(self):
+ if self.ssl == ffi.NULL:
+ return None
+ version = _str_from_buf(lib.SSL_get_version(self.ssl))
+ if version == "unknown":
+ return None
+ return version
+
+ def get_socket_or_None(self):
+ if self.socket is None:
+ return None
+ return self.socket()
+
+ def shutdown(self):
+ sock = self.get_socket_or_None()
+ nonblocking = False
+ ssl = self.ssl
+
+ if sock is not None:
+ # Guard against closed socket
+ if sock.fileno() < 0:
+ raise ssl_error("Underlying socket connection gone", SSL_ERROR_NO_SOCKET)
+
+ timeout = sock.gettimeout() or 0
+ nonblocking = timeout >= 0
+ if sock and timeout >= 0:
+ lib.BIO_set_nbio(lib.SSL_get_rbio(ssl), nonblocking)
+ lib.BIO_set_nbio(lib.SSL_get_wbio(ssl), nonblocking)
+ else:
+ timeout = 0
+
+ has_timeout = (timeout > 0);
+ if has_timeout:
+ # TODO monotonic clock
+ deadline = time.time() + timeout;
+
+ zeros = 0
+
+ while True:
+ # TODO PySSL_BEGIN_ALLOW_THREADS
+ # Disable read-ahead so that unwrap can work correctly.
+ # Otherwise OpenSSL might read in too much data,
+ # eating clear text data that happens to be
+ # transmitted after the SSL shutdown.
+ # Should be safe to call repeatedly every time this
+ # function is used and the shutdown_seen_zero != 0
+ # condition is met.
+ #
+ if self.shutdown_seen_zero:
+ lib.SSL_set_read_ahead(self.ssl, 0)
+ err = lib.SSL_shutdown(self.ssl)
+ # TODO PySSL_END_ALLOW_THREADS
+
+ # If err == 1, a secure shutdown with SSL_shutdown() is complete
+ if err > 0:
+ break
+ if err == 0:
+ # Don't loop endlessly; instead preserve legacy
+ # behaviour of trying SSL_shutdown() only twice.
+ # This looks necessary for OpenSSL < 0.9.8m
+ zeros += 1
+ if zeros > 1:
+ break
+ # Shutdown was sent, now try receiving
+ self.shutdown_seen_zero = 1
+ continue
+
+ if has_timeout:
+ # TODO monotonic clock
+ timeout = deadline - time.time() #_PyTime_GetMonotonicClock();
+
+ # Possibly retry shutdown until timeout or failure
+ ssl_err = lib.SSL_get_error(self.ssl, err)
+ if ssl_err == SSL_ERROR_WANT_READ:
+ sockstate = _ssl_select(sock, 0, timeout)
+ elif ssl_err == SSL_ERROR_WANT_WRITE:
+ sockstate = _ssl_select(sock, 1, timeout)
+ else:
+ break
+
+ if sockstate == SOCKET_HAS_TIMED_OUT:
+ if ssl_err == SSL_ERROR_WANT_READ:
+ raise socket.TimeoutError("The read operation timed out")
+ else:
+ raise socket.TimeoutError("The write operation timed out")
+ elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT:
+ raise ssl_error("Underlying socket too large for select().")
+ elif sockstate != SOCKET_OPERATION_OK:
+ # Retain the SSL error code
+ break;
+
+ if err < 0:
+ raise ssl_socket_error(self, err)
+ if sock:
+ return sock
+ else:
+ return None
+
+
+def _fs_decode(name):
+ # TODO return PyUnicode_DecodeFSDefault(short_name);
+ return name.decode('utf-8')
+
+
+def cipher_to_tuple(cipher):
+ ccipher_name = lib.SSL_CIPHER_get_name(cipher)
+ if ccipher_name == ffi.NULL:
+ cipher_name = None
+ else:
+ cipher_name = _str_from_buf(ccipher_name)
+
+ ccipher_protocol = lib.SSL_CIPHER_get_version(cipher)
+ if ccipher_protocol == ffi.NULL:
+ cipher_protocol = None
+ else:
+ cipher_protocol = _str_from_buf(ccipher_protocol)
+
+ bits = lib.SSL_CIPHER_get_bits(cipher, ffi.NULL)
+ return (cipher_name, cipher_protocol, bits)
+
+
+
+SSL_CTX_STATS_NAMES = """
+ number connect connect_good connect_renegotiate accept accept_good
+ accept_renegotiate hits misses timeouts cache_full""".split()
+SSL_CTX_STATS = []
+for name in SSL_CTX_STATS_NAMES:
+ attr = 'SSL_CTX_sess_'+name
+ assert hasattr(lib, attr)
+ SSL_CTX_STATS.append((name, getattr(lib, attr)))
class _SSLContext(object):
- __slots__ = ('ctx', 'check_hostname')
+ __slots__ = ('ctx', '_check_hostname', 'servername_callback')
def __new__(cls, protocol):
self = object.__new__(cls)
@@ -281,7 +662,7 @@
if self.ctx == ffi.NULL:
raise ssl_error("failed to allocate SSL context")
- self.check_hostname = False
+ self._check_hostname = False
# TODO self.register_finalizer(space)
# Defaults
@@ -304,8 +685,7 @@
else:
key = lib.EC_KEY_new_by_curve_name(lib.NID_X9_62_prime256v1)
if not key:
- # TODO copy from ropenssl?
- raise _ssl_seterror(None, 0)
+ raise ssl_lib_error()
try:
lib.SSL_CTX_set_tmp_ecdh(self.ctx, key)
finally:
@@ -384,6 +764,18 @@
if not lib.X509_VERIFY_PARAM_set_flags(param, set):
raise ssl_error(None, 0)
+ @property
+ def check_hostname(self):
+ return self._check_hostname
+
+ @check_hostname.setter
+ def check_hostname(self, value):
+ check_hostname = bool(value)
+ if check_hostname and lib.SSL_CTX_get_verify_mode(self.ctx) == lib.SSL_VERIFY_NONE:
+ raise ValueError("check_hostname needs a SSL context with either "
+ "CERT_OPTIONAL or CERT_REQUIRED")
+ self._check_hostname = check_hostname
+
def set_ciphers(self, cipherlist):
cipherlistbuf = _str_to_ffi_buffer(cipherlist)
ret = lib.SSL_CTX_set_cipher_list(self.ctx, cipherlistbuf)
@@ -416,6 +808,7 @@
lib.SSL_CTX_set_default_passwd_cb_userdata(self.ctx,
ffi.new_handle(pw_info))
try:
+ ffi.errno = 0
certfilebuf = _str_to_ffi_buffer(certfile)
ret = lib.SSL_CTX_use_certificate_chain_file(self.ctx, certfilebuf)
if ret != 1:
@@ -429,8 +822,9 @@
else:
raise ssl_lib_error()
- keyfilebuf = _str_to_ffi_buffer(keyfile)
- ret = lib.SSL_CTX_use_PrivateKey_file(self.ctx, keyfilebuf,
+ ffi.errno = 0
+ buf = _str_to_ffi_buffer(keyfile)
+ ret = lib.SSL_CTX_use_PrivateKey_file(self.ctx, buf,
lib.SSL_FILETYPE_PEM)
if ret != 1:
if pw_info.operationerror:
@@ -459,6 +853,116 @@
return _SSLSocket._new__ssl_socket(self, sock, server_side,
server_hostname, None, None)
+ def load_verify_locations(self, cafile=None, capath=None, cadata=None):
+ ffi.errno = 0
+ if cadata is None:
+ ca_file_type = -1
+ else:
+ if not isinstance(cadata, str):
+ ca_file_type = lib.SSL_FILETYPE_ASN1
+ else:
+ ca_file_type = lib.SSL_FILETYPE_PEM
+ try:
+ cadata = cadata.encode('ascii')
+ except UnicodeEncodeError:
+ raise TypeError("cadata should be a ASCII string or a bytes-like object")
+ if cafile is None and capath is None and cadata is None:
+ raise TypeError("cafile and capath cannot be both omitted")
+ # load from cadata
+ if cadata is not None:
+ buf = _str_to_ffi_buffer(cadata)
+ self._add_ca_certs(buf, len(buf), ca_file_type)
+
+ # load cafile or capath
+ if cafile or capath:
+ if cafile is None:
+ cafilebuf = ffi.NULL
+ else:
+ cafilebuf = _str_to_ffi_buffer(cafile)
+ if capath is None:
+ capathbuf = ffi.NULL
+ else:
+ capathbuf = _str_to_ffi_buffer(capath)
+ ret = lib.SSL_CTX_load_verify_locations(self.ctx, cafilebuf, capathbuf)
+ if ret != 1:
+ _errno = ffi.errno
+ if _errno:
+ lib.ERR_clear_error()
+ raise OSError(_errno, '')
+ else:
+ raise ssl_lib_error()
+
+ def _add_ca_certs(self, data, size, ca_file_type):
+ biobuf = lib.BIO_new_mem_buf(data, size)
+ if biobuf == ffi.NULL:
+ raise ssl_error("Can't allocate buffer")
+ try:
+ store = lib.SSL_CTX_get_cert_store(self.ctx)
+ loaded = 0
+ while True:
+ if ca_file_type == lib.SSL_FILETYPE_ASN1:
+ cert = lib.d2i_X509_bio(biobuf, ffi.NULL)
+ else:
+ cert = lib.PEM_read_bio_X509(biobuf, ffi.NULL, ffi.NULL, ffi.NULL)
+ if not cert:
+ break
+ try:
+ r = lib.X509_STORE_add_cert(store, cert)
+ finally:
+ lib.X509_free(cert)
+ if not r:
+ err = lib.ERR_peek_last_error()
+ if (lib.ERR_GET_LIB(err) == lib.ERR_LIB_X509 and
+ lib.ERR_GET_REASON(err) ==
+ lib.X509_R_CERT_ALREADY_IN_HASH_TABLE):
+ # cert already in hash table, not an error
+ lib.ERR_clear_error()
+ else:
+ break
+ loaded += 1
+
+ err = lib.ERR_peek_last_error()
+ if (ca_file_type == lib.SSL_FILETYPE_ASN1 and
+ loaded > 0 and
+ lib.ERR_GET_LIB(err) == lib.ERR_LIB_ASN1 and
+ lib.ERR_GET_REASON(err) == lib.ASN1_R_HEADER_TOO_LONG):
+ # EOF ASN1 file, not an error
+ lib.ERR_clear_error()
+ elif (ca_file_type == lib.SSL_FILETYPE_PEM and
+ loaded > 0 and
+ lib.ERR_GET_LIB(err) == lib.ERR_LIB_PEM and
+ lib.ERR_GET_REASON(err) == lib.PEM_R_NO_START_LINE):
+ # EOF PEM file, not an error
+ lib.ERR_clear_error()
+ else:
+ raise ssl_lib_error()
+ finally:
+ lib.BIO_free(biobuf)
+
+ def cert_store_stats(self):
+ store = lib.SSL_CTX_get_cert_store(self.ctx)
+ x509 = 0
+ x509_ca = 0
+ crl = 0
+ objs = store.objs
+ count = lib.sk_X509_OBJECT_num(objs)
+ for i in range(count):
+ obj = lib.sk_X509_OBJECT_value(objs, i)
+ _type = lib.Cryptography_X509_OBJECT_get_type(obj)
+ if _type == lib.Cryptography_X509_LU_X509:
+ x509 += 1
+ cert = lib.Cryptography_X509_OBJECT_data_x509(obj)
+ if lib.X509_check_ca(cert):
+ x509_ca += 1
+ elif _type == lib.Cryptography_X509_LU_CRL:
+ crl += 1
+ else:
+ # Ignore X509_LU_FAIL, X509_LU_RETRY, X509_LU_PKEY.
+ # As far as I can tell they are internal states and never
+ # stored in a cert store
+ pass
+ return {'x509': x509, 'x509_ca': x509_ca, 'crl': crl}
+
# def _finalize_(self):
# ctx = self.ctx
@@ -472,17 +976,17 @@
# self = space.allocate_instance(SSLContext, w_subtype)
# self.__init__(space, protocol)
# return space.wrap(self)
-# def session_stats_w(self, space):
-# w_stats = space.newdict()
-# for name, ssl_func in SSL_CTX_STATS:
-# w_value = space.wrap(ssl_func(self.ctx))
-# space.setitem_str(w_stats, name, w_value)
-# return w_stats
-#
-# def descr_set_default_verify_paths(self, space):
-# if not libssl_SSL_CTX_set_default_verify_paths(self.ctx):
-# raise ssl_error(space, "")
-#
+
+ def session_stats(self):
+ stats = {}
+ for name, ssl_func in SSL_CTX_STATS:
+ stats[name] = ssl_func(self.ctx)
+ return stats
+
+ def set_default_verify_paths(self):
+ if not lib.SSL_CTX_set_default_verify_paths(self.ctx):
+ raise ssl_error("")
+
# def descr_get_options(self, space):
# return space.newlong(libssl_SSL_CTX_get_options(self.ctx))
#
@@ -500,51 +1004,6 @@
# if set:
# libssl_SSL_CTX_set_options(self.ctx, set)
#
-# def descr_get_verify_mode(self, space):
-# mode = libssl_SSL_CTX_get_verify_mode(self.ctx)
-# if mode == SSL_VERIFY_NONE:
-# return space.newlong(PY_SSL_CERT_NONE)
-# elif mode == SSL_VERIFY_PEER:
-# return space.newlong(PY_SSL_CERT_OPTIONAL)
-# elif mode == SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT:
-# return space.newlong(PY_SSL_CERT_REQUIRED)
-# raise ssl_error(space, "invalid return value from SSL_CTX_get_verify_mode")
-#
-# def descr_set_verify_mode(self, space, w_mode):
-# n = space.int_w(w_mode)
-# if n == PY_SSL_CERT_NONE:
-# mode = SSL_VERIFY_NONE
-# elif n == PY_SSL_CERT_OPTIONAL:
-# mode = SSL_VERIFY_PEER
-# elif n == PY_SSL_CERT_REQUIRED:
-# mode = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT
-# else:
-# raise oefmt(space.w_ValueError,
-# "invalid value for verify_mode")
-# if mode == SSL_VERIFY_NONE and self.check_hostname:
-# raise oefmt(space.w_ValueError,
-# "Cannot set verify_mode to CERT_NONE when "
-# "check_hostname is enabled.")
-# libssl_SSL_CTX_set_verify(self.ctx, mode, None)
-#
-# def descr_get_verify_flags(self, space):
-# store = libssl_SSL_CTX_get_cert_store(self.ctx)
-# flags = libssl_X509_VERIFY_PARAM_get_flags(store[0].c_param)
-# return space.wrap(flags)
-#
-# def descr_set_verify_flags(self, space, w_obj):
-# new_flags = space.int_w(w_obj)
-# store = libssl_SSL_CTX_get_cert_store(self.ctx)
-# flags = libssl_X509_VERIFY_PARAM_get_flags(store[0].c_param)
-# flags_clear = flags & ~new_flags
-# flags_set = ~flags & new_flags
-# if flags_clear and not libssl_X509_VERIFY_PARAM_clear_flags(
-# store[0].c_param, flags_clear):
-# raise _ssl_seterror(space, None, 0)
-# if flags_set and not libssl_X509_VERIFY_PARAM_set_flags(
-# store[0].c_param, flags_set):
-# raise _ssl_seterror(space, None, 0)
-#
# def descr_get_check_hostname(self, space):
# return space.newbool(self.check_hostname)
#
@@ -556,126 +1015,35 @@
# "CERT_OPTIONAL or CERT_REQUIRED")
# self.check_hostname = check_hostname
#
-# @unwrap_spec(filepath=str)
-# def load_dh_params_w(self, space, filepath):
-# bio = libssl_BIO_new_file(filepath, "r")
-# if not bio:
-# errno = get_saved_errno()
-# libssl_ERR_clear_error()
-# raise wrap_oserror(space, OSError(errno, ''),
-# exception_name = 'w_IOError')
-# try:
-# dh = libssl_PEM_read_bio_DHparams(bio, None, None, None)
-# finally:
-# libssl_BIO_free(bio)
-# if not dh:
-# errno = get_saved_errno()
-# if errno != 0:
-# libssl_ERR_clear_error()
-# raise wrap_oserror(space, OSError(errno, ''))
-# else:
-# raise _ssl_seterror(space, None, 0)
-# try:
-# if libssl_SSL_CTX_set_tmp_dh(self.ctx, dh) == 0:
-# raise _ssl_seterror(space, None, 0)
-# finally:
-# libssl_DH_free(dh)
-#
-# def load_verify_locations_w(self, space, w_cafile=None, w_capath=None,
-# w_cadata=None):
-# if space.is_none(w_cafile):
-# cafile = None
-# else:
-# cafile = space.str_w(w_cafile)
-# if space.is_none(w_capath):
-# capath = None
-# else:
-# capath = space.str_w(w_capath)
-# if space.is_none(w_cadata):
-# cadata = None
-# ca_file_type = -1
-# else:
-# if not space.isinstance_w(w_cadata, space.w_unicode):
-# ca_file_type = SSL_FILETYPE_ASN1
-# cadata = space.bufferstr_w(w_cadata)
-# else:
-# ca_file_type = SSL_FILETYPE_PEM
-# try:
-# cadata = space.unicode_w(w_cadata).encode('ascii')
-# except UnicodeEncodeError:
-# raise oefmt(space.w_TypeError,
-# "cadata should be a ASCII string or a "
-# "bytes-like object")
-# if cafile is None and capath is None and cadata is None:
-# raise oefmt(space.w_TypeError,
-# "cafile and capath cannot be both omitted")
-# # load from cadata
-# if cadata is not None:
-# with rffi.scoped_nonmovingbuffer(cadata) as buf:
-# self._add_ca_certs(space, buf, len(cadata), ca_file_type)
-#
-# # load cafile or capath
-# if cafile is not None or capath is not None:
-# ret = libssl_SSL_CTX_load_verify_locations(
-# self.ctx, cafile, capath)
-# if ret != 1:
-# errno = get_saved_errno()
-# if errno:
-# libssl_ERR_clear_error()
-# raise wrap_oserror(space, OSError(errno, ''),
-# exception_name = 'w_IOError')
-# else:
-# raise _ssl_seterror(space, None, -1)
-#
-# def _add_ca_certs(self, space, data, size, ca_file_type):
-# biobuf = libssl_BIO_new_mem_buf(data, size)
-# if not biobuf:
-# raise ssl_error(space, "Can't allocate buffer")
-# try:
-# store = libssl_SSL_CTX_get_cert_store(self.ctx)
-# loaded = 0
-# while True:
-# if ca_file_type == SSL_FILETYPE_ASN1:
-# cert = libssl_d2i_X509_bio(
-# biobuf, None)
-# else:
-# cert = libssl_PEM_read_bio_X509(
-# biobuf, None, None, None)
-# if not cert:
-# break
-# try:
-# r = libssl_X509_STORE_add_cert(store, cert)
-# finally:
-# libssl_X509_free(cert)
-# if not r:
-# err = libssl_ERR_peek_last_error()
-# if (libssl_ERR_GET_LIB(err) == ERR_LIB_X509 and
-# libssl_ERR_GET_REASON(err) ==
-# X509_R_CERT_ALREADY_IN_HASH_TABLE):
-# # cert already in hash table, not an error
-# libssl_ERR_clear_error()
-# else:
-# break
-# loaded += 1
-#
-# err = libssl_ERR_peek_last_error()
-# if (ca_file_type == SSL_FILETYPE_ASN1 and
-# loaded > 0 and
-# libssl_ERR_GET_LIB(err) == ERR_LIB_ASN1 and
-# libssl_ERR_GET_REASON(err) == ASN1_R_HEADER_TOO_LONG):
-# # EOF ASN1 file, not an error
-# libssl_ERR_clear_error()
-# elif (ca_file_type == SSL_FILETYPE_PEM and
-# loaded > 0 and
-# libssl_ERR_GET_LIB(err) == ERR_LIB_PEM and
-# libssl_ERR_GET_REASON(err) == PEM_R_NO_START_LINE):
-# # EOF PEM file, not an error
-# libssl_ERR_clear_error()
-# else:
-# raise _ssl_seterror(space, None, 0)
-# finally:
-# libssl_BIO_free(biobuf)
-#
+ def load_dh_params(self, filepath):
+ ffi.errno = 0
+ if filepath is None:
+ raise TypeError("filepath must not be None")
+ buf = _str_to_ffi_buffer(filepath, zeroterm=True)
+ mode = ffi.new("char[]",b"r")
+ ffi.errno = 0
+ bio = lib.BIO_new_file(buf, mode)
+ if bio == ffi.NULL:
+ _errno = ffi.errno
+ lib.ERR_clear_error()
+ raise OSError(_errno, '')
+ try:
+ dh = lib.PEM_read_bio_DHparams(bio, ffi.NULL, ffi.NULL, ffi.NULL)
+ finally:
+ lib.BIO_free(bio)
+ if dh == ffi.NULL:
+ _errno = ffi.errno
+ if _errno != 0:
+ lib.ERR_clear_error()
+ raise OSError(_errno, '')
+ else:
+ raise ssl_lib_error()
+ try:
+ if lib.SSL_CTX_set_tmp_dh(self.ctx, dh) == 0:
+ raise ssl_lib_error()
+ finally:
+ lib.DH_free(dh)
+
# def cert_store_stats_w(self, space):
# store = libssl_SSL_CTX_get_cert_store(self.ctx)
# x509 = 0
@@ -720,63 +1088,126 @@
#
# self.alpn_protocols = SSLAlpnProtocols(self.ctx, protos)
#
-# def get_ca_certs_w(self, space, w_binary_form=None):
-# if w_binary_form and space.is_true(w_binary_form):
-# binary_mode = True
-# else:
-# binary_mode = False
-# rlist = []
-# store = libssl_SSL_CTX_get_cert_store(self.ctx)
-# for i in range(libssl_sk_X509_OBJECT_num(store[0].c_objs)):
-# obj = libssl_sk_X509_OBJECT_value(store[0].c_objs, i)
-# if intmask(obj.c_type) != X509_LU_X509:
-# # not a x509 cert
-# continue
-# # CA for any purpose
-# cert = libssl_pypy_X509_OBJECT_data_x509(obj)
-# if not libssl_X509_check_ca(cert):
-# continue
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit