Author: Amaury Forgeot d'Arc <[email protected]>
Branch:
Changeset: r44090:9d6a4800b387
Date: 2011-05-10 23:08 +0200
http://bitbucket.org/pypy/pypy/changeset/9d6a4800b387/
Log: ssl module: add certificate validation, cipher methods...
test_ssl.py passes tests with the "-u network" option
diff --git a/lib-python/modified-2.7/test/test_ssl.py
b/lib-python/modified-2.7/test/test_ssl.py
--- a/lib-python/modified-2.7/test/test_ssl.py
+++ b/lib-python/modified-2.7/test/test_ssl.py
@@ -105,7 +105,6 @@
print "didn't raise TypeError"
ssl.RAND_add("this is a random string", 75.0)
- @test_support.impl_detail("obscure test")
def test_parse_cert(self):
# note that this uses an 'unofficial' function in _ssl.c,
# provided solely for this test, to exercise the certificate
diff --git a/pypy/module/_ssl/__init__.py b/pypy/module/_ssl/__init__.py
--- a/pypy/module/_ssl/__init__.py
+++ b/pypy/module/_ssl/__init__.py
@@ -7,6 +7,7 @@
interpleveldefs = {
'sslwrap': 'interp_ssl.sslwrap',
'SSLError': 'interp_ssl.get_error(space)',
+ '_test_decode_cert': 'interp_ssl._test_decode_cert',
}
appleveldefs = {
diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py
--- a/pypy/module/_ssl/interp_ssl.py
+++ b/pypy/module/_ssl/interp_ssl.py
@@ -4,6 +4,7 @@
from pypy.interpreter.typedef import TypeDef
from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.rlib.rarithmetic import intmask
from pypy.rlib import rpoll, rsocket
from pypy.rlib.ropenssl import *
@@ -68,11 +69,8 @@
def ssl_error(space, msg, errno=0):
w_exception_class = get_error(space)
- if errno:
- w_exception = space.call_function(w_exception_class,
- space.wrap(errno), space.wrap(msg))
- else:
- w_exception = space.call_function(w_exception_class, space.wrap(msg))
+ w_exception = space.call_function(w_exception_class,
+ space.wrap(errno), space.wrap(msg))
return OperationError(w_exception_class, w_exception)
if HAVE_OPENSSL_RAND:
@@ -169,10 +167,10 @@
num_bytes = 0
while True:
err = 0
-
+
num_bytes = libssl_SSL_write(self.ssl, data, len(data))
err = libssl_SSL_get_error(self.ssl, num_bytes)
-
+
if err == SSL_ERROR_WANT_READ:
sockstate = check_socket_and_wait_for_timeout(self.space,
self.w_socket, False)
@@ -181,19 +179,19 @@
self.w_socket, True)
else:
sockstate = SOCKET_OPERATION_OK
-
+
if sockstate == SOCKET_HAS_TIMED_OUT:
raise ssl_error(self.space, "The write operation timed out")
elif sockstate == SOCKET_HAS_BEEN_CLOSED:
raise ssl_error(self.space, "Underlying socket has been
closed.")
elif sockstate == SOCKET_IS_NONBLOCKING:
break
-
+
if err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE:
continue
else:
break
-
+
if num_bytes > 0:
return self.space.wrap(num_bytes)
else:
@@ -379,6 +377,247 @@
return self.w_socket
+ def cipher(self, space):
+ if not self.ssl:
+ return space.w_None
+ current = libssl_SSL_get_current_cipher(self.ssl)
+ if not current:
+ return space.w_None
+
+ name = libssl_SSL_CIPHER_get_name(current)
+ if name:
+ w_name = space.wrap(rffi.charp2str(name))
+ else:
+ w_name = space.w_None
+
+ proto = libssl_SSL_CIPHER_get_version(current)
+ if proto:
+ w_proto = space.wrap(rffi.charp2str(name))
+ else:
+ w_proto = space.w_None
+
+ bits = libssl_SSL_CIPHER_get_bits(current,
+ lltype.nullptr(rffi.INTP.TO))
+ w_bits = space.newint(bits)
+
+ return space.newtuple([w_name, w_proto, w_bits])
+
+ @unwrap_spec(der=bool)
+ def peer_certificate(self, der=False):
+ """peer_certificate([der=False]) -> certificate
+
+ Returns the certificate for the peer. If no certificate was provided,
+ returns None. If a certificate was provided, but not validated,
returns
+ an empty dictionary. Otherwise returns a dict containing information
+ about the peer certificate.
+
+ If the optional argument is True, returns a DER-encoded copy of the
+ peer certificate, or None if no certificate was provided. This will
+ return the certificate even if it wasn't validated."""
+ if not self.peer_cert:
+ return self.space.w_None
+
+ if der:
+ # return cert in DER-encoded format
+ with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
+ buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO)
+ length = libssl_i2d_X509(self.peer_cert, buf_ptr)
+ if length < 0:
+ raise _ssl_seterror(self.space, self, length)
+ try:
+ # this is actually an immutable bytes sequence
+ return self.space.wrap(rffi.charp2str(buf_ptr[0]))
+ finally:
+ libssl_OPENSSL_free(buf_ptr[0])
+ else:
+ verification = libssl_SSL_CTX_get_verify_mode(
+ libssl_SSL_get_SSL_CTX(self.ssl))
+ if not verification & SSL_VERIFY_PEER:
+ return self.space.newdict()
+ else:
+ return _decode_certificate(self.space, self.peer_cert)
+
+def _decode_certificate(space, certificate, verbose=False):
+ w_retval = space.newdict()
+
+ w_peer = _create_tuple_for_X509_NAME(
+ space, libssl_X509_get_subject_name(certificate))
+ space.setitem(w_retval, space.wrap("subject"), w_peer)
+
+ if verbose:
+ w_issuer = _create_tuple_for_X509_NAME(
+ space, libssl_X509_get_issuer_name(certificate))
+ space.setitem(w_retval, space.wrap("issuer"), w_issuer)
+
+ space.setitem(w_retval, space.wrap("version"),
+ space.wrap(libssl_X509_get_version(certificate)))
+
+ biobuf = libssl_BIO_new(libssl_BIO_s_mem())
+ try:
+
+ if verbose:
+ libssl_BIO_reset(biobuf)
+ serialNumber = libssl_X509_get_serialNumber(certificate)
+ libssl_i2a_ASN1_INTEGER(biobuf, serialNumber)
+ # should not exceed 20 octets, 160 bits, so buf is big enough
+ with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
+ length = libssl_BIO_gets(biobuf, buf, 99)
+ if length < 0:
+ raise _ssl_seterror(space, None, length)
+
+ w_serial = space.wrap(rffi.charpsize2str(buf, length))
+ space.setitem(w_retval, space.wrap("serialNumber"), w_serial)
+
+ libssl_BIO_reset(biobuf)
+ notBefore = libssl_X509_get_notBefore(certificate)
+ libssl_ASN1_TIME_print(biobuf, notBefore)
+ with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
+ length = libssl_BIO_gets(biobuf, buf, 99)
+ if length < 0:
+ raise _ssl_seterror(space, None, length)
+ w_date = space.wrap(rffi.charpsize2str(buf, length))
+ space.setitem(w_retval, space.wrap("notBefore"), w_date)
+
+ libssl_BIO_reset(biobuf)
+ notAfter = libssl_X509_get_notAfter(certificate)
+ libssl_ASN1_TIME_print(biobuf, notAfter)
+ with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf:
+ length = libssl_BIO_gets(biobuf, buf, 99)
+ if length < 0:
+ raise _ssl_seterror(space, None, length)
+ w_date = space.wrap(rffi.charpsize2str(buf, length))
+ space.setitem(w_retval, space.wrap("notAfter"), w_date)
+ finally:
+ libssl_BIO_free(biobuf)
+
+ # Now look for subjectAltName
+ w_alt_names = _get_peer_alt_names(space, certificate)
+ if w_alt_names is not space.w_None:
+ space.setitem(w_retval, space.wrap("subjectAltName"), w_alt_names)
+
+ return w_retval
+
+def _create_tuple_for_X509_NAME(space, xname):
+ entry_count = libssl_X509_NAME_entry_count(xname)
+ dn_w = []
+ rdn_w = []
+ rdn_level = -1
+ for index in range(entry_count):
+ entry = libssl_X509_NAME_get_entry(xname, index)
+ # check to see if we've gotten to a new RDN
+ entry_level = intmask(entry[0].c_set)
+ if rdn_level >= 0:
+ if rdn_level != entry_level:
+ # yes, new RDN
+ # add old RDN to DN
+ dn_w.append(space.newtuple(list(rdn_w)))
+ rdn_w = []
+ rdn_level = entry_level
+
+ # Now add this attribute to the current RDN
+ name = libssl_X509_NAME_ENTRY_get_object(entry)
+ value = libssl_X509_NAME_ENTRY_get_data(entry)
+ attr = _create_tuple_for_attribute(space, name, value)
+ rdn_w.append(attr)
+
+ # Now, there is typically a dangling RDN
+ if rdn_w:
+ dn_w.append(space.newtuple(list(rdn_w)))
+ return space.newtuple(list(dn_w))
+
+def _get_peer_alt_names(space, certificate):
+ # this code follows the procedure outlined in
+ # OpenSSL's crypto/x509v3/v3_prn.c:X509v3_EXT_print()
+ # function to extract the STACK_OF(GENERAL_NAME),
+ # then iterates through the stack to add the
+ # names.
+
+ if not certificate:
+ return space.w_None
+
+ # get a memory buffer
+ biobuf = libssl_BIO_new(libssl_BIO_s_mem())
+
+ try:
+ alt_names_w = []
+ i = 0
+ while True:
+ i = libssl_X509_get_ext_by_NID(
+ certificate, NID_subject_alt_name, i)
+ if i < 0:
+ break
+
+ # now decode the altName
+ ext = libssl_X509_get_ext(certificate, i)
+ method = libssl_X509V3_EXT_get(ext)
+ if not method:
+ raise ssl_error(space,
+ "No method for internalizing subjectAltName!'")
+
+ with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as p_ptr:
+ p_ptr[0] = ext[0].c_value.c_data
+ length = intmask(ext[0].c_value.c_length)
+ null = lltype.nullptr(rffi.VOIDP.TO)
+ if method[0].c_it:
+ names = rffi.cast(GENERAL_NAMES, libssl_ASN1_item_d2i(
+ null, p_ptr, length,
+ libssl_ASN1_ITEM_ptr(method[0].c_it)))
+ else:
+ names = rffi.cast(GENERAL_NAMES, method[0].c_d2i(
+ null, p_ptr, length))
+
+ for j in range(libssl_sk_GENERAL_NAME_num(names)):
+ # Get a rendering of each name in the set of names
+
+ name = libssl_sk_GENERAL_NAME_value(names, j)
+ if intmask(name[0].c_type) == GEN_DIRNAME:
+
+ # we special-case DirName as a tuple of tuples of
attributes
+ dirname = libssl_pypy_GENERAL_NAME_dirn(name)
+ w_t = space.newtuple([
+ space.wrap("DirName"),
+ _create_tuple_for_X509_NAME(space, dirname)
+ ])
+ else:
+
+ # for everything else, we use the OpenSSL print form
+
+ libssl_BIO_reset(biobuf)
+ libssl_GENERAL_NAME_print(biobuf, name)
+ with lltype.scoped_alloc(rffi.CCHARP.TO, 2048) as buf:
+ length = libssl_BIO_gets(biobuf, buf, 2047)
+ if length < 0:
+ raise _ssl_seterror(space, None, 0)
+
+ v = rffi.charpsize2str(buf, length)
+ v1, v2 = v.split(':', 1)
+ w_t = space.newtuple([space.wrap(v1),
+ space.wrap(v2)])
+
+ alt_names_w.append(w_t)
+ finally:
+ libssl_BIO_free(biobuf)
+
+ if alt_names_w:
+ return space.newtuple(list(alt_names_w))
+ else:
+ return space.w_None
+
+def _create_tuple_for_attribute(space, name, value):
+ with lltype.scoped_alloc(rffi.CCHARP.TO, X509_NAME_MAXLEN) as buf:
+ length = libssl_OBJ_obj2txt(buf, X509_NAME_MAXLEN, name, 0)
+ if length < 0:
+ raise _ssl_seterror(space, None, 0)
+ w_name = space.wrap(rffi.charpsize2str(buf, length))
+
+ with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr:
+ length = libssl_ASN1_STRING_to_UTF8(buf_ptr, value)
+ if length < 0:
+ raise _ssl_seterror(space, None, 0)
+ w_value = space.wrap(rffi.charpsize2str(buf_ptr[0], length))
+ w_value = space.call_method(w_value, "decode", space.wrap("utf-8"))
+
+ return space.newtuple([w_name, w_value])
SSLObject.typedef = TypeDef("SSLObject",
server = interp2app(SSLObject.server),
@@ -386,12 +625,15 @@
write = interp2app(SSLObject.write),
pending = interp2app(SSLObject.pending),
read = interp2app(SSLObject.read),
- do_handshake=interp2app(SSLObject.do_handshake),
- shutdown=interp2app(SSLObject.shutdown),
+ do_handshake = interp2app(SSLObject.do_handshake),
+ shutdown = interp2app(SSLObject.shutdown),
+ cipher = interp2app(SSLObject.cipher),
+ peer_certificate = interp2app(SSLObject.peer_certificate),
)
-def new_sslobject(space, w_sock, side, w_key_file, w_cert_file):
+def new_sslobject(space, w_sock, side, w_key_file, w_cert_file,
+ cert_mode, protocol, w_cacerts_file, w_ciphers):
ss = SSLObject(space)
sock_fd = space.int_w(space.call_method(w_sock, "fileno"))
@@ -408,18 +650,47 @@
cert_file = None
else:
cert_file = space.str_w(w_cert_file)
+ if space.is_w(w_cacerts_file, space.w_None):
+ cacerts_file = None
+ else:
+ cacerts_file = space.str_w(w_cacerts_file)
+ if space.is_w(w_ciphers, space.w_None):
+ ciphers = None
+ else:
+ ciphers = space.str_w(w_ciphers)
if side == PY_SSL_SERVER and (not key_file or not cert_file):
raise ssl_error(space, "Both the key & certificate files "
"must be specified for server-side operation")
- ss.ctx = libssl_SSL_CTX_new(libssl_SSLv23_method()) # set up context
+ # set up context
+ if protocol == PY_SSL_VERSION_TLS1:
+ method = libssl_TLSv1_method()
+ elif protocol == PY_SSL_VERSION_SSL3:
+ method = libssl_SSLv3_method()
+ elif protocol == PY_SSL_VERSION_SSL2:
+ method = libssl_SSLv2_method()
+ elif protocol == PY_SSL_VERSION_SSL23:
+ method = libssl_SSLv23_method()
+ else:
+ raise ssl_error(space, "Invalid SSL protocol variant specified")
+ ss.ctx = libssl_SSL_CTX_new(method)
if not ss.ctx:
- raise ssl_error(space, "Invalid SSL protocol variant specified")
+ raise ssl_error(space, "Could not create SSL context")
- # XXX SSL_CTX_set_cipher_list?
+ if ciphers:
+ ret = libssl_SSL_CTX_set_cipher_list(ss.ctx, ciphers)
+ if ret == 0:
+ raise ssl_error(space, "No cipher can be selected.")
- # XXX SSL_CTX_load_verify_locations?
+ if cert_mode != PY_SSL_CERT_NONE:
+ if not cacerts_file:
+ raise ssl_error(space,
+ "No root certificates specified for "
+ "verification of other-side certificates.")
+ ret = libssl_SSL_CTX_load_verify_locations(ss.ctx, cacerts_file, None)
+ if ret != 1:
+ raise _ssl_seterror(space, None, 0)
if key_file:
ret = libssl_SSL_CTX_use_PrivateKey_file(ss.ctx, key_file,
@@ -434,7 +705,12 @@
# ssl compatibility
libssl_SSL_CTX_set_options(ss.ctx, SSL_OP_ALL)
- libssl_SSL_CTX_set_verify(ss.ctx, SSL_VERIFY_NONE, None) # set verify level
+ verification_mode = SSL_VERIFY_NONE
+ if cert_mode == PY_SSL_CERT_OPTIONAL:
+ verification_mode = SSL_VERIFY_PEER
+ elif cert_mode == PY_SSL_CERT_REQUIRED:
+ verification_mode = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT
+ libssl_SSL_CTX_set_verify(ss.ctx, verification_mode, None)
ss.ssl = libssl_SSL_new(ss.ctx) # new ssl struct
libssl_SSL_set_fd(ss.ssl, sock_fd) # set the socket for SSL
libssl_SSL_set_mode(ss.ssl, SSL_MODE_AUTO_RETRY)
@@ -443,8 +719,8 @@
# to non-blocking mode (blocking is the default)
if has_timeout:
# Set both the read and write BIO's to non-blocking mode
- libssl_BIO_ctrl(libssl_SSL_get_rbio(ss.ssl), BIO_C_SET_NBIO, 1, None)
- libssl_BIO_ctrl(libssl_SSL_get_wbio(ss.ssl), BIO_C_SET_NBIO, 1, None)
+ libssl_BIO_set_nbio(libssl_SSL_get_rbio(ss.ssl), 1)
+ libssl_BIO_set_nbio(libssl_SSL_get_wbio(ss.ssl), 1)
libssl_SSL_set_connect_state(ss.ssl)
if side == PY_SSL_CLIENT:
@@ -505,7 +781,10 @@
def _ssl_seterror(space, ss, ret):
assert ret <= 0
- err = libssl_SSL_get_error(ss.ssl, ret)
+ if ss and ss.ssl:
+ err = libssl_SSL_get_error(ss.ssl, ret)
+ else:
+ err = SSL_ERROR_SSL
errstr = ""
errval = 0
@@ -557,10 +836,12 @@
@unwrap_spec(side=int, cert_mode=int, protocol=int)
def sslwrap(space, w_socket, side, w_key_file=None, w_cert_file=None,
cert_mode=PY_SSL_CERT_NONE, protocol=PY_SSL_VERSION_SSL23,
- w_cacerts_file=None, w_cipher=None):
+ w_cacerts_file=None, w_ciphers=None):
"""sslwrap(socket, side, [keyfile, certfile]) -> sslobject"""
return space.wrap(new_sslobject(
- space, w_socket, side, w_key_file, w_cert_file))
+ space, w_socket, side, w_key_file, w_cert_file,
+ cert_mode, protocol,
+ w_cacerts_file, w_ciphers))
class Cache:
def __init__(self, space):
@@ -570,3 +851,25 @@
def get_error(space):
return space.fromcache(Cache).w_error
+
+@unwrap_spec(filename=str, verbose=bool)
+def _test_decode_cert(space, filename, verbose=True):
+ cert = libssl_BIO_new(libssl_BIO_s_file())
+ if not cert:
+ raise ssl_error(space, "Can't malloc memory to read file")
+
+ try:
+ if libssl_BIO_read_filename(cert, filename) <= 0:
+ raise ssl_error(space, "Can't open file")
+
+ x = libssl_PEM_read_bio_X509_AUX(cert, None, None, None)
+ if not x:
+ raise ssl_error(space, "Error decoding PEM-encoded file")
+
+ try:
+ return _decode_certificate(space, x, verbose)
+ finally:
+ libssl_X509_free(x)
+ finally:
+ libssl_BIO_free(cert)
+
diff --git a/pypy/rlib/ropenssl.py b/pypy/rlib/ropenssl.py
--- a/pypy/rlib/ropenssl.py
+++ b/pypy/rlib/ropenssl.py
@@ -15,19 +15,27 @@
'winsock2.h',
# wincrypt.h defines X509_NAME, include it here
# so that openssl/ssl.h can repair this nonsense.
- 'wincrypt.h',
- 'openssl/ssl.h',
- 'openssl/err.h',
- 'openssl/evp.h']
+ 'wincrypt.h']
else:
libraries = ['ssl', 'crypto']
- includes = ['openssl/ssl.h', 'openssl/err.h',
- 'openssl/evp.h']
+ includes = []
+
+includes += [
+ 'openssl/ssl.h',
+ 'openssl/err.h',
+ 'openssl/rand.h',
+ 'openssl/evp.h',
+ 'openssl/x509v3.h']
eci = ExternalCompilationInfo(
libraries = libraries,
includes = includes,
export_symbols = [],
+ post_include_bits = [
+ # Unnamed structures are not supported by rffi_platform.
+ # So we replace an attribute access with a macro call.
+ '#define pypy_GENERAL_NAME_dirn(name) (name->d.dirn)',
+ ],
)
eci = rffi_platform.configure_external_library(
@@ -43,6 +51,10 @@
else:
from pypy.rlib._rsocket_rffi import FD_SETSIZE as MAX_FD_SIZE
+ASN1_STRING = lltype.Ptr(lltype.ForwardReference())
+ASN1_ITEM = rffi.COpaquePtr('ASN1_ITEM')
+X509_NAME = rffi.COpaquePtr('X509_NAME')
+
class CConfig:
_compilation_info_ = eci
@@ -53,6 +65,8 @@
SSL_FILETYPE_PEM = rffi_platform.ConstantInteger("SSL_FILETYPE_PEM")
SSL_OP_ALL = rffi_platform.ConstantInteger("SSL_OP_ALL")
SSL_VERIFY_NONE = rffi_platform.ConstantInteger("SSL_VERIFY_NONE")
+ SSL_VERIFY_PEER = rffi_platform.ConstantInteger("SSL_VERIFY_PEER")
+ SSL_VERIFY_FAIL_IF_NO_PEER_CERT =
rffi_platform.ConstantInteger("SSL_VERIFY_FAIL_IF_NO_PEER_CERT")
SSL_ERROR_WANT_READ = rffi_platform.ConstantInteger(
"SSL_ERROR_WANT_READ")
SSL_ERROR_WANT_WRITE = rffi_platform.ConstantInteger(
@@ -67,21 +81,52 @@
SSL_ERROR_SSL = rffi_platform.ConstantInteger("SSL_ERROR_SSL")
SSL_RECEIVED_SHUTDOWN = rffi_platform.ConstantInteger(
"SSL_RECEIVED_SHUTDOWN")
- SSL_CTRL_OPTIONS = rffi_platform.ConstantInteger("SSL_CTRL_OPTIONS")
- SSL_CTRL_MODE = rffi_platform.ConstantInteger("SSL_CTRL_MODE")
- BIO_C_SET_NBIO = rffi_platform.ConstantInteger("BIO_C_SET_NBIO")
SSL_MODE_AUTO_RETRY = rffi_platform.ConstantInteger("SSL_MODE_AUTO_RETRY")
+ NID_subject_alt_name =
rffi_platform.ConstantInteger("NID_subject_alt_name")
+ GEN_DIRNAME = rffi_platform.ConstantInteger("GEN_DIRNAME")
+
+ # Some structures, with only the fields used in the _ssl module
+ X509_name_entry_st = rffi_platform.Struct('struct X509_name_entry_st',
+ [('set', rffi.INT)])
+ asn1_string_st = rffi_platform.Struct('struct asn1_string_st',
+ [('length', rffi.INT),
+ ('data', rffi.CCHARP)])
+ X509_extension_st = rffi_platform.Struct(
+ 'struct X509_extension_st',
+ [('value', ASN1_STRING)])
+ ASN1_ITEM_EXP = lltype.FuncType([], ASN1_ITEM)
+ X509V3_EXT_D2I = lltype.FuncType([rffi.VOIDP, rffi.CCHARPP, rffi.LONG],
+ rffi.VOIDP)
+ v3_ext_method = rffi_platform.Struct(
+ 'struct v3_ext_method',
+ [('it', lltype.Ptr(ASN1_ITEM_EXP)),
+ ('d2i', lltype.Ptr(X509V3_EXT_D2I))])
+ GENERAL_NAME_st = rffi_platform.Struct(
+ 'struct GENERAL_NAME_st',
+ [('type', rffi.INT),
+ ])
+
+
for k, v in rffi_platform.configure(CConfig).items():
globals()[k] = v
# opaque structures
SSL_METHOD = rffi.COpaquePtr('SSL_METHOD')
SSL_CTX = rffi.COpaquePtr('SSL_CTX')
+SSL_CIPHER = rffi.COpaquePtr('SSL_CIPHER')
SSL = rffi.COpaquePtr('SSL')
BIO = rffi.COpaquePtr('BIO')
X509 = rffi.COpaquePtr('X509')
-X509_NAME = rffi.COpaquePtr('X509_NAME')
+X509_NAME_ENTRY = rffi.CArrayPtr(X509_name_entry_st)
+X509_EXTENSION = rffi.CArrayPtr(X509_extension_st)
+X509V3_EXT_METHOD = rffi.CArrayPtr(v3_ext_method)
+ASN1_OBJECT = rffi.COpaquePtr('ASN1_OBJECT')
+ASN1_STRING.TO.become(asn1_string_st)
+ASN1_TIME = rffi.COpaquePtr('ASN1_TIME')
+ASN1_INTEGER = rffi.COpaquePtr('ASN1_INTEGER')
+GENERAL_NAMES = rffi.COpaquePtr('GENERAL_NAMES')
+GENERAL_NAME = rffi.CArrayPtr(GENERAL_NAME_st)
HAVE_OPENSSL_RAND = OPENSSL_VERSION_NUMBER >= 0x0090500f
@@ -102,13 +147,22 @@
ssl_external('RAND_status', [], rffi.INT)
ssl_external('RAND_egd', [rffi.CCHARP], rffi.INT)
ssl_external('SSL_CTX_new', [SSL_METHOD], SSL_CTX)
+ssl_external('SSL_get_SSL_CTX', [SSL], SSL_CTX)
+ssl_external('TLSv1_method', [], SSL_METHOD)
+ssl_external('SSLv2_method', [], SSL_METHOD)
+ssl_external('SSLv3_method', [], SSL_METHOD)
ssl_external('SSLv23_method', [], SSL_METHOD)
ssl_external('SSL_CTX_use_PrivateKey_file', [SSL_CTX, rffi.CCHARP, rffi.INT],
rffi.INT)
ssl_external('SSL_CTX_use_certificate_chain_file', [SSL_CTX, rffi.CCHARP],
rffi.INT)
+ssl_external('SSL_CTX_set_options', [SSL_CTX, rffi.INT], rffi.INT, macro=True)
ssl_external('SSL_CTX_ctrl', [SSL_CTX, rffi.INT, rffi.INT, rffi.VOIDP],
rffi.INT)
ssl_external('SSL_CTX_set_verify', [SSL_CTX, rffi.INT, rffi.VOIDP],
lltype.Void)
+ssl_external('SSL_CTX_get_verify_mode', [SSL_CTX], rffi.INT)
+ssl_external('SSL_CTX_set_cipher_list', [SSL_CTX, rffi.CCHARP], rffi.INT)
+ssl_external('SSL_CTX_load_verify_locations', [SSL_CTX, rffi.CCHARP,
rffi.CCHARP], rffi.INT)
ssl_external('SSL_new', [SSL_CTX], SSL)
ssl_external('SSL_set_fd', [SSL, rffi.INT], rffi.INT)
+ssl_external('SSL_set_mode', [SSL, rffi.INT], rffi.INT, macro=True)
ssl_external('SSL_ctrl', [SSL, rffi.INT, rffi.INT, rffi.VOIDP], rffi.INT)
ssl_external('BIO_ctrl', [BIO, rffi.INT, rffi.INT, rffi.VOIDP], rffi.INT)
ssl_external('SSL_get_rbio', [SSL], BIO)
@@ -122,20 +176,70 @@
ssl_external('SSL_get_shutdown', [SSL], rffi.INT)
ssl_external('SSL_set_read_ahead', [SSL, rffi.INT], lltype.Void)
-ssl_external('ERR_get_error', [], rffi.INT)
-ssl_external('ERR_error_string', [rffi.ULONG, rffi.CCHARP], rffi.CCHARP)
ssl_external('SSL_get_peer_certificate', [SSL], X509)
ssl_external('X509_get_subject_name', [X509], X509_NAME)
ssl_external('X509_get_issuer_name', [X509], X509_NAME)
ssl_external('X509_NAME_oneline', [X509_NAME, rffi.CCHARP, rffi.INT],
rffi.CCHARP)
+ssl_external('X509_NAME_entry_count', [X509_NAME], rffi.INT)
+ssl_external('X509_NAME_get_entry', [X509_NAME, rffi.INT], X509_NAME_ENTRY)
+ssl_external('X509_NAME_ENTRY_get_object', [X509_NAME_ENTRY], ASN1_OBJECT)
+ssl_external('X509_NAME_ENTRY_get_data', [X509_NAME_ENTRY], ASN1_STRING)
+ssl_external('i2d_X509', [X509, rffi.CCHARPP], rffi.INT)
ssl_external('X509_free', [X509], lltype.Void)
+ssl_external('X509_get_notBefore', [X509], ASN1_TIME, macro=True)
+ssl_external('X509_get_notAfter', [X509], ASN1_TIME, macro=True)
+ssl_external('X509_get_serialNumber', [X509], ASN1_INTEGER)
+ssl_external('X509_get_version', [X509], rffi.INT, macro=True)
+ssl_external('X509_get_ext_by_NID', [X509, rffi.INT, rffi.INT], rffi.INT)
+ssl_external('X509_get_ext', [X509, rffi.INT], X509_EXTENSION)
+ssl_external('X509V3_EXT_get', [X509_EXTENSION], X509V3_EXT_METHOD)
+
+
+ssl_external('OBJ_obj2txt',
+ [rffi.CCHARP, rffi.INT, ASN1_OBJECT, rffi.INT], rffi.INT)
+ssl_external('ASN1_STRING_to_UTF8', [rffi.CCHARPP, ASN1_STRING], rffi.INT)
+ssl_external('ASN1_TIME_print', [BIO, ASN1_TIME], rffi.INT)
+ssl_external('i2a_ASN1_INTEGER', [BIO, ASN1_INTEGER], rffi.INT)
+ssl_external('ASN1_item_d2i',
+ [rffi.VOIDP, rffi.CCHARPP, rffi.LONG, ASN1_ITEM], rffi.VOIDP)
+ssl_external('ASN1_ITEM_ptr', [rffi.VOIDP], ASN1_ITEM, macro=True)
+
+ssl_external('sk_GENERAL_NAME_num', [GENERAL_NAMES], rffi.INT,
+ macro=True)
+ssl_external('sk_GENERAL_NAME_value', [GENERAL_NAMES, rffi.INT], GENERAL_NAME,
+ macro=True)
+ssl_external('GENERAL_NAME_print', [BIO, GENERAL_NAME], rffi.INT)
+ssl_external('pypy_GENERAL_NAME_dirn', [GENERAL_NAME], X509_NAME,
+ macro=True)
+
+ssl_external('SSL_get_current_cipher', [SSL], SSL_CIPHER)
+ssl_external('SSL_CIPHER_get_name', [SSL_CIPHER], rffi.CCHARP)
+ssl_external('SSL_CIPHER_get_version', [SSL_CIPHER], rffi.CCHARP)
+ssl_external('SSL_CIPHER_get_bits', [SSL_CIPHER, rffi.INTP], rffi.INT)
+
+ssl_external('ERR_get_error', [], rffi.INT)
+ssl_external('ERR_error_string', [rffi.ULONG, rffi.CCHARP], rffi.CCHARP)
+
ssl_external('SSL_free', [SSL], lltype.Void)
ssl_external('SSL_CTX_free', [SSL_CTX], lltype.Void)
+ssl_external('CRYPTO_free', [rffi.VOIDP], lltype.Void)
+libssl_OPENSSL_free = libssl_CRYPTO_free
+
ssl_external('SSL_write', [SSL, rffi.CCHARP, rffi.INT], rffi.INT)
ssl_external('SSL_pending', [SSL], rffi.INT)
ssl_external('SSL_read', [SSL, rffi.CCHARP, rffi.INT], rffi.INT)
-ssl_external('SSL_read', [SSL, rffi.CCHARP, rffi.INT], rffi.INT)
+BIO_METHOD = rffi.COpaquePtr('BIO_METHOD')
+ssl_external('BIO_s_mem', [], BIO_METHOD)
+ssl_external('BIO_s_file', [], BIO_METHOD)
+ssl_external('BIO_new', [BIO_METHOD], BIO)
+ssl_external('BIO_set_nbio', [BIO, rffi.INT], rffi.INT, macro=True)
+ssl_external('BIO_free', [BIO], rffi.INT)
+ssl_external('BIO_reset', [BIO], rffi.INT, macro=True)
+ssl_external('BIO_read_filename', [BIO, rffi.CCHARP], rffi.INT, macro=True)
+ssl_external('BIO_gets', [BIO, rffi.CCHARP, rffi.INT], rffi.INT)
+ssl_external('PEM_read_bio_X509_AUX',
+ [BIO, rffi.VOIDP, rffi.VOIDP, rffi.VOIDP], X509)
EVP_MD_CTX = rffi.COpaquePtr('EVP_MD_CTX', compilation_info=eci)
EVP_MD = rffi.COpaquePtr('EVP_MD')
@@ -159,13 +263,6 @@
EVP_MD_CTX_cleanup = external(
'EVP_MD_CTX_cleanup', [EVP_MD_CTX], rffi.INT)
-def libssl_SSL_set_mode(ssl, op):
- return libssl_SSL_ctrl(ssl, SSL_CTRL_MODE, op, None)
-def libssl_SSL_CTX_set_options(ctx, op):
- return libssl_SSL_CTX_ctrl(ctx, SSL_CTRL_OPTIONS, op, None)
-def libssl_BIO_set_nbio(bio, nonblocking):
- return libssl_BIO_ctrl(bio, BIO_C_SET_NBIO, nonblocking, None)
-
def init_ssl():
libssl_SSL_load_error_strings()
libssl_SSL_library_init()
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit