Thanks Mike, yes, you are right: sharing the session can cause an issue. I
already tried using something similar to the following code:
async def get_areas(geometry) -> List[Dict[str, Any]]:
    """Return one ``{"column": ...}`` dict per GeoTable row intersecting *geometry*.

    Each call obtains its own AsyncSession from ``get_async_session()``, so
    several ``get_areas`` coroutines can run concurrently under
    ``asyncio.gather()`` without sharing a session between tasks.

    :param geometry: geometry value passed to ``ST_Intersects`` and
        ``ST_Intersection`` (e.g. an EWKT string like
        ``"SRID=4269;POLYGON((...))"``).
    :return: list of dicts, one per matching row.
    """
    sql = select(
        models.GeoTable,
        func.ST_Intersection(models.GeoTable.geom, geometry),
    ).where(func.ST_Intersects(models.GeoTable.geom, geometry))

    areas: List[Dict[str, Any]] = []
    # Build the result inside the loop but return only after it ends: letting
    # the async generator run to completion exits its session context (presumably
    # an ``async with`` around a single ``yield``), so the session is closed
    # here instead of whenever the abandoned generator gets finalized.
    async for session in get_async_session():
        result = await session.execute(sql)
        # Each row is a (GeoTable entity, ST_Intersection value) pair; only the
        # entity's attribute is reported.
        areas = [
            {
                "column": geo_row.column_value,
            }
            for geo_row, _intersection in result.all()
        ]
    return areas
I thought that sharing the connection might speed up the transaction.
On Wednesday, December 7, 2022 at 1:47:15 PM UTC-6 Mike Bayer wrote:
> It's not clear what would cause the time delay; it seems like a timeout
> after a deadlock of some kind. However, the first thing is that your async
> code is written incorrectly, as it is sharing the same AsyncSession among
> two concurrent async tasks, which will cause random errors at any number of
> levels (ORM Session, Core connection, asyncpg connection) and is likely to
> be related to your issue.
>
> For your asyncio.gather(), each task should receive (or create on its own) a
> separate AsyncSession that's independent of the other, and each
> AsyncSession should proceed on its own connection/transaction. You can't
> do concurrent work within a single transaction even with an asyncpg
> connection.
>
> On Wed, Dec 7, 2022, at 1:41 PM, Jaime Valdez wrote:
>
> Hi group,
>
> I'm having some issues trying to run some queries using
> create_async_engine. The following code is the fastest one.
>
> import time
> import asyncio
>
> from typing import Any, Dict, List
>
> from sqlalchemy import create_engine
> from sqlalchemy.orm import sessionmaker
> from sqlalchemy import func, select
>
> from asyncio import run as aiorun
>
> import models
>
> # Build a synchronous (psycopg2) engine and return a sessionmaker bound to
> # it; it is called once below to create the shared module-level session.
> def get_session():
> # NOTE(review): the connection URL below appears mangled by line wrapping
> # and the mail archive's address obfuscation; it is not a valid string
> # literal as shown.
> engine = create_engine("postgresql+psycopg2://
> postgres:[email protected]:5431/db
> <http://postgres:[email protected]:5431/db>",
> echo=True,
> pool_size=20,
> max_overflow=0,
> pool_pre_ping=True,
> )
> return sessionmaker(bind=engine)
>
> # One module-level Session, used by every get_areas() call below.
> sess = get_session()
> session = sess()
>
> # Query geometry as an EWKT string (SRID 4269); the polygon body is elided.
> _geom = "SRID=4269;POLYGON(( ... ))"
>
> # Return one {"column": ...} dict per GeoTable row that intersects `geometry`,
> # using the module-level synchronous `session`.
> # NOTE(review): despite being `async def`, nothing here is awaited:
> # session.execute() is a blocking psycopg2 call, so the event loop is blocked
> # for the whole query and gathered calls effectively run one after another.
> async def get_areas(geometry) -> List[Dict[str, Any]]:
> _res = []
>
> sql = select(
> models.GeoTable, func.ST_Intersection(models.GeoTable.geom,
> geometry)
> ).where(func.ST_Intersects(models.GeoTable.geom, geometry))
>
> # .scalars() keeps only the first selected column (the GeoTable entity),
> # so the ST_Intersection value selected above is discarded.
> if rows := session.execute(sql).scalars():
> _res.extend(
> {
> "column": _f.column_value,
> }
> for _f in rows
> )
> return _res
>
> # NOTE(review): asyncio.get_event_loop() outside a running loop is deprecated
> # in recent Python versions, and this loop is not the one aiorun() creates.
> loop = asyncio.get_event_loop()
>
> # Time two get_areas() calls combined with asyncio.gather() and print results.
> async def _search():
> tic = time.perf_counter()
>
> # NOTE(review): this coroutine already runs inside the loop started by
> # aiorun() (asyncio.run), so calling run_until_complete() on another loop
> # here is expected to raise RuntimeError on recent Python versions;
> # `await asyncio.gather(...)` is the usual form.
> (
> result1,
> result2,
> ) = loop.run_until_complete(asyncio.gather(
> get_areas(_geom),
> get_areas(_geom)
> ))
> print(result1)
> print(result2)
>
> toc = time.perf_counter()
> print(f"{toc - tic:0.4f} seconds")
>
> aiorun(_search())
> loop.close()
>
> This code takes about 1.5 seconds to complete. Compare it with this other
> approach, which uses the async engine, shown in the code below:
>
> import time
> import asyncio
>
> from typing import Any, Dict, List, AsyncGenerator
>
> from sqlalchemy.ext.asyncio import create_async_engine
> from sqlalchemy.pool import NullPool
> from sqlalchemy.ext.asyncio import AsyncSession
> from sqlalchemy.orm import sessionmaker
> from sqlalchemy import func, select
>
> from asyncio import run as aiorun
>
> import models
>
> # Module-level async engine using the asyncpg driver; no pool options are
> # passed (NullPool is imported above but not used).
> # NOTE(review): the URL below appears mangled by line wrapping and the mail
> # archive's address obfuscation.
> async_engine = create_async_engine(
> "postgresql+asyncpg://postgres:[email protected]:5431/db
> <http://postgres:[email protected]:5431/db>",
> )
>
> # Async generator yielding a single AsyncSession; the `async with` closes it
> # once the generator is resumed past the yield or finalized.
> # NOTE(review): a new sessionmaker is built on every call; it could be
> # created once at module level next to async_engine.
> async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
> async_session = sessionmaker(bind=async_engine, class_=AsyncSession,
> expire_on_commit=False)
> async with async_session() as session:
> yield session
>
> # Query geometry as an EWKT string (SRID 4269); the polygon body is elided.
> _geom = "SRID=4269;POLYGON(( ... ))"
>
> # Run the intersection query on the given AsyncSession and return one
> # {"column": ...} dict per result row.
> async def get_areas(session, geometry) -> List[Dict[str, Any]]:
> sql = select(
> models.GeoTable, func.ST_Intersection(models.GeoTable.geom,
> geometry)
> ).where(func.ST_Intersects(models.GeoTable.geom, geometry))
>
> rows = await session.execute(sql)
> # NOTE(review): the comprehension iterates `_r` but reads `_f`, which is
> # undefined here (NameError). Each `_r` is a Row of (GeoTable entity,
> # ST_Intersection value), so the entity's attribute is presumably meant.
> return [
> {
> "column": _f.column_value,
> }
> for _r in rows.all()
> ]
>
> # Time two get_areas() calls combined with asyncio.gather() and print results.
> async def _search():
> tic = time.perf_counter()
>
> # NOTE(review): both gathered tasks receive the same AsyncSession yielded
> # here; an AsyncSession (and its connection) must not be used by
> # concurrent tasks. Each task should open its own session.
> async for session in get_async_session():
> (
> result1,
> result2,
> ) = await asyncio.gather(
> get_areas(session, _geom),
> get_areas(session, _geom),
> )
> print(result1)
> print(result2)
>
> toc = time.perf_counter()
> print(f"{toc - tic:0.4f} seconds")
>
> aiorun(_search())
>
> This code takes about 354.1 seconds to complete; I wonder if I’m doing
> something wrong.
>
> Thanks!
>
>
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
> description.
> ---
> You received this message because you are subscribed to the Google Groups
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to sqlalchemy+unsubscribe@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/sqlalchemy/21c67742-87a5-4ab4-b5e5-4b8ae7f50900n%40googlegroups.com
>
> <https://groups.google.com/d/msgid/sqlalchemy/21c67742-87a5-4ab4-b5e5-4b8ae7f50900n%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
>
>
>
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to sqlalchemy+unsubscribe@googlegroups.com.
To view this discussion on the web visit
https://groups.google.com/d/msgid/sqlalchemy/8903c66d-2596-4cd1-88f0-154bccd17eebn%40googlegroups.com.