On Thu, 2014-11-20 at 11:13 -0500, Nathaniel McCallum wrote:
> This tests the general workflow for OTP including most possible
> token combinations. This includes 5872 tests. Further optimization
> is possible to reduce the number of duplicate tests run.
> 
> Things not yet tested:
> * ipa-kdb
> * ipa-otpd
> * otptoken-sync
> * RADIUS proxy
> * token self-management
> * type specific attributes

Attached is the latest iteration of the OTP test work. This now includes
all major cases except token self-management and RADIUS proxy (which
will come in its own patch). Token self-management is held up by the
fact that I can't get alternate ccaches to work with the API. I have
tried kinit-ing to an independent ccache and exporting KRB5CCNAME, but
this doesn't work for some reason I can't figure out.

I ended up creating my own fixture mechanism. I'm not in love with it,
but it is simple and at least gets the scoping correct. It also
generates individual tests for each parameterized state, so the output
is both correct and obvious.

I also implemented OTP myself. This isn't much code, but pyotp has a
major bug and is dead upstream. I'd like to migrate to
python-cryptography when it lands as a dependency of FreeIPA. But due to
timing issues, we can't land it now. This will be a small patch in the
future.

Even with the caveats above, I feel the coverage provided by these
tests is worth review/merge. As a rough estimate, I think this is
about 70% code coverage. I would break down the remaining 30% as:
* RADIUS proxy - 10%
* token self-management - 10%
* misc testable - 5%
* misc untestable - 5%

All tests in this patch succeed on 4.1.2.

Nathaniel
From c7b01ea4415db3847110ffe51a9bb5193072d1a8 Mon Sep 17 00:00:00 2001
From: Nathaniel McCallum <npmccal...@redhat.com>
Date: Thu, 20 Nov 2014 11:02:00 -0500
Subject: [PATCH] Add initial tests for OTP

This tests the general workflow for OTP including most possible
token combinations. It includes tests for all token options,
enablement scenarios and token synchronization.
---
 ipatests/test_xmlrpc/test_otptoken_plugin.py | 536 +++++++++++++++++++++++++++
 1 file changed, 536 insertions(+)
 create mode 100644 ipatests/test_xmlrpc/test_otptoken_plugin.py

diff --git a/ipatests/test_xmlrpc/test_otptoken_plugin.py b/ipatests/test_xmlrpc/test_otptoken_plugin.py
new file mode 100644
index 0000000000000000000000000000000000000000..7a27d720cb9789c648e45aece1ebd0b6b751e3d8
--- /dev/null
+++ b/ipatests/test_xmlrpc/test_otptoken_plugin.py
@@ -0,0 +1,536 @@
+# Authors:
+#   Nathaniel McCallum <npmccal...@redhat.com>
+#
+# Copyright (C) 2014  Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipalib/plugins/otptoken.py` module.
+"""
+
+import abc
+import base64
+import hashlib
+import hmac
+import os
+import unittest
+import urlparse
+import uuid
+import random
+import string
+import struct
+import subprocess
+import time
+import types
+
+from datetime import datetime, timedelta
+
+from ipalib import api
+from xmlrpc_test import XMLRPC_test, Declarative
+from ipatests.util import assert_deepequal, raises
+from ipapython.version import API_VERSION
+from ipapython.dn import DN
+import ldap
+
+
+class Fixture(object):
+    """Abstract base class for the context-manager fixtures in this module.
+
+    Subclasses must implement __enter__ and __exit__.  fixturize() assigns
+    the ``context`` property before entering a fixture; until something has
+    been assigned it reads as an empty tuple.
+    """
+
+    __metaclass__ = abc.ABCMeta
+
+    def __repr__(self):
+        return '%s()' % type(self).__name__
+
+    @property
+    def context(self):
+        # The backing attribute only exists once the setter has been used.
+        try:
+            fixtures = self.__fixtures
+        except AttributeError:
+            fixtures = ()
+        return fixtures
+
+    @context.setter
+    def context(self, value):
+        self.__fixtures = value
+
+    @abc.abstractmethod
+    def __enter__(self):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def __exit__(self, type, value, traceback):
+        raise NotImplementedError()
+
+
+def fixturize(*args):
+    """Decorator factory that expands a test method into generated tests.
+
+    Each positional argument is one fixture slot.  A slot given as a tuple
+    lists alternatives that are tried one after another; any other value is
+    a single alternative.  The decorated method is replaced by a generator
+    that walks every combination of alternatives and yields, for each one, a
+    tuple ``(callable, value1, value2, ...)`` whose callable invokes the
+    original method on the same instance.
+
+    NOTE(review): the yielded tuples look like nose-style generator tests --
+    confirm against the test runner in use.
+    """
+    def outer(func):
+        # fixtures: slots still to be resolved; ctx: values chosen so far.
+        def inner(s, fixtures=args, ctx=()):
+            try:
+                fixture = fixtures[0]
+            except IndexError:
+                # Every slot is resolved: emit one test for this combination.
+                yield (lambda *a: func(s, *a),) + ctx
+            else:
+                # Only an exact tuple counts as a list of alternatives.
+                for f in fixture if type(fixture) is tuple else (fixture,):
+                    # Let Fixture objects see the values chosen before them.
+                    if isinstance(f, Fixture):
+                        f.context = ctx
+
+                    if hasattr(f, "__enter__"):
+                        # The fixture stays entered while the nested tests
+                        # are yielded; the value bound by "with" is what the
+                        # test receives for this slot.
+                        with f as tmp:
+                            for test in inner(s, fixtures[1:], ctx + (tmp,)):
+                                yield test
+                    else:
+                        # Anything that is not a context manager is passed
+                        # to the test unchanged.
+                        for test in inner(s, fixtures[1:], ctx + (f,)):
+                            yield test
+
+        # Keep the original name on the generated test function.
+        inner.__name__ = func.__name__
+        return inner
+    return outer
+
+
+def cmd(cmd, *args, **kwargs):
+    """Run the named API command, always passing version=API_VERSION."""
+    command = api.Command[cmd]
+    return command(*args, version=API_VERSION, **kwargs)
+
+
+class AuthenticationError(Exception):
+    """Raised by the login helpers when an authentication attempt fails."""
+
+
+class Login(object):
+    """Callable base class for the login mechanisms used by the tests.
+
+    Calling an instance delegates to login(), which subclasses implement.
+    Consecutive failures are counted; on the third one the 'user_unlock'
+    command is run for the user and the count starts over.  The exception
+    from login() is re-raised afterwards.
+    """
+
+    def __init__(self):
+        self.__fails = 0
+
+    def __repr__(self):
+        return '%s()' % type(self).__name__
+
+    def login(self, uid, pwd):
+        raise NotImplementedError()
+
+    def __call__(self, uid, pwd):
+        try:
+            self.login(uid, pwd)
+        except:
+            self.__fails += 1
+            if self.__fails >= 3:
+                cmd('user_unlock', uid)
+                self.__fails = 0
+            raise
+        else:
+            # A successful login resets the failure streak.
+            self.__fails = 0
+
+
+class LDAPLogin(Login):
+    """Login implementation that performs an LDAP simple bind."""
+
+    def __init__(self):
+        super(LDAPLogin, self).__init__()
+
+        uri = 'ldap://' + api.env.host
+        self.__conn = ldap.initialize(uri)
+
+    def login(self, uid, pwd):
+        # Bind as uid=<uid> below the user container of the base DN.
+        user_dn = DN(('uid', uid), api.env.container_user, api.env.basedn)
+        try:
+            self.__conn.simple_bind_s(str(user_dn), pwd)
+        except ldap.INVALID_CREDENTIALS as err:
+            raise AuthenticationError(err.message)
+
+
+class Krb5Login(Login):
+    """Login implementation that shells out to /usr/bin/kinit.
+
+    At construction time /usr/bin/klist is run and the text after the first
+    ': ' on its first output line (presumably the current ccache name) is
+    remembered; kinit later receives it through its -T option.
+    """
+
+    def __init__(self):
+        super(Krb5Login, self).__init__()
+
+        klist = subprocess.Popen(['/usr/bin/klist'], stdout=subprocess.PIPE)
+        output = klist.communicate()[0]
+        first_line = output.split('\n')[0]
+        self.__fast = first_line.split(': ')[1]
+
+    def kinit(self, uid, pwd, newpwd=None, ccache="FILE:/dev/null"):
+        # kinit reads the password from stdin; when a new password is given
+        # it is sent twice after the current one.
+        stdin_data = pwd + '\n'
+        if newpwd is not None:
+            stdin_data += (newpwd + '\n') * 2
+
+        argv = [
+            '/usr/bin/kinit',
+            '-T', self.__fast,
+            '-c', ccache,
+            uid + '@' + api.env.realm,
+        ]
+        proc = subprocess.Popen(argv,
+                                stdin=subprocess.PIPE,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
+        out, err = proc.communicate(stdin_data)
+        if proc.returncode != 0:
+            raise AuthenticationError(out, err)
+
+    def login(self, uid, pwd):
+        self.kinit(uid, pwd)
+
+
+class User(Fixture):
+    """Fixture that creates a throwaway user and deletes it again on exit.
+
+    The username is 20 random lowercase letters and the password 20 random
+    letters.  Extra keyword arguments are forwarded to 'user_add'.
+    """
+
+    def __init__(self, **kwargs):
+        self.username = u''.join(random.sample(string.lowercase, 20))
+        self.password = u''.join(random.sample(string.letters, 20))
+        self.__kwargs = kwargs
+
+    def __enter__(self):
+        # The two halves of the username double as given name and surname.
+        half = len(self.username) // 2
+        cmd('user_add', self.username,
+            userpassword=self.password,
+            givenname=self.username[:half],
+            sn=self.username[half:],
+            **self.__kwargs)
+
+        # Change password: kinit is given the same value as old and new.
+        krb5 = Krb5Login()
+        krb5.kinit(self.username, self.password, self.password)
+        return self
+
+    def __exit__(self, type, value, traceback):
+        cmd('user_del', self.username)
+
+
+class Enablement(Fixture):
+    """Fixture that sets the 'otp' user auth type while it is entered.
+
+    With user=True the auth type is set on the first User fixture found in
+    the context; with user=False it is set through 'config_mod'.  Leaving
+    the fixture writes an empty auth type tuple back the same way.
+    """
+
+    def __init__(self, user=True):
+        self.user = user
+
+    def __repr__(self):
+        scope = "user" if self.user else "global"
+        return "Enablement(%s)" % scope
+
+    def __set_authtype(self, authtype):
+        if not self.user:
+            cmd('config_mod', ipauserauthtype=authtype)
+            time.sleep(60)  # Work around cfg update hack in ipa-kdb
+            return
+
+        # Per-user mode: the target is the first User fixture in the context.
+        for fixture in self.context:
+            if isinstance(fixture, User):
+                cmd('user_mod', fixture.username, ipauserauthtype=authtype)
+                return
+        assert False
+
+    def __enter__(self):
+        self.__set_authtype((u'otp',))
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.__set_authtype(())
+
+
+class Token(dict, Fixture):
+    """Fixture for one OTP token; the dict content is the token's attributes.
+
+    Keyword arguments given to the constructor take precedence over the
+    defaults read from the 'otptoken' object's parameters.  Entering the
+    fixture adds the token with 'otptoken_add' (owned by the first User
+    fixture in the context) and validates the reply; leaving it deletes the
+    token again.
+    """
+
+    @property
+    def type(self):
+        """The token type in upper case, e.g. u'HOTP' or u'TOTP'."""
+        return self[u'type'].upper()
+
+    @property
+    def active(self):
+        """Whether the token is enabled and inside its validity window.
+
+        NOTE(review): the bounds are compared against the naive local
+        datetime.now() -- confirm this matches how the server reads them.
+        """
+        if self.get(u'ipatokendisabled', False):
+            return False
+
+        nb = self.get(u'ipatokennotbefore', None)
+        if nb is not None and datetime.now() < nb:
+            return False
+
+        na = self.get(u'ipatokennotafter', None)
+        if na is not None and datetime.now() > na:
+            return False
+
+        return True
+
+    def __repr__(self):
+        # Longest prefixes first, so the most specific one is stripped.
+        prefixes = ['ipatoken' + x for x in ('totp', 'hotp', 'otp', '')]
+
+        args = {}
+        for k, v in self.items():
+            for prefix in prefixes:
+                if k.startswith(prefix):
+                    k = k[len(prefix):]
+                    break
+
+            # Random per-token values only add noise to the test name.
+            if k in ('key', 'uniqueid', 'owner'):
+                continue
+
+            if isinstance(v, datetime):
+                v = v.strftime("%Y%m%d%H%M%SZ")
+
+            args[k] = v
+
+        return "%s(%s:%d|%d%s%s%r)" % (
+            args.pop('type').upper(),
+            args.pop('algorithm'),
+            args.pop('digits'),
+            args.pop('counter', args.pop('timestep', 0)),
+            ':' if self.type == 'TOTP' else '',
+            str(args.pop('clockoffset', '')),
+            args
+        )
+
+    def otp(self, at=0):
+        """Return the token code for position *at* as a zero-padded string.
+
+        For TOTP tokens *at* counts time steps relative to the current time
+        (the clock offset is added); for HOTP tokens it is added to the
+        stored counter.  A negative *at* on an HOTP token raises
+        unittest.SkipTest.
+        """
+        # I first attempted implementing this with pyotp. However, pyotp has
+        # a critical bug which appeared in testing. I fixed this bug and
+        # submitted it upstream: https://github.com/nathforge/pyotp/pull/9
+        #
+        # However, upstream pyotp appears to be dead. For now, I have
+        # implemented the algorithm myself. In the future, it would be nice
+        # to use python-cryptography here.
+
+        # If the token is time-based, calculate the counter from the time.
+        if self.type == u"TOTP":
+            intrvl = self[u'ipatokentotptimestep']
+            offset = self.get(u'ipatokentotpclockoffset', 0)
+            # The counter must be an integer: packing a float with an
+            # integer struct format relies on a deprecated conversion.
+            at = int((time.time() + offset + intrvl * at) / intrvl)
+
+        # Otherwise, just account for the specified counter offset.
+        elif self.type == u"HOTP":
+            if at < 0:  # Skip invalid test offsets.
+                raise unittest.SkipTest()
+            at += self.get(u'ipatokenhotpcounter', 0)
+
+        # Create the HMAC of the current counter
+        countr = struct.pack("!Q", at)
+        hasher = getattr(hashlib, self[u'ipatokenotpalgorithm'])
+        digest = hmac.HMAC(self[u'ipatokenotpkey'], countr, hasher).digest()
+
+        # Get the number of digits
+        digits = self[u'ipatokenotpdigits']
+
+        # Truncate the digest: the low nibble of the last byte selects a
+        # four-byte window whose top bit is masked off.
+        offset = ord(digest[-1]) & 0xf
+        binary = (ord(digest[offset+0]) & 0x7f) << 0x18
+        binary |= (ord(digest[offset+1]) & 0xff) << 0x10
+        binary |= (ord(digest[offset+2]) & 0xff) << 0x08
+        binary |= (ord(digest[offset+3]) & 0xff) << 0x00
+        binary = binary % (10 ** digits)
+
+        return str(binary).zfill(digits)
+
+    def __init__(self, **kwargs):
+        super(Token, self).__init__(**kwargs)
+
+        # Add in default values.
+        self.setdefault(u'ipatokenuniqueid', unicode(uuid.uuid4()))
+        self.setdefault(u'ipatokenotpkey', os.urandom(20))
+        params = api.Object['otptoken'].params
+        for i in range(len(params)):
+            param = params[i]
+            if param.default is not None and param.name:
+                self.setdefault(param.name, param.default)
+
+        # Remove defaults that don't apply.  (The mapping is deliberately
+        # not called "types", which would shadow the imported module.)
+        type_specific = {
+            "HOTP": (u'ipatokenhotpcounter',),
+            "TOTP": (u'ipatokentotptimestep', u'ipatokentotpclockoffset'),
+        }
+        for k, names in type_specific.items():
+            if k != self.type:
+                for name in names:
+                    # Tolerate attributes that never received a default.
+                    self.pop(name, None)
+
+    def __enter__(self):
+        kwargs = dict(self)
+
+        # Get the owner from the user fixture.
+        for fixture in self.context:
+            if isinstance(fixture, User):
+                kwargs[u'ipatokenowner'] = fixture.username
+                break
+
+        # Add the token.
+        result = cmd('otptoken_add', **kwargs)
+
+        try:
+            # Remove the URI and validate the rest of the return value.
+            uri = result.get('result', {}).pop('uri', None)
+            expected = {}
+            for k, v in kwargs.items():
+                if k == 'setattr':
+                    continue
+
+                # bool is tested first because it is a subclass of int.
+                if isinstance(v, bool):
+                    expected[k] = (unicode(v).upper(),)
+                elif isinstance(v, int):
+                    expected[k] = (unicode(v),)
+                else:
+                    expected[k] = (v,)
+            expected[u'type'] = expected[u'type'][0].upper()
+            # Use the configured base DN rather than a fixed example.com
+            # suffix, so the check also holds on other domains.
+            expected[u'dn'] = u'ipatokenuniqueid=%s,cn=otp,%s' % (
+                self[u'ipatokenuniqueid'],
+                api.env.basedn
+            )
+            assert_deepequal({
+                u'summary': u'Added OTP token "%s"' % self[u'ipatokenuniqueid'],
+                u'result': expected,
+                u'value': self[u'ipatokenuniqueid'],
+            }, result)
+
+            # Validate the URI.
+            split = urlparse.urlsplit(uri)
+            assert split.scheme == u'otpauth'
+            assert split.netloc.upper() == self.type
+            assert split.path == '/%s@%s:%s' % (
+                kwargs[u'ipatokenowner'],
+                api.env.realm,
+                self[u'ipatokenuniqueid']
+            )
+
+            # Validate the query.
+            type_query = {
+                "HOTP": {u'counter': self.get(u'ipatokenhotpcounter')},
+                "TOTP": {u'period': self.get(u'ipatokentotptimestep')},
+            }
+            query = {
+                u'algorithm': self.get(u'ipatokenotpalgorithm'),
+                u'issuer': u'%s@%s' % (kwargs[u'ipatokenowner'], api.env.realm),
+                u'digits': self.get(u'ipatokenotpdigits'),
+                u'secret': base64.b32encode(self[u'ipatokenotpkey']),
+            }
+            query.update(type_query.get(self.type, {}))
+            assert_deepequal(
+                {k: (unicode(v),) for k, v in query.items()},
+                urlparse.parse_qs(split.query)
+            )
+        except:
+            # __exit__ is not called when __enter__ fails, so remove the
+            # token here before propagating the failure.
+            cmd('otptoken_del', self[u'ipatokenuniqueid'])
+            raise
+
+        return self
+
+    def __exit__(self, type, value, traceback):
+        cmd('otptoken_del', self[u'ipatokenuniqueid'])
+
+
+def auth(success, login, user, token, pwd=None, at=0):
+    """Attempt a login with password + code and check the outcome.
+
+    The password defaults to the user's own.  A string *at* is used as the
+    code verbatim; otherwise the code comes from token.otp(at), or is
+    '123456' when there is no token.  When *success* is false the login
+    must raise AuthenticationError.
+    """
+    password = user.password if pwd is None else pwd
+
+    if isinstance(at, basestring):
+        code = at
+    elif token:
+        code = token.otp(at)
+        if code is None:  # Skip invalid test offsets.
+            raise unittest.SkipTest()
+    else:
+        code = '123456'
+
+    credentials = password + code
+    if not success:
+        raises(AuthenticationError, login, user.username, credentials)
+        return
+
+    login(user.username, credentials)
+
+
+class TestEnablement:
+    """Check which credential combinations authenticate under each
+    combination of OTP enablement (none, per-user, global) and token
+    presence."""
+
+    # This describes the success conditions for authentication.
+    #   1. If user-auth-type does not include otp: password-only.
+    #   2. Otherwise, if no token exists: password-only.
+    #   3. Otherwise: password AND code.
+    #
+    # All states not accounted for here MUST fail.
+    #
+    # Format: (enabled, token created, correct pwd given, code generated
+    # from an integer offset): success
+    EXPECTATIONS = {
+        (False, False, True, False): True,  # Condition 1
+        (False, True,  True, False): True,  # Condition 1
+        (True,  False, True, False): True,  # Condition 2
+        (True,  True,  True, True):  True,  # Condition 3
+    }
+
+    # Slots: login helper, user, enablement (None = not enabled), token
+    # (None = no token), and (may succeed, pwd, at) rows.  In a row, pwd=None
+    # means "use the user's real password" and a string `at` is sent as the
+    # code verbatim.
+    @fixturize(Krb5Login(), User(), (
+            None,
+            Enablement(True),
+            Enablement(False)
+        ), (
+            None,
+            Token()
+        ), (
+            (True,  None, ''),        # Password-only
+            (True,  None, 0),         # Both
+            (False, '', 1),           # Code-only
+            (False, '', ''),          # Neither
+            (False, 'xxx', 2),        # Bad password
+            (False, None, '123456'),  # Bad code
+        )
+    )
+    def test(self, login, user, enablement, token, params):
+        success, pwd, at = params
+
+        # Rows that may succeed are looked up in EXPECTATIONS; any state not
+        # listed there is expected to fail.
+        if success:
+            success = self.EXPECTATIONS.get((
+                enablement is not None,
+                token is not None,
+                pwd is None,
+                isinstance(at, int)
+            ), False)
+
+        auth(success, login, user, token, pwd, at)
+
+
+class TestTokens:
+    """Authenticate with tokens of every type and option at several code
+    offsets.
+
+    Each row of the last slot is (expected success, offset).  A row only
+    counts as a success when the token is also active, so disabled or
+    out-of-window tokens must fail at every offset.
+    """
+
+    # Reference timestamps for the not-before / not-after cases.
+    TODAY = datetime.now().replace(microsecond=0)
+    TOMORROW = TODAY + timedelta(1)
+    YESTERDAY = TODAY - timedelta(1)
+
+    # ipauserauthtype is passed as a 1-tuple, matching Enablement; a bare
+    # (u'otp') is just the string u'otp'.
+    @fixturize(
+        Krb5Login(),
+        User(ipauserauthtype=(u'otp',)), (
+            Token(type=u'hotp'),
+            Token(type=u'HOTP'),
+            Token(type=u'HOTP', ipatokenhotpcounter=1000),
+
+            Token(type=u'totp'),
+            Token(type=u'TOTP'),
+            Token(type=u'TOTP', ipatokentotptimestep=60),
+            Token(type=u'TOTP', ipatokentotpclockoffset=30000),
+
+            Token(ipatokenotpalgorithm=u'sha1'),
+            Token(ipatokenotpalgorithm=u'sha256'),
+            Token(ipatokenotpalgorithm=u'sha384'),
+            Token(ipatokenotpalgorithm=u'sha512'),
+
+            Token(ipatokenotpdigits=6),
+            Token(ipatokenotpdigits=8),
+
+            Token(ipatokendisabled=False),
+            Token(ipatokennotbefore=YESTERDAY),
+            Token(ipatokennotafter=TOMORROW),
+
+            Token(ipatokendisabled=True),
+            Token(ipatokennotbefore=TOMORROW),
+            Token(ipatokennotafter=YESTERDAY),
+        ), (
+            (False, -1000),  # Check distant past OTP
+            (True,  -2),     # Check past OTP
+            (True,   0),     # Check current OTP
+            (False,  0),     # Check duplicate OTP
+            (True,   1),     # Check next OTP
+            (True,   3),     # Check future OTP
+            (False,  1000),  # Check distant future OTP
+        )
+    )
+    def test(self, login, user, token, params):
+        auth(params[0] and token.active, login, user, token, None, params[1])
+
+
+class TestSync:
+    """Check that 'otptoken_sync' lets a skipped-ahead token authenticate.
+
+    NOTE: this test requires ca.crt in your confdir so that the client can
+    validate the server's certificate.
+
+    Each row of the last slot is (sync first, expected success, offset).
+    """
+
+    # ipauserauthtype is passed as a 1-tuple, matching Enablement; a bare
+    # (u'otp') is just the string u'otp'.
+    @fixturize(Krb5Login(), User(ipauserauthtype=(u'otp',)), (
+            Token(type=u'hotp'),
+            Token(type=u'totp')
+        ), (
+            (False, True, 0),   # Normal token operation
+            (False, False, 7),  # Skip-ahead failure
+            (True, True, 7),    # Skip-ahead success after sync
+        )
+    )
+    def test(self, login, user, token, params):
+        sync, success, at = params
+
+        if sync:
+            # Synchronise with two consecutive codes, then authenticate with
+            # the code that follows them.
+            cmd('otptoken_sync',
+                user=user.username,
+                password=user.password,
+                first_code=unicode(token.otp(at)),
+                second_code=unicode(token.otp(at + 1)))
+            at += 2
+
+        auth(success, login, user, token, None, at)
-- 
2.1.0

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel

Reply via email to