This tests the general workflow for OTP including most possible
token combinations. This includes 5872 tests. Further optimization
is possible to reduce the number of duplicate tests run.

Things not yet tested:
* ipa-kdb
* ipa-otpd
* otptoken-sync
* RADIUS proxy
* token self-management
* type specific attributes
From 0ae7f469d5b9496cf9a63cc7f4b4b099d35dfab2 Mon Sep 17 00:00:00 2001
From: Nathaniel McCallum <npmccal...@redhat.com>
Date: Thu, 20 Nov 2014 11:02:00 -0500
Subject: [PATCH] Add initial tests for OTP

This tests the general workflow for OTP including most possible
token combinations. This includes 5872 tests. Further optimization
is possible to reduce the number of duplicate tests run.

Things not yet tested:
* ipa-kdb
* ipa-otpd
* otptoken-sync
* RADIUS proxy
* token self-management
* type specific attributes
---
 ipatests/test_xmlrpc/test_otptoken_plugin.py | 373 +++++++++++++++++++++++++++
 1 file changed, 373 insertions(+)
 create mode 100644 ipatests/test_xmlrpc/test_otptoken_plugin.py

diff --git a/ipatests/test_xmlrpc/test_otptoken_plugin.py b/ipatests/test_xmlrpc/test_otptoken_plugin.py
new file mode 100644
index 0000000000000000000000000000000000000000..ca5cc5fb65ad4a869dbc0428de1eb7652e0aeea5
--- /dev/null
+++ b/ipatests/test_xmlrpc/test_otptoken_plugin.py
@@ -0,0 +1,373 @@
+# Authors:
+#   Nathaniel McCallum <npmccal...@redhat.com>
+#
+# Copyright (C) 2014  Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib/plugins/otptoken.py` module.
+"""
+
+import base64
+import datetime
+import hashlib
+import os
+import urlparse
+import UserDict
+import uuid
+
+import ldap
+import pyotp
+
+from ipalib import api
+from xmlrpc_test import XMLRPC_test
+from ipatests.util import assert_deepequal
+from ipapython.dn import DN
+from ipapython.version import API_VERSION
+
+class Token(UserDict.DictMixin):
+    _TYPED = (
+        (u'TOTP', u'ipatokentotpclockoffset'),
+        (u'TOTP', u'ipatokentotptimestep'),
+        (u'HOTP', u'ipatokenhotpcounter'),
+    )
+
+    def __getitem__(self, key):
+        if key in self.__data:
+            return self.__data[key]
+
+        for t, a in self._TYPED:
+            if key == a and self.get(u'type', None) != t:
+                raise KeyError(repr(key))
+
+        if key in self.__defaults:
+            return self.__defaults[key]
+
+        raise KeyError(repr(key))
+
+    def __setitem__(self, key, value):
+        if key == u'type':
+            value = value.upper()  # _TYPED spells type names in upper case
+
+            for t, a in self._TYPED:
+                if value != t:  # only drop attributes of *other* types
+                    self.__data.pop(a, None)
+        else:
+            for t, a in self._TYPED:
+                if key == a and self.__data.get(u'type', t) != t:
+                    raise KeyError(repr(key))  # attribute of another type
+
+        self.__data[key] = value
+
+    def __delitem__(self, key):
+        del self.__data[key]
+
+    def keys(self):
+        token_type = self.get(u'type', None)  # explicit value, else default
+
+        exclude = []  # typed attributes that do not apply to this token
+        for t, a in self._TYPED:
+            if token_type != t:
+                exclude.append(a)
+
+        keys = set(self.__data.keys() + self.__defaults.keys())
+        return keys.difference(exclude)
+
+    def __init__(self, obj=None, **kwargs):
+        self.__defaults = {}
+        for i in range(len(api.Object['otptoken'].params)):
+            param = api.Object['otptoken'].params[i]
+            if param.default is not None:
+                self.__defaults[param.name] = param.default
+
+        self.__defaults.update({
+            u'ipatokenuniqueid': unicode(uuid.uuid4()),
+            u'ipatokenotpkey': os.urandom(20)
+        })
+
+        self.__data = {}
+        if obj is not None:
+            self.update(obj)
+        if len(kwargs):
+            self.update(kwargs)
+
+    def __repr__(self):
+        prefixes = ['ipatoken' + x for x in ('totp', 'hotp', 'otp', '')]
+        names = {
+            'algorithm': 'algo',
+            'timestep': 'ts',
+            'clockoffset': 'offset',
+        }
+
+        args = {}
+        for k, v in self.items():
+            for prefix in prefixes:
+                if k.startswith(prefix):
+                    k = k[len(prefix):]
+                    break
+
+            if k in ('key', 'uniqueid', 'owner'):
+                continue
+
+            args[names.get(k, k)] = unicode(v)
+
+        return u', '.join([k + '=' + v for k, v in sorted(args.items())])
+
+    @property
+    def id(self):
+        return self[u'ipatokenuniqueid']
+
+    def nice(self, nice):
+        return self.id + u": " + nice
+
+    def otp(self, at=0):
+        kwargs = {}
+        if self[u'type'] == u"TOTP":
+            kwargs['interval'] = self[u'ipatokentotptimestep']
+
+            td = datetime.timedelta(0, kwargs['interval']) * at
+            at = datetime.datetime.now() + td
+        elif at < 0:
+            return None
+
+        otp = getattr(pyotp, self[u'type'])(
+            base64.b32encode(self[u'ipatokenotpkey']),
+            self[u'ipatokenotpdigits'],
+            getattr(hashlib, self[u'ipatokenotpalgorithm']),
+            **kwargs
+        )
+
+        code = str(otp.at(at))
+        while len(code) < otp.digits:
+            code = "0" + code
+
+        return code
+
+    def expected(self):
+        result = {}
+        for k, v in self.items():
+            if isinstance(v, bool):
+                result[k] = (unicode(v).upper(),)
+            elif isinstance(v, int):
+                result[k] = (unicode(v),)
+            else:
+                result[k] = (v,)
+
+        result[u'type'] = result[u'type'][0]
+        result[u'dn'] = u'ipatokenuniqueid=%s,cn=otp,dc=example,dc=com' % self.id
+
+        return {
+            u'summary': u'Added OTP token "%s"' % self.id,
+            u'result': result,
+            u'value': self.id,
+        }
+
+    def path(self, user):
+        return '/%s@%s:%s' % (user, api.env.realm, self.id)
+
+    def query(self, user):
+        types = {
+            "HOTP": {u'counter': self.get(u'ipatokenhotpcounter')},
+            "TOTP": {u'period': self.get(u'ipatokentotptimestep')},
+        }
+
+        query = {
+            u'algorithm': self.get(u'ipatokenotpalgorithm'),
+            u'issuer': u'%s@%s' % (user, api.env.realm),
+            u'digits': self.get(u'ipatokenotpdigits'),
+            u'secret': base64.b32encode(self[u'ipatokenotpkey']),
+        }
+
+        query.update(types.get(self[u'type'], {}))
+        query = {k: (unicode(v),) for k, v in query.items()}
+        return query
+        
+
+class test_otp(XMLRPC_test):
+    TODAY = datetime.datetime.now().replace(microsecond=0)
+    YESTERDAY = TODAY - datetime.timedelta(1)
+    TOMORROW = TODAY + datetime.timedelta(1)
+
+    user = u'tuser1'
+    password = u'random'
+
+    digests = (u'sha1', u'sha256', u'sha384', u'sha512')
+    intervals = (15, 30, 60)
+    digits = (6, 8)
+
+    keywords = (
+        ({}, True),
+        ({u'ipatokendisabled': False}, True),
+        ({u'ipatokendisabled': True}, False),
+        ({u'ipatokennotbefore': YESTERDAY}, True),
+        ({u'ipatokennotbefore': TOMORROW}, False),
+        ({u'ipatokennotafter': YESTERDAY}, False),
+        ({u'ipatokennotafter': TOMORROW}, True),
+    )
+
+    keywords_hotp = (
+        ({}, True),
+    )
+
+    keywords_totp = (
+        ({}, True),
+    )
+
+    @classmethod
+    def setUpClass(cls):
+        super(test_otp, cls).setUpClass()
+        cls.connection = ldap.initialize('ldap://{host}'
+                                         .format(host=api.env.host))
+
+    def make(self, nice, func, *args, **kwargs):
+        test = lambda: func(*args, **kwargs)
+        test.description = nice
+        return (test,)
+
+    def command(self, nice, cmd, *args, **kwargs):
+        if not 'version' in kwargs:
+            kwargs['version'] = API_VERSION
+
+        return self.make(nice, api.Command[cmd], *args, **kwargs)
+
+    def check_login(self, password, success=True):
+        message = "Authentication unexpectedly %s (password: %s)" % (
+            "failed" if success else "succeeded",
+            password
+        )
+
+        dn = DN(('uid', self.user), api.env.container_user, api.env.basedn)
+        try:
+            self.connection.simple_bind_s(str(dn), password)
+        except ldap.INVALID_CREDENTIALS as e:
+            if success:
+                raise AssertionError(message)
+            api.Command['user_unlock'](self.user)
+        else:
+            if not success:
+                raise AssertionError(message)
+
+    def check_otp_add(self, token):
+        # Add the token
+        result = api.Command['otptoken_add'](version=API_VERSION, **token)
+
+        # Remove the URI and validate the rest of the return value.
+        uri = result.get('result', {}).pop('uri', None)
+        assert_deepequal(token.expected(), result)
+
+        # Validate the URI.
+        split = urlparse.urlsplit(uri)
+        assert split.scheme == u'otpauth'
+        assert split.netloc.upper() == token[u'type']
+        assert split.path == token.path(self.user)
+        assert_deepequal(token.query(self.user),
+                         urlparse.parse_qs(split.query))
+
+    def do_token(self, enabled, token):
+        def login(nice, at, enabled=True, password=self.password):
+            if isinstance(at, basestring):
+                code = at
+            else:
+                code = token.otp(at)
+                if code is None:  # Skip invalid test offsets.
+                    return self.make(token.nice(nice), lambda: enabled)
+
+            return self.make(token.nice(nice), self.check_login,
+                             password + code, enabled)
+
+        yield self.make(token.nice("Add token (%s)" % repr(token)),
+                        self.check_otp_add, token)
+
+        yield login("Check invalid 1FA", "", not enabled)
+        yield login("Check fake OTP", "123456", False)
+        yield login("Check distant past OTP", -1000, False)
+        yield login("Check past OTP", -2, enabled)
+        yield login("Check current OTP", 0, enabled)
+        yield login("Check duplicate OTP", 0, False)
+        yield login("Check next OTP", 1, enabled)
+        yield login("Check future OTP", 3, enabled)
+        yield login("Check bad password", 4, False, "badpassword")
+        yield login("Check no password", 5, False, "")
+        yield login("Check distant future OTP", 1000, False)
+
+        yield self.command(token.nice('Delete token'), 'otptoken_del', token.id)
+
+    def do_all(self):
+        # Check authentication with OTP enabled, but no tokens created.
+        yield self.make("Check valid 1FA", self.check_login,
+                        self.password, True)
+        yield self.make("Check invalid 2FA", self.check_login,
+                        self.password + "123456", False)
+
+        for digits in self.digits:
+            for digest in self.digests:
+                for kw, e in self.keywords:
+                    for hkw, he in self.keywords_hotp:
+                        token = Token(
+                            ipatokenotpalgorithm=digest,
+                            ipatokenotpdigits=digits,
+                            ipatokenowner=self.user,
+                            type=u'HOTP',
+                            **dict(kw.items() + hkw.items())
+                        )
+
+                        for test in self.do_token(e and he, token):
+                            yield test
+
+                    for interval in self.intervals:
+                        for tkw, te in self.keywords_totp:
+                            token = Token(
+                                ipatokentotptimestep=interval,
+                                ipatokenotpalgorithm=digest,
+                                ipatokenotpdigits=digits,
+                                ipatokenowner=self.user,
+                                type=u'TOTP',
+                                **dict(kw.items() + tkw.items())
+                            )
+
+                            for test in self.do_token(e and te, token):
+                                yield test
+
+    def test_generate(self):
+        # Create the user.
+        yield self.command("Add user", 'user_add', self.user,
+                           givenname=u'Test', sn=u'User1',
+                           userpassword=self.password)
+
+        # Check non-OTP authentication.
+        yield self.make('Check normal authentication', self.check_login,
+                        self.password, True)
+        yield self.make('Check invalid authentication', self.check_login,
+                        self.password + "123456", False)
+
+        # Perform tests with global enablement.
+        yield self.command('Enable OTP globally', 'config_mod',
+                           ipauserauthtype=(u'otp',))
+        for test in self.do_all():
+            yield test
+        yield self.command('Disable OTP globally', 'config_mod',
+                           ipauserauthtype=())
+
+        # Perform tests with per-user enablement.
+        yield self.command('Enable OTP per-user', 'user_mod',
+                           self.user, ipauserauthtype=(u'otp',))
+        for test in self.do_all():
+            yield test
+        yield self.command('Disable OTP per-user', 'user_mod',
+                           self.user, ipauserauthtype=())
+
+        # Remove the user.
+        yield self.command('Cleanup user', 'user_del', self.user)
-- 
2.1.0

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel

Reply via email to