changeset d3a8ed75e4c0 in trytond:default details: https://hg.tryton.org/trytond?cmd=changeset;node=d3a8ed75e4c0 description: Protect trusted devices against brute force attack
issue9386 review321511002 diffstat: CHANGELOG | 1 + trytond/res/__init__.py | 1 + trytond/res/user.py | 88 +++++++++++++++++++++++++++++++++++++++++---- trytond/tests/test_user.py | 45 +++++++++++++++++++++++ 4 files changed, 126 insertions(+), 9 deletions(-) diffs (251 lines): diff -r da66215e2990 -r d3a8ed75e4c0 CHANGELOG --- a/CHANGELOG Sat Feb 20 00:58:39 2021 +0100 +++ b/CHANGELOG Sun Feb 21 16:23:11 2021 +0100 @@ -1,3 +1,4 @@ +* Protect trusted devices against brute force attack * Make ModelView.parse_view public * Add help for each value to selection and multiselection * Use safe_join in SharedDataMiddlewareIndex (issue10068) diff -r da66215e2990 -r d3a8ed75e4c0 trytond/res/__init__.py --- a/trytond/res/__init__.py Sat Feb 20 00:58:39 2021 +0100 +++ b/trytond/res/__init__.py Sun Feb 21 16:23:11 2021 +0100 @@ -15,6 +15,7 @@ group.Group, user.User, user.LoginAttempt, + user.UserDevice, user.UserAction, user.UserGroup, user.Warning_, diff -r da66215e2990 -r d3a8ed75e4c0 trytond/res/user.py --- a/trytond/res/user.py Sat Feb 20 00:58:39 2021 +0100 +++ b/trytond/res/user.py Sun Feb 21 16:23:11 2021 +0100 @@ -368,10 +368,12 @@ def write(cls, users, values, *args): pool = Pool() Session = pool.get('ir.session') + UserDevice = pool.get('res.user.device') actions = iter((users, values) + args) all_users = [] session_to_clear = [] + users_to_clear = [] args = [] for users, values in zip(actions, actions): all_users += users @@ -379,10 +381,12 @@ if 'password' in values: session_to_clear += users + users_to_clear += [u.login for u in users] super(User, cls).write(*args) Session.clear(session_to_clear) + UserDevice.clear(users_to_clear) # Clean cursor cache as it could be filled by domain_get for cache in Transaction().cache.values(): @@ -618,15 +622,20 @@ ''' Return user id if password matches ''' - LoginAttempt = Pool().get('res.user.login.attempt') + pool = Pool() + LoginAttempt = pool.get('res.user.login.attempt') + UserDevice = pool.get('res.user.device') + count_ip = LoginAttempt.count_ip() if count_ip > config.getint( 'session', 'max_attempt_ip_network', default=300): # Do not add attempt as the goal is to prevent flooding raise RateLimitException() - count = LoginAttempt.count(login) + device_cookie = UserDevice.get_valid_cookie( + login, parameters.get('device_cookie')) + count = LoginAttempt.count(login, device_cookie) if count > config.getint('session', 'max_attempt', default=5): - LoginAttempt.add(login) + LoginAttempt.add(login, device_cookie) raise RateLimitException() Transaction().atexit(time.sleep, random.randint(0, 2 ** count - 1)) for methods in config.get( @@ -642,9 +651,9 @@ if len(user_ids) != 1 or not all(user_ids): break if len(user_ids) == 1 and all(user_ids): - LoginAttempt.remove(login) + LoginAttempt.remove(login, device_cookie) return user_ids.pop() - LoginAttempt.add(login) + LoginAttempt.add(login, device_cookie) @classmethod def _login_password(cls, login, parameters): @@ -740,6 +749,7 @@ """ __name__ = 'res.user.login.attempt' login = fields.Char('Login', size=512) + device_cookie = fields.Char("Device Cookie") ip_address = fields.Char("IP Address") ip_network = fields.Char("IP Network") @@ -771,7 +781,7 @@ @classmethod @_login_size - def add(cls, login): + def add(cls, login, device_cookie=None): cursor = Transaction().connection.cursor() table = cls.__table__() cursor.execute(*table.delete(where=table.create_date < cls.delay())) @@ -779,24 +789,28 @@ ip_address, ip_network = cls.ipaddress() cls.create([{ 'login': login, + 'device_cookie': device_cookie, 'ip_address': str(ip_address), 'ip_network': str(ip_network), }]) @classmethod @_login_size - def remove(cls, login): + def remove(cls, login, cookie): cursor = Transaction().connection.cursor() table = cls.__table__() - cursor.execute(*table.delete(where=table.login == login)) + cursor.execute(*table.delete( + where=(table.login == login) & (table.device_cookie == cookie) + )) @classmethod @_login_size - def count(cls, login): + def count(cls, login, device_cookie): cursor = Transaction().connection.cursor() table = cls.__table__() cursor.execute(*table.select(Count(Literal('*')), where=(table.login == login) + & (table.device_cookie == device_cookie) & (table.create_date >= cls.delay()))) return cursor.fetchone()[0] @@ -813,6 +827,62 @@ del _login_size +class UserDevice(ModelSQL): + "User Device" + __name__ = 'res.user.device' + + login = fields.Char("Login", required=True) + cookie = fields.Char("Cookie", readonly=True, required=True) + + @classmethod + def __setup__(cls): + super().__setup__() + cls.__rpc__.update({ + 'renew': RPC(readonly=False), + }) + + @classmethod + def get_valid_cookie(cls, login, cookie): + try: + device, = cls.search([ + ('login', '=', login), + ('cookie', '=', cookie), + ], limit=1) + except ValueError: + return None + + return device.cookie + + @classmethod + def renew(cls, current_cookie): + pool = Pool() + User = pool.get('res.user') + + user = User(Transaction().user) + new_cookie = uuid.uuid4().hex + current_devices = cls.search([ + ('login', '=', user.login), + ('cookie', '=', current_cookie), + ]) + if current_devices: + cls.write(current_devices, { + 'cookie': new_cookie + }) + else: + cls.create([{ + 'login': user.login, + 'cookie': new_cookie, + }]) + return new_cookie + + @classmethod + def clear(cls, logins): + for sub_logins in grouped_slice(logins): + cls.delete(cls.search([ + ('login', 'in', list(sub_logins)), + ])) + + class UserAction(ModelSQL): 'User - Action' __name__ = 'res.user-ir.action' diff -r da66215e2990 -r d3a8ed75e4c0 trytond/tests/test_user.py --- a/trytond/tests/test_user.py Sat Feb 20 00:58:39 2021 +0100 +++ b/trytond/tests/test_user.py Sun Feb 21 16:23:11 2021 +0100 @@ -244,6 +244,51 @@ with set_authentications(methods): self.assertEqual(User.get_login('user', {}), result) + @with_transaction() + def test_bad_authentication_logging(self): + "Test the logging of log in attempts" + pool = Pool() + LoginAttempt = pool.get('res.user.login.attempt') + + self.create_user('user', '12345') + self.check_user('user', '12345') + self.assertEqual(LoginAttempt.count('user', None), 1) + + @with_transaction() + def test_bad_authentication_valid_cookie(self): + "Test the logging of log in attempts with a valid cookie" + pool = Pool() + User = pool.get('res.user') + UserDevice = pool.get('res.user.device') + LoginAttempt = pool.get('res.user.login.attempt') + + user = self.create_user('user', '12345') + with Transaction().set_user(user.id): + cookie = UserDevice.renew(None) + + User.get_login('user', {'password': '', 'device_cookie': cookie}) + self.assertEqual(LoginAttempt.count('user', None), 0) + self.assertEqual(LoginAttempt.count('user', cookie), 1) + + @with_transaction() + def test_bad_authentication_invalid_cookie(self): + "Test the logging of log in attempts without a valid cookie" + pool = Pool() + User = pool.get('res.user') + UserDevice = pool.get('res.user.device') + LoginAttempt = pool.get('res.user.login.attempt') + + user = self.create_user('user', '12345') + with Transaction().set_user(user.id): + cookie = UserDevice.renew(None) + + User.get_login('user', { + 'password': '', + 'device_cookie': cookie + 'wrong', + }) + self.assertEqual(LoginAttempt.count('user', None), 1) + self.assertEqual(LoginAttempt.count('user', cookie), 0) + def suite(): return unittest.TestLoader().loadTestsFromTestCase(UserTestCase)