X509 uses ASN1 GENERALIZEDTIME or UTCTIME to store certificate
validity. pyOpenSSL 0.7 and above allow us to retrieve both
“notBefore” and “notAfter” as strings. Parsing them turned
out to be a challenge since they can be in a variety of formats
(YYYYMMDDhhmmssZ, YYYYMMDDhhmmss+hhmm or YYYMMDDhhmmss-hhmm).

This will be used to verify the validity of cluster certificates
in LUVerifyCluster.

Signed-off-by: Michael Hanselmann <[email protected]>
---
 Makefile.am                   |    1 +
 lib/utils.py                  |   65 ++++++++++++++++++++++++++++++++++++++++
 test/data/cert1.pem           |   14 +++++++++
 test/ganeti.utils_unittest.py |   66 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 146 insertions(+), 0 deletions(-)
 create mode 100644 test/data/cert1.pem

diff --git a/Makefile.am b/Makefile.am
index 6262581..a1e9431 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -312,6 +312,7 @@ TEST_FILES = \
        test/data/bdev-8.3-both.txt \
        test/data/bdev-disk.txt \
        test/data/bdev-net.txt \
+       test/data/cert1.pem \
        test/data/proc_drbd8.txt \
        test/data/proc_drbd80-emptyline.txt \
        test/data/proc_drbd83.txt
diff --git a/lib/utils.py b/lib/utils.py
index ec08b1e..6bc9efd 100644
--- a/lib/utils.py
+++ b/lib/utils.py
@@ -43,6 +43,8 @@ import resource
 import logging
 import logging.handlers
 import signal
+import datetime
+import calendar
 
 from cStringIO import StringIO
 
@@ -1949,6 +1951,69 @@ def TailFile(fname, lines=20):
   return rows[-lines:]
 
 
+def _ParseAsn1Generalizedtime(value):
+  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+
+  @type value: string
+  @param value: ASN1 GENERALIZEDTIME timestamp
+
+  """
+  m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+  if m:
+    # We have an offset
+    asn1time = m.group(1)
+    hours = int(m.group(2))
+    minutes = int(m.group(3))
+    utcoffset = (60 * hours) + minutes
+  else:
+    if not value.endswith("Z"):
+      raise ValueError("Missing timezone")
+    asn1time = value[:-1]
+    utcoffset = 0
+
+  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+
+  tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
+
+  return calendar.timegm(tt.utctimetuple())
+
+
+def GetX509CertValidity(cert):
+  """Returns the validity period of the certificate.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+
+  """
+  # The get_notBefore and get_notAfter functions are only supported in
+  # pyOpenSSL 0.7 and above.
+  try:
+    get_notbefore_fn = cert.get_notBefore
+  except AttributeError:
+    not_before = None
+  else:
+    not_before_asn1 = get_notbefore_fn()
+
+    if not_before_asn1 is None:
+      not_before = None
+    else:
+      not_before = _ParseAsn1Generalizedtime(not_before_asn1)
+
+  try:
+    get_notafter_fn = cert.get_notAfter
+  except AttributeError:
+    not_after = None
+  else:
+    not_after_asn1 = get_notafter_fn()
+
+    if not_after_asn1 is None:
+      not_after = None
+    else:
+      not_after = _ParseAsn1Generalizedtime(not_after_asn1)
+
+  return (not_before, not_after)
+
+
 def SafeEncode(text):
   """Return a 'safe' version of a source string.
 
diff --git a/test/data/cert1.pem b/test/data/cert1.pem
new file mode 100644
index 0000000..3c6b59c
--- /dev/null
+++ b/test/data/cert1.pem
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICKzCCAdWgAwIBAgIJALdZsXwXOtW7MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTAwMjIzMTAxMjQ3WhcNMTAwMzAyMTAxMjQ3WjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALIL
+AmF7Hay9WuhREpRqG2KPCFNbjVGeZ6cS/1FImhHCw40JWDElQJp4lprIly7mkp+7
+seIEa7/kf0y9iy0o7s0CAwEAAaOBpzCBpDAdBgNVHQ4EFgQUBKWDVk2Hp9jW+hiD
+wuuecaBB0W0wdQYDVR0jBG4wbIAUBKWDVk2Hp9jW+hiDwuuecaBB0W2hSaRHMEUx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGSCCQC3WbF8FzrVuzAMBgNVHRMEBTADAQH/MA0G
+CSqGSIb3DQEBBQUAA0EAg7hwCEhY2+MmQYXqe8szmgkXe73qv+i2XyZGytUcdaB/
+sd2ydbMLIZlWHD5Zb6xBVDVJpLttduW0cK9daFvElQ==
+-----END CERTIFICATE-----
diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py
index 4d19f85..1776173 100755
--- a/test/ganeti.utils_unittest.py
+++ b/test/ganeti.utils_unittest.py
@@ -35,6 +35,9 @@ import shutil
 import re
 import select
 import string
+import OpenSSL
+import warnings
+import distutils.version
 
 import ganeti
 import testutils
@@ -1260,5 +1263,68 @@ class TestUnescapeAndSplit(unittest.TestCase):
       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
 
 
+class TestParseAsn1Generalizedtime(unittest.TestCase):
+  def test(self):
+    # UTC
+    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
+                     1266860512)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
+                     (2**31) - 1)
+
+    # With offset
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
+                     1266860512)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
+                     1266931012)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
+                     1266931088)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
+                     1266931295)
+    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
+                     3600)
+
+    # Leap seconds are not supported by datetime.datetime
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                      "19841231235960+0000")
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                      "19920630235960+0000")
+
+    # Errors
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                      "20100222174152")
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                      "Mon Feb 22 17:47:02 UTC 2010")
+    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                      "2010-02-22 17:42:02")
+
+
+class TestGetX509CertValidity(testutils.GanetiTestCase):
+  def setUp(self):
+    testutils.GanetiTestCase.setUp(self)
+
+    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
+
+    # Test whether we have pyOpenSSL 0.7 or above
+    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
+
+    if not self.pyopenssl0_7:
+      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
+                    " function correctly")
+
+  def _LoadCert(self, name):
+    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           self._ReadTestData(name))
+
+  def test(self):
+    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
+    if self.pyopenssl0_7:
+      self.assertEqual(validity, (1266919967, 1267524767))
+    else:
+      self.assertEqual(validity, (None, None))
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()
-- 
1.6.6

Reply via email to