Nir Soffer has uploaded a new change for review. Change subject: sslutils: Support non-blocking accept_ssl ......................................................................
sslutils: Support non-blocking accept_ssl Previously the client socket was in blocking mode when accept_ssl() was called. This invokes openssl's SSL_accept() which *blocks* until the SSL handshake is done, blocking the entire protocol detector. Using non-blocking client socket, accpet_ssl() returns immediately with return code 0, meaning that the SSL handshake is not done yet. This patch adds a new "accepting" property, so users of SSLSocket can tell if the socket is ready for use. If the socket is not ready yet when it is readable or writable, the owner must invoke "accept_ssl()" multiple times until the ssl handshake is done. This patch replaces the partial fix in commit ef4ca69543, adding accept timeout. Using non-blocking client socket, we don't care about accept timeout, since it can never block. Socket stuck in accepting state will be removed automatically by the periodical cleanup task in the protocol detector. Change-Id: I6c1cd2d0ff69925efe11ea7b166e1545d111a640 Signed-off-by: Nir Soffer <[email protected]> --- M lib/vdsm/sslutils.py M tests/sslTests.py M vdsm/protocoldetector.py 3 files changed, 164 insertions(+), 32 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/15/33815/1 diff --git a/lib/vdsm/sslutils.py b/lib/vdsm/sslutils.py index 74d2bdf..a63dc01 100644 --- a/lib/vdsm/sslutils.py +++ b/lib/vdsm/sslutils.py @@ -25,17 +25,25 @@ from M2Crypto import SSL, X509, threading -DEFAULT_ACCEPT_TIMEOUT = 5 - # M2Crypto.threading needs initialization. # See https://bugzilla.redhat.com/482420 threading.init() class SSLSocket(object): - def __init__(self, connection): + def __init__(self, connection, accepting=False): self.connection = connection + self._accepting = accepting self._data = None + + @property + def accepting(self): + """ + Returns True if the socket is in the middle of a non-blocking ssl + handshake. The owner of the socket must poll the socket and invoke + accept_ssl() when the socket is readable. + """ + return self._accepting def gettimeout(self): return self.connection.socket.gettimeout() @@ -80,6 +88,14 @@ else: return self.connection.makefile(mode, bufsize) + def accept_ssl(self): + """ + Try to complete the ssl handshake. + """ + established = self.connection.accept_ssl() + self._accepting = not established + return established + def __getattr__(self, name): return getattr(self.connection, name) @@ -108,25 +124,29 @@ self.connection = SSL.Connection(self.context, sock=raw) - self.accept_timeout = DEFAULT_ACCEPT_TIMEOUT - def fileno(self): return self.connection.socket.fileno() def accept(self): client, address = self.connection.socket.accept() + + # If using non-blocking server socket, it is likely that the caller is + # interested in non-blocking client socket. We must set the client + # socket to non-blocking now, otherwise client.accept_ssl() would block + # the entire non-blocking server until the ssl handshake is done. + if self.connection.socket.gettimeout() == 0.0: + client.setblocking(0) + client = SSL.Connection(self.context, client) client.addr = address try: client.setup_ssl() client.set_accept_state() - client.settimeout(self.accept_timeout) - client.accept_ssl() - client.settimeout(None) + accepting = not client.accept_ssl() except SSL.SSLError as e: raise SSL.SSLError("%s, client %s" % (e, address[0])) - client = SSLSocket(client) + client = SSLSocket(client, accepting=accepting) return client, address diff --git a/tests/sslTests.py b/tests/sslTests.py index 84fe1e3..be81633 100644 --- a/tests/sslTests.py +++ b/tests/sslTests.py @@ -18,18 +18,21 @@ # Refer to the README and COPYING files for full details of the license # +import asyncore import errno +import logging import os import re import SimpleXMLRPCServer import socket import ssl import subprocess +import sys import tempfile import threading import xmlrpclib -from contextlib import contextmanager, closing +from contextlib import contextmanager from M2Crypto import SSL from sslhelper import KEY_FILE, CRT_FILE, OTHER_KEY_FILE, OTHER_CRT_FILE from testlib import VdsmTestCase as TestCaseBase @@ -107,27 +110,6 @@ server.stop() -class SocketTests(TestCaseBase): - - def test_block_socket(self): - server = TestServer() - server.server.socket.accept_timeout = 1 - timeout = server.server.socket.accept_timeout + 1 - server.start() - try: - with closing(socket.socket(socket.AF_INET, - socket.SOCK_STREAM)) as client_socket: - client_socket.settimeout(timeout) - client_socket.connect((HOST, server.port)) - # Wait for data that will never arrive. - # This will return successfuly if the other side closes the - # connection (as expected) or raise timeout (which means the - # test failed) - client_socket.recv(100) - finally: - server.stop() - - class VerifyingTransportTests(TestCaseBase): def test_valid(self): @@ -150,6 +132,131 @@ client.add(2, 3) +class AsyncEchoHandler(asyncore.dispatcher_with_send): + + def handle_read(self): + if getattr(self.socket, 'accepting', False): + logging.info('handle_read: socket is accepting') + self.socket.accept_ssl() + return + logging.info('handle_read: socket is ready') + data = self.recv(8192) + logging.info('server received %r', data) + if data: + self.send(data) + + def handle_close(self): + logging.info('client disconnected') + self.close() + + def handle_error(self): + e = sys.exc_info()[1] + if isinstance(e, SSL.SSLError) and str(e) == 'unexpected eof': + self.handle_close() + else: + asyncore.dispatcher_with_send.handle_error(self) + + def initiate_send(self): + """ + Override to avoid sending empty strings, which raises + M2Crypto.SSL.SSLError('unexpecdted eof') + """ + if self.out_buffer: + asyncore.dispatcher_with_send.initiate_send(self) + + +class AsyncEchoServer(asyncore.dispatcher): + + def __init__(self, socket_map): + sock = SSLServerSocket(socket.socket(), keyfile=KEY_FILE, + certfile=CRT_FILE, ca_certs=CRT_FILE) + asyncore.dispatcher.__init__(self, sock, map=socket_map) + self.set_reuse_addr() + self.bind(('127.0.0.1', 0)) # Let the kernel choose a port + self.listen(50) + + def handle_accept(self): + try: + pair = self.accept() + except SSL.SSLError as e: + # sslutils makes bad M2Crypto errors even worse + if not str(e).startswith('unexpected eof'): + raise + else: + if pair is not None: + sock, addr = pair + logging.info('server accpected client %r', addr) + AsyncEchoHandler(sock, map=self._map) + + +class AsyncTests(TestCaseBase): + + CONCURRENCY = 50 + + def setUp(self): + self.ready = threading.Event() + self.stop = threading.Event() + self.server = None + self.server_port = None + self.socket_map = {} + self.start_server() + + def tearDown(self): + self.stop.set() + + def test_echo(self): + self.check_echo() + + def test_concurrent_echo(self): + done = [False] * self.CONCURRENCY + + def client(i): + self.check_echo() + done[i] = True + + threads = [] + for i in range(self.CONCURRENCY): + t = threading.Thread(target=client, args=(i,)) + t.daemon = True + t.start() + threads.append(t) + for t in threads: + t.join() + + self.assertTrue(all(done)) + + def check_echo(self): + data = 'testing 1 2 3...\n' + s = socket.socket() + try: + s = ssl.wrap_socket(s, KEY_FILE, CRT_FILE) + s.settimeout(1) + logging.info('client connecting to %r', self.server_port) + s.connect(('127.0.0.1', self.server_port)) + logging.info('client sending %r', data) + s.send(data) + received = s.recv(128) + logging.info('client received %r', received) + self.assertEqual(received, data) + finally: + s.close() + + def start_server(self): + t = threading.Thread(target=self.serve) + t.daemon = True + t.start() + self.ready.wait() + + def serve(self): + self.server = AsyncEchoServer(self.socket_map) + self.server_port = self.server.getsockname()[1] + logging.info('server listening on %r', self.server_port) + self.ready.set() + while not self.stop.is_set(): + asyncore.loop(timeout=1, map=self.socket_map) + logging.info('server stopped') + + class SSLServerThread(threading.Thread): """A very simple server thread. diff --git a/vdsm/protocoldetector.py b/vdsm/protocoldetector.py index 7e15f9c..6d1675c 100644 --- a/vdsm/protocoldetector.py +++ b/vdsm/protocoldetector.py @@ -198,6 +198,11 @@ def _handle_connection_read(self, fd): _, client_socket = self._pending_connections[fd] try: + # When using M2Crypto, we may need to invoke accept_ssl() multiple + # times until the ssl handshake is complete. + if getattr(client_socket, 'accepting', False): + client_socket.accept_ssl() + return data = client_socket.recv(self._required_size, socket.MSG_PEEK) except socket.error as e: if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): @@ -233,6 +238,7 @@ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(addr[0][4]) server_socket.listen(5) + server_socket.setblocking(0) if self._sslctx: server_socket = SSLServerSocket(raw=server_socket, @@ -240,7 +246,6 @@ keyfile=self._sslctx.key_file, ca_certs=self._sslctx.ca_cert) - server_socket.setblocking(0) return server_socket -- To view, visit http://gerrit.ovirt.org/33815 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I6c1cd2d0ff69925efe11ea7b166e1545d111a640 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
