On Thu, Sep 21, 2023 at 4:08 AM Tobias Hagelborn
<tobias.hagelb...@axis.com> wrote:
>
> From: Tobias Hagelborn <tobia...@axis.com>
>
> Retry insert operations in case the database is locked by
> an external process. For instance an external cleanup or data
> retention transaction. Use async sleep to not block the event loop.

SQLite already handles this internally with a timeout specified in
sqlite3.connect(). The default is 5 seconds; I think it would be
better to add a command-line option to the server that allows a longer
timeout to be specified instead of manually retrying.

Allowing multiple queries to run in parallel (a side effect of async)
might mess up the cursor.lastrowid tracking, so I'm a little leery of
doing that. The long blocks should only actually happen when you are
doing long maintenance operations, so an option for a longer timeout
on the server is probably better (and maybe rework your cleanup to
not lock the database for so long).


>
> Signed-off-by: Tobias Hagelborn <tobias.hagelb...@axis.com>
> ---
>  lib/hashserv/server.py | 41 +++++++++++++++++++++++++++--------------
>  1 file changed, 27 insertions(+), 14 deletions(-)
>
> diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
> index d40a2ab8..c898be3f 100644
> --- a/lib/hashserv/server.py
> +++ b/lib/hashserv/server.py
> @@ -9,6 +9,7 @@ import enum
>  import asyncio
>  import logging
>  import math
> +import sqlite3
>  import time
>  from . import create_async_client, UNIHASH_TABLE_COLUMNS, 
> OUTHASH_TABLE_COLUMNS
>  import bb.asyncrpc
> @@ -114,7 +115,7 @@ class Resolve(enum.Enum):
>      REPLACE = enum.auto()
>
>
> -def insert_table(cursor, table, data, on_conflict):
> +async def insert_table(cursor, table, data, on_conflict):
>      resolve = {
>          Resolve.FAIL: "",
>          Resolve.IGNORE: " OR IGNORE",
> @@ -129,7 +130,19 @@ def insert_table(cursor, table, data, on_conflict):
>          values=", ".join(":" + k for k in keys),
>      )
>      prevrowid = cursor.lastrowid
> -    cursor.execute(query, data)
> +
> +    RETRIES = 5
> +    for x in range(RETRIES):
> +        try:
> +            cursor.execute(query, data)
> +        except sqlite3.OperationalError as e:
> +            if "database is locked" in str(e):
> +                await asyncio.sleep(1)
> +        finally:
> +            break
> +    else:
> +        cursor.execute(query, data)
> +
>      logging.debug(
>          "Inserting %r into %s, %s",
>          data,
> @@ -138,17 +151,17 @@ def insert_table(cursor, table, data, on_conflict):
>      )
>      return (cursor.lastrowid, cursor.lastrowid != prevrowid)
>
> -def insert_unihash(cursor, data, on_conflict):
> -    return insert_table(cursor, "unihashes_v2", data, on_conflict)
> +async def insert_unihash(cursor, data, on_conflict):
> +    return await insert_table(cursor, "unihashes_v2", data, on_conflict)
>
> -def insert_outhash(cursor, data, on_conflict):
> -    return insert_table(cursor, "outhashes_v2", data, on_conflict)
> +async def insert_outhash(cursor, data, on_conflict):
> +    return await insert_table(cursor, "outhashes_v2", data, on_conflict)
>
>  async def copy_unihash_from_upstream(client, db, method, taskhash):
>      d = await client.get_taskhash(method, taskhash)
>      if d is not None:
>          with closing(db.cursor()) as cursor:
> -            insert_unihash(
> +            await insert_unihash(
>                  cursor,
>                  {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
>                  Resolve.IGNORE,
> @@ -260,7 +273,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>              elif self.upstream_client is not None:
>                  d = await self.upstream_client.get_taskhash(method, taskhash)
>                  d = {k: v for k, v in d.items() if k in 
> UNIHASH_TABLE_COLUMNS}
> -                insert_unihash(cursor, d, Resolve.IGNORE)
> +                await insert_unihash(cursor, d, Resolve.IGNORE)
>                  self.db.commit()
>
>          return d
> @@ -301,16 +314,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>
>          return d
>
> -    def update_unified(self, cursor, data):
> +    async def update_unified(self, cursor, data):
>          if data is None:
>              return
>
> -        insert_unihash(
> +        await insert_unihash(
>              cursor,
>              {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
>              Resolve.IGNORE
>          )
> -        insert_outhash(
> +        await insert_outhash(
>              cursor,
>              {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
>              Resolve.IGNORE
> @@ -386,7 +399,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                      outhash_data[k] = data[k]
>
>              # Insert the new entry, unless it already exists
> -            (rowid, inserted) = insert_outhash(cursor, outhash_data, 
> Resolve.IGNORE)
> +            (rowid, inserted) = await insert_outhash(cursor, outhash_data, 
> Resolve.IGNORE)
>
>              if inserted:
>                  # If this row is new, check if it is equivalent to another
> @@ -427,7 +440,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                              unihash = upstream_data['unihash']
>
>
> -                insert_unihash(
> +                await insert_unihash(
>                      cursor,
>                      {
>                          'method': data['method'],
> @@ -460,7 +473,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
>                  'taskhash': data['taskhash'],
>                  'unihash': data['unihash'],
>              }
> -            insert_unihash(cursor, insert_data, Resolve.IGNORE)
> +            await insert_unihash(cursor, insert_data, Resolve.IGNORE)
>              self.db.commit()
>
>              # Fetch the unihash that will be reported for the taskhash. If 
> the
> --
> 2.30.2
>
>
> 
>
-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#188019): 
https://lists.openembedded.org/g/openembedded-core/message/188019
Mute This Topic: https://lists.openembedded.org/mt/101496954/21656
Group Owner: openembedded-core+ow...@lists.openembedded.org
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[arch...@mail-archive.com]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to