changeset d3a8ed75e4c0 in trytond:default
details: https://hg.tryton.org/trytond?cmd=changeset;node=d3a8ed75e4c0
description:
        Protect trusted devices against brute force attack

        issue9386
        review321511002
diffstat:

 CHANGELOG                  |   1 +
 trytond/res/__init__.py    |   1 +
 trytond/res/user.py        |  88 +++++++++++++++++++++++++++++++++++++++++----
 trytond/tests/test_user.py |  45 +++++++++++++++++++++++
 4 files changed, 126 insertions(+), 9 deletions(-)

diffs (251 lines):

diff -r da66215e2990 -r d3a8ed75e4c0 CHANGELOG
--- a/CHANGELOG Sat Feb 20 00:58:39 2021 +0100
+++ b/CHANGELOG Sun Feb 21 16:23:11 2021 +0100
@@ -1,3 +1,4 @@
+* Protect trusted devices against brute force attack
 * Make ModelView.parse_view public
 * Add help for each value to selection and multiselection
 * Use safe_join in SharedDataMiddlewareIndex (issue10068)
diff -r da66215e2990 -r d3a8ed75e4c0 trytond/res/__init__.py
--- a/trytond/res/__init__.py   Sat Feb 20 00:58:39 2021 +0100
+++ b/trytond/res/__init__.py   Sun Feb 21 16:23:11 2021 +0100
@@ -15,6 +15,7 @@
         group.Group,
         user.User,
         user.LoginAttempt,
+        user.UserDevice,
         user.UserAction,
         user.UserGroup,
         user.Warning_,
diff -r da66215e2990 -r d3a8ed75e4c0 trytond/res/user.py
--- a/trytond/res/user.py       Sat Feb 20 00:58:39 2021 +0100
+++ b/trytond/res/user.py       Sun Feb 21 16:23:11 2021 +0100
@@ -368,10 +368,12 @@
     def write(cls, users, values, *args):
         pool = Pool()
         Session = pool.get('ir.session')
+        UserDevice = pool.get('res.user.device')
 
         actions = iter((users, values) + args)
         all_users = []
         session_to_clear = []
+        users_to_clear = []
         args = []
         for users, values in zip(actions, actions):
             all_users += users
@@ -379,10 +381,12 @@
 
             if 'password' in values:
                 session_to_clear += users
+                users_to_clear += [u.login for u in users]
 
         super(User, cls).write(*args)
 
         Session.clear(session_to_clear)
+        UserDevice.clear(users_to_clear)
 
         # Clean cursor cache as it could be filled by domain_get
         for cache in Transaction().cache.values():
@@ -618,15 +622,20 @@
         '''
         Return user id if password matches
         '''
-        LoginAttempt = Pool().get('res.user.login.attempt')
+        pool = Pool()
+        LoginAttempt = pool.get('res.user.login.attempt')
+        UserDevice = pool.get('res.user.device')
+
         count_ip = LoginAttempt.count_ip()
         if count_ip > config.getint(
                 'session', 'max_attempt_ip_network', default=300):
             # Do not add attempt as the goal is to prevent flooding
             raise RateLimitException()
-        count = LoginAttempt.count(login)
+        device_cookie = UserDevice.get_valid_cookie(
+            login, parameters.get('device_cookie'))
+        count = LoginAttempt.count(login, device_cookie)
         if count > config.getint('session', 'max_attempt', default=5):
-            LoginAttempt.add(login)
+            LoginAttempt.add(login, device_cookie)
             raise RateLimitException()
         Transaction().atexit(time.sleep, random.randint(0, 2 ** count - 1))
         for methods in config.get(
@@ -642,9 +651,9 @@
                 if len(user_ids) != 1 or not all(user_ids):
                     break
             if len(user_ids) == 1 and all(user_ids):
-                LoginAttempt.remove(login)
+                LoginAttempt.remove(login, device_cookie)
                 return user_ids.pop()
-        LoginAttempt.add(login)
+        LoginAttempt.add(login, device_cookie)
 
     @classmethod
     def _login_password(cls, login, parameters):
@@ -740,6 +749,7 @@
     """
     __name__ = 'res.user.login.attempt'
     login = fields.Char('Login', size=512)
+    device_cookie = fields.Char("Device Cookie")
     ip_address = fields.Char("IP Address")
     ip_network = fields.Char("IP Network")
 
@@ -771,7 +781,7 @@
 
     @classmethod
     @_login_size
-    def add(cls, login):
+    def add(cls, login, device_cookie=None):
         cursor = Transaction().connection.cursor()
         table = cls.__table__()
         cursor.execute(*table.delete(where=table.create_date < cls.delay()))
@@ -779,24 +789,28 @@
         ip_address, ip_network = cls.ipaddress()
         cls.create([{
                     'login': login,
+                    'device_cookie': device_cookie,
                     'ip_address': str(ip_address),
                     'ip_network': str(ip_network),
                     }])
 
     @classmethod
     @_login_size
-    def remove(cls, login):
+    def remove(cls, login, cookie):
         cursor = Transaction().connection.cursor()
         table = cls.__table__()
-        cursor.execute(*table.delete(where=table.login == login))
+        cursor.execute(*table.delete(
+                where=(table.login == login) & (table.device_cookie == cookie)
+                ))
 
     @classmethod
     @_login_size
-    def count(cls, login):
+    def count(cls, login, device_cookie):
         cursor = Transaction().connection.cursor()
         table = cls.__table__()
         cursor.execute(*table.select(Count(Literal('*')),
                 where=(table.login == login)
+                & (table.device_cookie == device_cookie)
                 & (table.create_date >= cls.delay())))
         return cursor.fetchone()[0]
 
@@ -813,6 +827,62 @@
     del _login_size
 
 
+class UserDevice(ModelSQL):
+    "User Device"
+    __name__ = 'res.user.device'
+
+    login = fields.Char("Login", required=True)
+    cookie = fields.Char("Cookie", readonly=True, required=True)
+
+    @classmethod
+    def __setup__(cls):
+        super().__setup__()
+        cls.__rpc__.update({
+                'renew': RPC(readonly=False),
+                })
+
+    @classmethod
+    def get_valid_cookie(cls, login, cookie):
+        try:
+            device, = cls.search([
+                    ('login', '=', login),
+                    ('cookie', '=', cookie),
+                    ], limit=1)
+        except ValueError:
+            return None
+
+        return device.cookie
+
+    @classmethod
+    def renew(cls, current_cookie):
+        pool = Pool()
+        User = pool.get('res.user')
+
+        user = User(Transaction().user)
+        new_cookie = uuid.uuid4().hex
+        current_devices = cls.search([
+                    ('login', '=', user.login),
+                    ('cookie', '=', current_cookie),
+                    ])
+        if current_devices:
+            cls.write(current_devices, {
+                    'cookie': new_cookie
+                    })
+        else:
+            cls.create([{
+                        'login': user.login,
+                        'cookie': new_cookie,
+                        }])
+        return new_cookie
+
+    @classmethod
+    def clear(cls, logins):
+        for sub_logins in grouped_slice(logins):
+            cls.delete(cls.search([
+                        ('login', 'in', list(sub_logins)),
+                        ]))
+
+
 class UserAction(ModelSQL):
     'User - Action'
     __name__ = 'res.user-ir.action'
diff -r da66215e2990 -r d3a8ed75e4c0 trytond/tests/test_user.py
--- a/trytond/tests/test_user.py        Sat Feb 20 00:58:39 2021 +0100
+++ b/trytond/tests/test_user.py        Sun Feb 21 16:23:11 2021 +0100
@@ -244,6 +244,51 @@
                 with set_authentications(methods):
                     self.assertEqual(User.get_login('user', {}), result)
 
+    @with_transaction()
+    def test_bad_authentication_logging(self):
+        "Test the logging of log in attempts"
+        pool = Pool()
+        LoginAttempt = pool.get('res.user.login.attempt')
+
+        self.create_user('user', '12345')
+        self.check_user('user', '12345')
+        self.assertEqual(LoginAttempt.count('user', None), 1)
+
+    @with_transaction()
+    def test_bad_authentication_valid_cookie(self):
+        "Test the logging of log in attempts with a valid cookie"
+        pool = Pool()
+        User = pool.get('res.user')
+        UserDevice = pool.get('res.user.device')
+        LoginAttempt = pool.get('res.user.login.attempt')
+
+        user = self.create_user('user', '12345')
+        with Transaction().set_user(user.id):
+            cookie = UserDevice.renew(None)
+
+        User.get_login('user', {'password': '', 'device_cookie': cookie})
+        self.assertEqual(LoginAttempt.count('user', None), 0)
+        self.assertEqual(LoginAttempt.count('user', cookie), 1)
+
+    @with_transaction()
+    def test_bad_authentication_invalid_cookie(self):
+        "Test the logging of log in attempts without a valid cookie"
+        pool = Pool()
+        User = pool.get('res.user')
+        UserDevice = pool.get('res.user.device')
+        LoginAttempt = pool.get('res.user.login.attempt')
+
+        user = self.create_user('user', '12345')
+        with Transaction().set_user(user.id):
+            cookie = UserDevice.renew(None)
+
+        User.get_login('user', {
+                'password': '',
+                'device_cookie': cookie + 'wrong',
+                })
+        self.assertEqual(LoginAttempt.count('user', None), 1)
+        self.assertEqual(LoginAttempt.count('user', cookie), 0)
+
 
 def suite():
     return unittest.TestLoader().loadTestsFromTestCase(UserTestCase)

Reply via email to