This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 7838300 DISPATCH-1230: Fix issues with detecting TLSv1.3 in proton and OpenSSL 7838300 is described below commit 783830026582fe7645442d564574d605e155e030 Author: Chuck Rolke <c...@apache.org> AuthorDate: Mon Apr 22 11:28:40 2019 -0400 DISPATCH-1230: Fix issues with detecting TLSv1.3 in proton and OpenSSL Merge branch https://github.com/fgiogetti/fgiorgetti-DISPATCH-1230-2 This closes #487 Squashed commit of the following: commit 3733d5337a673b3cf8a9a6904bef24616cf026ef Author: Fernando Giorgetti <fgior...@redhat.com> Date: Thu Apr 18 12:14:54 2019 -0300 DISPATCH-1230 - Reading OpenSSL version from command line commit 0d98c579bb47fb6af367d22478ed08d13c4f27ba Author: Chuck Rolke <c...@apache.org> Date: Tue Apr 16 12:08:17 2019 -0400 DISPATCH-1230: TLSv1.3: detect Proton 1.3 support; 1.3-only test commit d94af68ec1d7633abb9e93729f9d22d6402939a5 Author: Fernando Giorgetti <fgior...@redhat.com> Date: Tue Apr 9 12:02:57 2019 -0300 DISPATCH-1230 - Fixed issues with system_tests_ssl always allowing TLS1.2 with OpenSSL >= 1.1 --- tests/system_tests_ssl.py | 171 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 138 insertions(+), 33 deletions(-) diff --git a/tests/system_tests_ssl.py b/tests/system_tests_ssl.py index feabfea..b5b2fd0 100644 --- a/tests/system_tests_ssl.py +++ b/tests/system_tests_ssl.py @@ -23,11 +23,13 @@ Provides tests related with allowed TLS protocol version restrictions. import os import ssl import sys +import re from subprocess import Popen, PIPE from qpid_dispatch.management.client import Node from system_test import TestCase, main_module, Qdrouterd, DIR, SkipIfNeeded -from proton import SASL, Url, SSLDomain +from proton import SASL, Url, SSLDomain, SSLUnavailable from proton.utils import BlockingConnection +from distutils.version import StrictVersion import proton import cproton import unittest2 as unittest @@ -39,6 +41,7 @@ class RouterTestSslBase(TestCase): """ # If unable to determine which protocol versions are allowed system wide DISABLE_SSL_TESTING = False + DISABLE_REASON = "Unable to determine MinProtocol" @staticmethod def ssl_file(name): @@ -86,6 +89,7 @@ class RouterTestSslClient(RouterTestSslBase): PORT_TLS1 = 0 PORT_TLS11 = 0 PORT_TLS12 = 0 + PORT_TLS13 = 0 PORT_TLS1_TLS11 = 0 PORT_TLS1_TLS12 = 0 PORT_TLS11_TLS12 = 0 @@ -95,7 +99,26 @@ class RouterTestSslClient(RouterTestSslBase): TIMEOUT = 3 # If using OpenSSL 1.1 or greater, TLSv1.2 is always being allowed - OPENSSL_VER_1_1_GT = ssl.OPENSSL_VERSION_INFO[:2] >= (1, 1) + OPENSSL_OUT_VER = None + try: + OPENSSL_VER_1_1_GT = ssl.OPENSSL_VERSION_INFO[:2] >= (1, 1) + except AttributeError: + OPENSSL_VER_1_1_GT = False + + # If still False, try getting it from "openssl version" (command output) + # The version from ssl.OPENSSL_VERSION_INFO reflects OpenSSL version in which + # Python was compiled with, not the one installed in the system. + if not OPENSSL_VER_1_1_GT: + print("Python libraries SSL Version < 1.1") + try: + p = Popen(['openssl', 'version'], stdout=PIPE, universal_newlines=True) + openssl_out = p.communicate()[0] + m = re.search('[0-9]+\.[0-9]+\.[0-9]+', openssl_out) + OPENSSL_OUT_VER = m.group(0) + OPENSSL_VER_1_1_GT = StrictVersion(OPENSSL_OUT_VER) >= StrictVersion('1.1') + print("OpenSSL Version found = %s" % OPENSSL_OUT_VER) + except: + pass # Following variables define TLS versions allowed by openssl OPENSSL_MIN_VER = 0 @@ -103,20 +126,53 @@ class RouterTestSslClient(RouterTestSslBase): OPENSSL_ALLOW_TLSV1 = True OPENSSL_ALLOW_TLSV1_1 = True OPENSSL_ALLOW_TLSV1_2 = True + OPENSSL_ALLOW_TLSV1_3 = False + + # Test if OpenSSL has TLSv1_3 + OPENSSL_HAS_TLSV1_3 = False + if OPENSSL_VER_1_1_GT: + try: + ssl.TLSVersion.TLSv1_3 + OPENSSL_HAS_TLSV1_3 = True + except: + pass + + # Test if Proton supports TLSv1_3 + try: + dummydomain = SSLDomain(SSLDomain.MODE_CLIENT) + PROTON_HAS_TLSV1_3 = cproton.PN_OK == cproton.pn_ssl_domain_set_protocols(dummydomain._domain, "TLSv1.3") + print("TLSV1_3? Proton has: %s, OpenSSL has: %s" % (PROTON_HAS_TLSV1_3, OPENSSL_HAS_TLSV1_3)) + except SSLUnavailable: + PROTON_HAS_TLSV1_3 = False # When using OpenSSL >= 1.1 and python >= 3.7, we can retrieve OpenSSL min and max protocols if OPENSSL_VER_1_1_GT: if sys.version_info >= (3, 7): - OPENSSL_CTX = ssl.create_default_context() - OPENSSL_MIN_VER = OPENSSL_CTX.minimum_version - OPENSSL_MAX_VER = OPENSSL_CTX.maximum_version if OPENSSL_CTX.maximum_version > 0 else 9999 - OPENSSL_ALLOW_TLSV1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1 <= OPENSSL_MAX_VER - OPENSSL_ALLOW_TLSV1_1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_1 <= OPENSSL_MAX_VER - OPENSSL_ALLOW_TLSV1_2 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_2 <= OPENSSL_MAX_VER + if OPENSSL_HAS_TLSV1_3 and not PROTON_HAS_TLSV1_3: + # If OpenSSL has 1.3 but proton won't let us turn it on and off then + # this test fails because v1.3 runs unexpectedly. + RouterTestSslBase.DISABLE_SSL_TESTING = True + RouterTestSslBase.DISABLE_REASON = "Proton version does not support TLSv1.3 but OpenSSL does" + else: + OPENSSL_CTX = ssl.create_default_context() + OPENSSL_MIN_VER = OPENSSL_CTX.minimum_version + OPENSSL_MAX_VER = OPENSSL_CTX.maximum_version if OPENSSL_CTX.maximum_version > 0 else 9999 + OPENSSL_ALLOW_TLSV1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1 <= OPENSSL_MAX_VER + OPENSSL_ALLOW_TLSV1_1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_1 <= OPENSSL_MAX_VER + OPENSSL_ALLOW_TLSV1_2 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_2 <= OPENSSL_MAX_VER + OPENSSL_ALLOW_TLSV1_3 = OPENSSL_HAS_TLSV1_3 and PROTON_HAS_TLSV1_3 \ + and OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_3 <= OPENSSL_MAX_VER else: # At this point we are not able to precisely determine what are the minimum and maximum # TLS versions allowed in the system, so tests will be disabled RouterTestSslBase.DISABLE_SSL_TESTING = True + RouterTestSslBase.DISABLE_REASON = "OpenSSL >= 1.1 but Python < 3.7 - Unable to determine MinProtocol" + else: + if OPENSSL_HAS_TLSV1_3 and not PROTON_HAS_TLSV1_3: + # If OpenSSL has 1.3 but proton won't let us turn it on and off then + # this test fails because v1.3 runs unexpectedly. + RouterTestSslBase.DISABLE_SSL_TESTING = True + RouterTestSslBase.DISABLE_REASON = "Proton version does not support TLSv1.3 but OpenSSL does" @classmethod def setUpClass(cls): @@ -144,6 +200,7 @@ class RouterTestSslClient(RouterTestSslBase): cls.PORT_TLS1 = cls.tester.get_port() cls.PORT_TLS11 = cls.tester.get_port() cls.PORT_TLS12 = cls.tester.get_port() + cls.PORT_TLS13 = cls.tester.get_port() cls.PORT_TLS1_TLS11 = cls.tester.get_port() cls.PORT_TLS1_TLS12 = cls.tester.get_port() cls.PORT_TLS11_TLS12 = cls.tester.get_port() @@ -272,6 +329,22 @@ class RouterTestSslClient(RouterTestSslBase): 'password': 'server-password'}) ] + if cls.OPENSSL_ALLOW_TLSV1_3: + conf += [ + # TLSv1.3 only + ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.PORT_TLS13, + 'authenticatePeer': 'no', + 'sslProfile': 'ssl-profile-tls13'}), + # SSL Profile for TLSv1.3 + ('sslProfile', {'name': 'ssl-profile-tls13', + 'caCertFile': cls.ssl_file('ca-certificate.pem'), + 'certFile': cls.ssl_file('server-certificate.pem'), + 'privateKeyFile': cls.ssl_file('server-private-key.pem'), + 'protocols': 'TLSv1.3', + 'password': 'server-password'}) + + ] + config = Qdrouterd.Config(conf) cls.routers.append(cls.tester.qdrouterd("A", config, wait=False)) @@ -289,6 +362,10 @@ class RouterTestSslClient(RouterTestSslBase): for proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2']: results.append(self.is_proto_allowed(listener_port, proto)) + if self.OPENSSL_ALLOW_TLSV1_3: + results.append(self.is_proto_allowed(listener_port, 'TLSv1.3')) + else: + results.append(False) return results def is_proto_allowed(self, listener_port, tls_protocol): @@ -314,6 +391,8 @@ class RouterTestSslClient(RouterTestSslBase): return False except proton.ConnectionException: return False + except: + return False # TLS version provided was accepted connection.close() @@ -366,75 +445,85 @@ class RouterTestSslClient(RouterTestSslBase): :param expected_results: :return: """ - (tlsv1, tlsv1_1, tlsv1_2) = expected_results + (tlsv1, tlsv1_1, tlsv1_2, tlsv1_3) = expected_results return [self.OPENSSL_ALLOW_TLSV1 and tlsv1, self.OPENSSL_ALLOW_TLSV1_1 and tlsv1_1, - self.OPENSSL_VER_1_1_GT or (self.OPENSSL_ALLOW_TLSV1_2 and tlsv1_2)] + self.OPENSSL_ALLOW_TLSV1_2 and tlsv1_2, + self.OPENSSL_ALLOW_TLSV1_3 and tlsv1_3] - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls1_only(self): """ Expects TLSv1 only is allowed """ - self.assertEquals(self.get_expected_tls_result([True, False, False]), + self.assertEquals(self.get_expected_tls_result([True, False, False, False]), self.get_allowed_protocols(self.PORT_TLS1)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls11_only(self): """ Expects TLSv1.1 only is allowed """ - self.assertEquals(self.get_expected_tls_result([False, True, False]), + self.assertEquals(self.get_expected_tls_result([False, True, False, False]), self.get_allowed_protocols(self.PORT_TLS11)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls12_only(self): """ Expects TLSv1.2 only is allowed """ - self.assertEquals(self.get_expected_tls_result([False, False, True]), + self.assertEquals(self.get_expected_tls_result([False, False, True, False]), self.get_allowed_protocols(self.PORT_TLS12)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) + def test_tls13_only(self): + """ + Expects TLSv1.3 only is allowed + """ + self.assertEquals(self.get_expected_tls_result([False, False, False, True]), + self.get_allowed_protocols(self.PORT_TLS13)) + + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls1_tls11_only(self): """ Expects TLSv1 and TLSv1.1 only are allowed """ - self.assertEquals(self.get_expected_tls_result([True, True, False]), + self.assertEquals(self.get_expected_tls_result([True, True, False, False]), self.get_allowed_protocols(self.PORT_TLS1_TLS11)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls1_tls12_only(self): """ Expects TLSv1 and TLSv1.2 only are allowed """ - self.assertEquals(self.get_expected_tls_result([True, False, True]), + self.assertEquals(self.get_expected_tls_result([True, False, True, False]), self.get_allowed_protocols(self.PORT_TLS1_TLS12)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls11_tls12_only(self): """ Expects TLSv1.1 and TLSv1.2 only are allowed """ - self.assertEquals(self.get_expected_tls_result([False, True, True]), + self.assertEquals(self.get_expected_tls_result([False, True, True, False]), self.get_allowed_protocols(self.PORT_TLS11_TLS12)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_tls_all(self): """ - Expects all supported versions: TLSv1, TLSv1.1 and TLSv1.2 to be allowed + Expects all supported versions: TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3 to be allowed """ - self.assertEquals(self.get_expected_tls_result([True, True, True]), + self.assertEquals(self.get_expected_tls_result([True, True, True, True]), self.get_allowed_protocols(self.PORT_TLS_ALL)) - @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON) def test_ssl_invalid(self): """ Expects connection is rejected as SSL is no longer supported """ self.assertEqual(False, self.is_proto_allowed(self.PORT_SSL3, 'SSLv3')) - @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(), + "Cyrus library not available. skipping test") def test_ssl_sasl_client_valid(self): """ Attempts to connect a Proton client using a valid SASL authentication info @@ -444,10 +533,12 @@ class RouterTestSslClient(RouterTestSslBase): if not SASL.extended(): self.skipTest("Cyrus library not available. skipping test") - self.assertTrue(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1")) - self.assertTrue(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.2")) + exp_tls_results = self.get_expected_tls_result([True, False, True, False]) + self.assertEqual(exp_tls_results[0], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1")) + self.assertEqual(exp_tls_results[2], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.2")) - @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(), + "Cyrus library not available. skipping test") def test_ssl_sasl_client_invalid(self): """ Attempts to connect a Proton client using a valid SASL authentication info @@ -457,7 +548,8 @@ class RouterTestSslClient(RouterTestSslBase): if not SASL.extended(): self.skipTest("Cyrus library not available. skipping test") - self.assertFalse(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.1")) + exp_tls_results = self.get_expected_tls_result([True, False, True, False]) + self.assertEqual(exp_tls_results[1], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.1")) class RouterTestSslInterRouter(RouterTestSslBase): @@ -657,7 +749,8 @@ class RouterTestSslInterRouter(RouterTestSslBase): node.close() return router_nodes - @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test") + @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(), + "Cyrus library not available. skipping test") def test_connected_tls_sasl_routers(self): """ Validates if all expected routers are connected in the network @@ -670,7 +763,19 @@ class RouterTestSslInterRouter(RouterTestSslBase): for node in router_nodes: self.assertTrue(node in self.connected_tls_sasl_routers, "%s should not be connected" % node) - self.assertEqual(len(router_nodes), 4) + + # Router A and B are always expected (no tls version restriction) + expected_nodes = len(self.connected_tls_sasl_routers) + + # Router C only if TLSv1.2 is allowed + if not RouterTestSslClient.OPENSSL_ALLOW_TLSV1_2: + expected_nodes -= 1 + + # Router D only if TLSv1.1 is allowed + if not RouterTestSslClient.OPENSSL_ALLOW_TLSV1_1: + expected_nodes -= 1 + + self.assertEqual(len(router_nodes), expected_nodes) if __name__ == '__main__': --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org