From: Tobias Hagelborn <tobia...@axis.com>

Retry insert operations in case the database is locked by
an external process, for instance an external cleanup or data
retention transaction. Use an async sleep so that the event loop
is not blocked while waiting.

Signed-off-by: Tobias Hagelborn <tobias.hagelb...@axis.com>
---
 lib/hashserv/server.py | 41 +++++++++++++++++++++++++++--------------
 1 file changed, 27 insertions(+), 14 deletions(-)

diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index d40a2ab8..c898be3f 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -9,6 +9,7 @@ import enum
 import asyncio
 import logging
 import math
+import sqlite3
 import time
 from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
 import bb.asyncrpc
@@ -114,7 +115,7 @@ class Resolve(enum.Enum):
     REPLACE = enum.auto()
 
 
-def insert_table(cursor, table, data, on_conflict):
+async def insert_table(cursor, table, data, on_conflict):
     resolve = {
         Resolve.FAIL: "",
         Resolve.IGNORE: " OR IGNORE",
@@ -129,7 +130,19 @@ def insert_table(cursor, table, data, on_conflict):
         values=", ".join(":" + k for k in keys),
     )
     prevrowid = cursor.lastrowid
-    cursor.execute(query, data)
+
+    RETRIES = 5
+    for x in range(RETRIES):
+        try:
+            cursor.execute(query, data)
+        except sqlite3.OperationalError as e:
+            if "database is locked" in str(e):
+                await asyncio.sleep(1)
+        finally:
+            break
+    else:
+        cursor.execute(query, data)
+
     logging.debug(
         "Inserting %r into %s, %s",
         data,
@@ -138,17 +151,17 @@ def insert_table(cursor, table, data, on_conflict):
     )
     return (cursor.lastrowid, cursor.lastrowid != prevrowid)
 
-def insert_unihash(cursor, data, on_conflict):
-    return insert_table(cursor, "unihashes_v2", data, on_conflict)
+async def insert_unihash(cursor, data, on_conflict):
+    return await insert_table(cursor, "unihashes_v2", data, on_conflict)
 
-def insert_outhash(cursor, data, on_conflict):
-    return insert_table(cursor, "outhashes_v2", data, on_conflict)
+async def insert_outhash(cursor, data, on_conflict):
+    return await insert_table(cursor, "outhashes_v2", data, on_conflict)
 
 async def copy_unihash_from_upstream(client, db, method, taskhash):
     d = await client.get_taskhash(method, taskhash)
     if d is not None:
         with closing(db.cursor()) as cursor:
-            insert_unihash(
+            await insert_unihash(
                 cursor,
                 {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
                 Resolve.IGNORE,
@@ -260,7 +273,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash)
                 d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
-                insert_unihash(cursor, d, Resolve.IGNORE)
+                await insert_unihash(cursor, d, Resolve.IGNORE)
                 self.db.commit()
 
         return d
@@ -301,16 +314,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
         return d
 
-    def update_unified(self, cursor, data):
+    async def update_unified(self, cursor, data):
         if data is None:
             return
 
-        insert_unihash(
+        await insert_unihash(
             cursor,
             {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
             Resolve.IGNORE
         )
-        insert_outhash(
+        await insert_outhash(
             cursor,
             {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
             Resolve.IGNORE
@@ -386,7 +399,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                     outhash_data[k] = data[k]
 
             # Insert the new entry, unless it already exists
-            (rowid, inserted) = insert_outhash(cursor, outhash_data, 
Resolve.IGNORE)
+            (rowid, inserted) = await insert_outhash(cursor, outhash_data, 
Resolve.IGNORE)
 
             if inserted:
                 # If this row is new, check if it is equivalent to another
@@ -427,7 +440,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                             unihash = upstream_data['unihash']
 
 
-                insert_unihash(
+                await insert_unihash(
                     cursor,
                     {
                         'method': data['method'],
@@ -460,7 +473,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 'taskhash': data['taskhash'],
                 'unihash': data['unihash'],
             }
-            insert_unihash(cursor, insert_data, Resolve.IGNORE)
+            await insert_unihash(cursor, insert_data, Resolve.IGNORE)
             self.db.commit()
 
             # Fetch the unihash that will be reported for the taskhash. If the
-- 
2.30.2

-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#187987): 
https://lists.openembedded.org/g/openembedded-core/message/187987
Mute This Topic: https://lists.openembedded.org/mt/101496954/21656
Group Owner: openembedded-core+ow...@lists.openembedded.org
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[arch...@mail-archive.com]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to