Author: jhb
Date: Wed Apr 24 00:23:06 2019
New Revision: 346617
URL: https://svnweb.freebsd.org/changeset/base/346617

Log:
  Test the AES-CCM test vectors from the NIST Known Answer Tests.
  
  The CCM test vectors use a slightly different file format in that
  there are global key-value pairs as well as section key-value pairs
  that need to be used in each test.  In addition, the sections can set
  multiple key-value pairs in the section name.  The CCM KAT parser
  class is an iterator that returns a dictionary once per test where the
  dictionary contains all of the relevant key-value pairs for a given
  test (global, section name, section, test-specific).
  
  Note that all of the CCM decrypt tests use nonce and tag lengths that
  are not supported by OCF (OCF only supports a 12 byte nonce and 16
  byte tag), so none of the decryption vectors are actually tested.
  
  Reviewed by:  ngie
  MFC after:    1 month
  Sponsored by: Chelsio Communications
  Differential Revision:        https://reviews.freebsd.org/D19978

Modified:
  head/tests/sys/opencrypto/cryptodev.py
  head/tests/sys/opencrypto/cryptotest.py

Modified: head/tests/sys/opencrypto/cryptodev.py
==============================================================================
--- head/tests/sys/opencrypto/cryptodev.py      Wed Apr 24 00:16:39 2019        (r346616)
+++ head/tests/sys/opencrypto/cryptodev.py      Wed Apr 24 00:23:06 2019        (r346617)
@@ -381,6 +381,112 @@ class KATParser:
 
                        yield values
 
+# The CCM files use a bit of a different syntax that doesn't quite fit
+# the generic KATParser.  In particular, some keys are set globally at
+# the start of the file, and some are set globally at the start of a
+# section.
+class KATCCMParser:
+       def __init__(self, fname):
+               self.fp = open(fname)
+               self._pending = None
+               self.read_globals()
+
+       def read_globals(self):
+               self.global_values = {}
+               while True:
+                       line = self.fp.readline()
+                       if not line:
+                               return
+                       if line[0] == '#' or not line.strip():
+                               continue
+                       if line[0] == '[':
+                               self._pending = line
+                               return
+
+                       try:
+                               f, v = line.split(' =')
+                       except:
+                               print('line:', repr(line))
+                               raise
+
+                       v = v.strip()
+
+                       if f in self.global_values:
+                               raise ValueError('already present: %r' % repr(f))
+                       self.global_values[f] = v
+
+       def read_section_values(self, kwpairs):
+               self.section_values = self.global_values.copy()
+               for pair in kwpairs.split(', '):
+                       f, v = pair.split(' = ')
+                       if f in self.section_values:
+                               raise ValueError('already present: %r' % repr(f))
+                       self.section_values[f] = v
+
+               while True:
+                       line = self.fp.readline()
+                       if not line:
+                               return
+                       if line[0] == '#' or not line.strip():
+                               continue
+                       if line[0] == '[':
+                               self._pending = line
+                               return
+
+                       try:
+                               f, v = line.split(' =')
+                       except:
+                               print('line:', repr(line))
+                               raise
+
+                       if f == 'Count':
+                               self._pending = line
+                               return
+
+                       v = v.strip()
+
+                       if f in self.section_values:
+                               raise ValueError('already present: %r' % repr(f))
+                       self.section_values[f] = v
+
+       def __iter__(self):
+               while True:
+                       if self._pending:
+                               line = self._pending
+                               self._pending = None
+                       else:
+                               line = self.fp.readline()
+                               if not line:
+                                       return
+
+                       if (line and line[0] == '#') or not line.strip():
+                               continue
+
+                       if line[0] == '[':
+                               section = line[1:].split(']', 1)[0]
+                               self.read_section_values(section)
+                               continue
+
+                       values = self.section_values.copy()
+
+                       while True:
+                               try:
+                                       f, v = line.split(' =')
+                               except:
+                                       print('line:', repr(line))
+                                       raise
+                               v = v.strip()
+
+                               if f in values:
+                                       raise ValueError('already present: %r' % repr(f))
+                               values[f] = v
+                               line = self.fp.readline().strip()
+                               if not line:
+                                       break
+
+                       yield values
+
+
 def _spdechex(s):
        return ''.join(s.split()).decode('hex')
 

Modified: head/tests/sys/opencrypto/cryptotest.py
==============================================================================
--- head/tests/sys/opencrypto/cryptotest.py     Wed Apr 24 00:16:39 2019        (r346616)
+++ head/tests/sys/opencrypto/cryptotest.py     Wed Apr 24 00:23:06 2019        (r346617)
@@ -71,6 +71,14 @@ def GenTestCase(cname):
                        for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
                                self.runCBC(i)
 
+               @unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
+               def test_ccm(self):
+                       for i in katg('ccmtestvectors', 'V*.rsp'):
+                               self.runCCMEncrypt(i)
+
+                       for i in katg('ccmtestvectors', 'D*.rsp'):
+                               self.runCCMDecrypt(i)
+
                @unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
                def test_gcm(self):
                        for i in katg('gcmtestvectors', 'gcmEncrypt*'):
@@ -219,6 +227,93 @@ def GenTestCase(cname):
                                                        raise
                                                continue
                                        self.assertEqual(r, ct)
+
+               def runCCMEncrypt(self, fname):
+                       for data in cryptodev.KATCCMParser(fname):
+                               Nlen = int(data['Nlen'])
+                               if Nlen != 12:
+                                       # OCF only supports 12 byte IVs
+                                       continue
+                               key = data['Key'].decode('hex')
+                               nonce = data['Nonce'].decode('hex')
+                               Alen = int(data['Alen'])
+                               if Alen != 0:
+                                       aad = data['Adata'].decode('hex')
+                               else:
+                                       aad = None
+                               payload = data['Payload'].decode('hex')
+                               ct = data['CT'].decode('hex')
+
+                               try:
+                                       c = Crypto(crid=crid,
+                                           cipher=cryptodev.CRYPTO_AES_CCM_16,
+                                           key=key,
+                                           mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
+                                           mackey=key, maclen=16)
+                                       r, tag = Crypto.encrypt(c, payload,
+                                           nonce, aad)
+                               except EnvironmentError, e:
+                                       if e.errno != errno.EOPNOTSUPP:
+                                               raise
+                                       continue
+
+                               out = r + tag
+                               self.assertEqual(out, ct,
+                                   "Count " + data['Count'] + " Actual: " + \
+                                   repr(out.encode("hex")) + " Expected: " + \
+                                   repr(data) + " on " + cname)
+
+               def runCCMDecrypt(self, fname):
+                       # XXX: Note that all of the current CCM
+                       # decryption test vectors use IV and tag sizes
+                       # that aren't supported by OCF, so none of the
+                       # tests are actually run.
+                       for data in cryptodev.KATCCMParser(fname):
+                               Nlen = int(data['Nlen'])
+                               if Nlen != 12:
+                                       # OCF only supports 12 byte IVs
+                                       continue
+                               Tlen = int(data['Tlen'])
+                               if Tlen != 16:
+                                       # OCF only supports 16 byte tags
+                                       continue
+                               key = data['Key'].decode('hex')
+                               nonce = data['Nonce'].decode('hex')
+                               Alen = int(data['Alen'])
+                               if Alen != 0:
+                                       aad = data['Adata'].decode('hex')
+                               else:
+                                       aad = None
+                               ct = data['CT'].decode('hex')
+                               tag = ct[-16:]
+                               ct = ct[:-16]
+
+                               try:
+                                       c = Crypto(crid=crid,
+                                           cipher=cryptodev.CRYPTO_AES_CCM_16,
+                                           key=key,
+                                           mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
+                                           mackey=key, maclen=16)
+                               except EnvironmentError, e:
+                                       if e.errno != errno.EOPNOTSUPP:
+                                               raise
+                                       continue
+
+                               if data['Result'] == 'Fail':
+                                       self.assertRaises(IOError,
+                                           c.decrypt, payload, nonce, aad, tag)
+                               else:
+                                       r = Crypto.decrypt(c, payload, nonce,
+                                           aad, tag)
+
+                                       payload = data['Payload'].decode('hex')
+                                       Plen = int(data('Plen'))
+                                       payload = payload[:plen]
+                                       self.assertEqual(r, payload,
+                                           "Count " + data['Count'] + \
+                                           " Actual: " + repr(r.encode("hex")) + \
+                                           " Expected: " + repr(data) + \
+                                           " on " + cname)
 
                ###############
                ##### DES #####
_______________________________________________
svn-src-all@freebsd.org mailing list
https://lists.freebsd.org/mailman/listinfo/svn-src-all
To unsubscribe, send any mail to "svn-src-all-unsubscr...@freebsd.org"

Reply via email to