X509 uses ASN1 GENERALIZEDTIME or UTCTIME to store certificate validity. pyOpenSSL 0.7 and above allow us to retrieve both “notBefore” and “notAfter” as strings. Parsing them turned out to be a challenge since they can be in a variety of formats (YYYYMMDDhhmmssZ, YYYYMMDDhhmmss+hhmm or YYYYMMDDhhmmss-hhmm).
def _ParseAsn1Generalizedtime(value):
  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.

  Accepted formats are C{YYYYMMDDhhmmssZ}, C{YYYYMMDDhhmmss+hhmm} and
  C{YYYYMMDDhhmmss-hhmm}.

  @type value: string
  @param value: ASN1 GENERALIZEDTIME timestamp
  @rtype: int
  @return: Seconds since the Epoch, in UTC
  @raise ValueError: If the timezone is missing or the timestamp can not be
    parsed (this includes leap seconds, which C{datetime.datetime} rejects)

  """
  m = re.match(r"^(\d+)([-+])(\d\d)(\d\d)$", value)
  if m:
    # We have an offset
    asn1time = m.group(1)
    utcoffset = (60 * int(m.group(3))) + int(m.group(4))
    # The sign applies to the whole offset, not just to the hours: "-0130"
    # is 90 minutes behind UTC, not 30
    if m.group(2) == "-":
      utcoffset = -utcoffset
  else:
    if not value.endswith("Z"):
      raise ValueError("Missing timezone")
    asn1time = value[:-1]
    utcoffset = 0

  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")

  # Only year, month, day, hour, minute and second are passed on; the seventh
  # struct_time field is the weekday and must not end up as microseconds
  tt = datetime.datetime(*(parsed[:6])) - datetime.timedelta(minutes=utcoffset)

  return calendar.timegm(tt.utctimetuple())


def _GetX509CertTimestamp(cert, getter_name):
  """Reads one validity timestamp from a certificate.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @type getter_name: string
  @param getter_name: Name of the accessor method to call on C{cert}
  @rtype: int or None
  @return: Seconds since the Epoch, or None if the accessor does not exist
    or returned None

  """
  try:
    getter_fn = getattr(cert, getter_name)
  except AttributeError:
    return None

  asn1_value = getter_fn()
  if asn1_value is None:
    return None

  return _ParseAsn1Generalizedtime(asn1_value)


def GetX509CertValidity(cert):
  """Returns the validity period of the certificate.

  Intended for verifying the validity of cluster certificates
  (LUVerifyCluster).

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @rtype: tuple
  @return: C{(not_before, not_after)}, each as seconds since the Epoch or
    None if the value is not available
  @raise ValueError: If a timestamp returned by the certificate can not be
    parsed

  """
  # The get_notBefore and get_notAfter functions are only supported in
  # pyOpenSSL 0.7 and above.
  not_before = _GetX509CertTimestamp(cert, "get_notBefore")
  not_after = _GetX509CertTimestamp(cert, "get_notAfter")

  return (not_before, not_after)
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Checks utils._ParseAsn1Generalizedtime against known timestamps."""

  def test(self):
    parse_fn = utils._ParseAsn1Generalizedtime

    accepted = [
      # UTC
      ("19700101000000Z", 0),
      ("20100222174152Z", 1266860512),
      ("20380119031407Z", (2**31) - 1),

      # With offset
      ("20100222174152+0000", 1266860512),
      ("20100223131652+0000", 1266931012),
      ("20100223051808-0800", 1266931088),
      ("20100224002135+1100", 1266931295),
      ("19700101000000-0100", 3600),
      ]

    for (text, timestamp) in accepted:
      self.assertEqual(parse_fn(text), timestamp)

    rejected = [
      # Leap seconds are not supported by datetime.datetime
      "19841231235960+0000",
      "19920630235960+0000",

      # Errors
      "",
      "invalid",
      "20100222174152",
      "Mon Feb 22 17:47:02 UTC 2010",
      "2010-02-22 17:42:02",
      ]

    for text in rejected:
      self.assertRaises(ValueError, parse_fn, text)


class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Checks utils.GetX509CertValidity using the cert1.pem test certificate."""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    installed = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (installed >= "0.7")

    if self.pyopenssl0_7:
      return

    warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                  " function correctly")

  def _LoadCert(self, name):
    """Loads a PEM certificate from the test data files."""
    pem = self._ReadTestData(name)
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)

  def test(self):
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))

    # Older pyOpenSSL versions can not report the validity period at all
    if self.pyopenssl0_7:
      expected = (1266919967, 1267524767)
    else:
      expected = (None, None)

    self.assertEqual(validity, expected)
