Thomas Nyberg wrote:
Hi,

Yeah, so Flask does support async (when installed with `pip3 install flask[async]`), but you make a good point that Flask is a distraction in this case. Here's an example using just the standard library that exhibits the same issue:

`app.py`
```
import asyncio
import threading
import time
from queue import Queue


in_queue = Queue()
out_queue = Queue()


def worker():
     print("worker started running")
     while True:
         future = in_queue.get()
         print(f"worker got future: {future}")
         time.sleep(5)
         print("worker sleeped")
         out_queue.put(future)


def finalizer():
     print("finalizer started running")
     while True:
         future = out_queue.get()
         print(f"finalizer got future: {future}")
         future.set_result("completed")
         print("finalizer set result")


threading.Thread(target=worker).start()
threading.Thread(target=finalizer).start()


async def main():
     future = asyncio.get_event_loop().create_future()
     in_queue.put(future)
     print(f"main put future: {future}")
     result = await future
     print(result)


if __name__ == "__main__":
     loop = asyncio.get_event_loop()
     loop.run_until_complete(main())
```

If I run that, I see the following printed out (after which it just hangs):

```
$ python3 app.py
worker started running
finalizer started running
main put future: <Future pending>
worker got future: <Future pending>
worker sleeped
finalizer got future: <Future pending cb=[Task.task_wakeup()]>
finalizer set result
```

I believe async uses a cooperative multitasking setup under the hood, so I presume the way I'm doing this threading just isn't playing well with that (and presumably some yield back to the event loop isn't happening somewhere). At this point I feel like the easiest approach is to throw away threads entirely and learn how to do everything I want in the brave new async world. Still, I'm curious why this is failing and how to make this sort of setup work, since it points to my not understanding the basic implementation/semantics of async in Python.
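For what it's worth, here is a minimal sketch of one way this setup could be made to work. It rests on an assumption about the cause: asyncio futures are documented as not thread-safe, so calling `future.set_result()` directly from the finalizer thread does not wake the event loop. The sketch instead hands the call to the loop's own thread with `loop.call_soon_threadsafe()`. The 0.1 second sleep is a shortened stand-in for the original 5 seconds, and the daemon flag is only there so the script exits.

```python
import asyncio
import threading
import time
from queue import Queue

in_queue = Queue()
out_queue = Queue()


def worker():
    while True:
        future = in_queue.get()
        time.sleep(0.1)  # stand-in for real work (5 s in the original)
        out_queue.put(future)


def finalizer():
    while True:
        future = out_queue.get()
        # Assumption: the future must be completed on the event loop's
        # thread, so schedule set_result there instead of calling it here.
        loop = future.get_loop()
        loop.call_soon_threadsafe(future.set_result, "completed")


# Daemon threads so the interpreter can exit once main() is done.
threading.Thread(target=worker, daemon=True).start()
threading.Thread(target=finalizer, daemon=True).start()


async def main():
    future = asyncio.get_running_loop().create_future()
    in_queue.put(future)
    return await future


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The only functional change from the hanging version is the line in `finalizer()`. Everything else, including the two queues, is kept as it was.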

Thanks for any help!

/Thomas

On 3/22/24 08:27, Lars Liedtke via Python-list wrote:
Hey,

As far as I know (might be old news) flask does not support asyncio.

You would have to use a different framework, like e.g. FastAPI or similar. Maybe someone has already written "flask with asyncio" but I don't know about that.

Cheers

Lars



Am 20.03.24 um 09:22 schrieb Thomas Nyberg via Python-list:

Hello,

I have a simple (and not working) example of what I'm trying to do. This is a simplified version of what I'm trying to achieve (obviously the background workers and finalizer functions will do more later):

`app.py`

```
import asyncio
import threading
import time
from queue import Queue

from flask import Flask

in_queue = Queue()
out_queue = Queue()


def worker():
    print("worker started running")
    while True:
        future = in_queue.get()
        print(f"worker got future: {future}")
        time.sleep(5)
        print("worker sleeped")
        out_queue.put(future)


def finalizer():
    print("finalizer started running")
    while True:
        future = out_queue.get()
        print(f"finalizer got future: {future}")
        future.set_result("completed")
        print("finalizer set result")


threading.Thread(target=worker, daemon=True).start()
threading.Thread(target=finalizer, daemon=True).start()

app = Flask(__name__)


@app.route("/")
async def root():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result = await future
    return result


if __name__ == "__main__":
    app.run()
```

If I start up that server, and execute `curl http://localhost:5000`, it prints out the following in the server before hanging:

```
$ python3 app.py
worker started running
finalizer started running
* Serving Flask app 'app'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
root put future: <Future pending>
worker got future: <Future pending cb=[Task.task_wakeup()]>
worker sleeped
finalizer got future: <Future pending cb=[Task.task_wakeup()]>
finalizer set result
```

Judging by what's printed out, the final `result = await future` doesn't seem to be happy here.

Maybe someone sees something obvious I'm doing wrong here? I presume I'm mixing threads and asyncio in a way I shouldn't be.

Aside from possible issues mixing threads and asyncio (I'm no expert on asyncio), there's also the issue that there's nothing to cause the threads to exit. The following doesn't use asyncio, but also hangs after the main thread has got the result:

```
import queue
import threading
import time

in_queue = queue.Queue()
out_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    print("worker started running")
    while True:
        item = in_queue.get()
        print(f"worker got item: {item}")
        time.sleep(5)
        print("worker sleeped")
        out_queue.put(item)


def finalizer():
    print("finalizer started running")
    while True:
        item = out_queue.get()
        print(f"finalizer got item: {item}")
        result_queue.put(item)
        print("finalizer set result")


threading.Thread(target=worker).start()
threading.Thread(target=finalizer).start()
# threading.Thread(target=worker, daemon=True).start()
# threading.Thread(target=finalizer, daemon=True).start()


def main():
    item = "Item to process"
    in_queue.put(item)
    print(f"main put item: {item}")
    result = None
    while True:
        try:
            result = result_queue.get(timeout=1)
        except queue.Empty:
            # No result yet
            print("main waiting for result")
            continue
        break
    print(f"main got result {result}")


if __name__ == "__main__":
    main()
```

By default, the main process won't exit until there are no non-daemon threads still running. You can either send some sort of signal to the threads telling them to exit the loop and return cleanly (you'd also need a timeout on the queue `get()` calls), or you can create the threads as "daemon" threads (as in the commented-out lines), in which case they'll be killed when all non-daemon threads have exited. Daemon threads don't get a chance to do any cleanup, close resources, etc. when they're killed, though, so they aren't always appropriate.
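As a rough sketch of the first option (signalling the thread to exit), something like the following might do. The `stop_event` name and the `upper()` call standing in for real work are made up for illustration. The important part is the timeout on `get()`, which lets the worker re-check the flag instead of blocking forever.

```python
import queue
import threading

in_queue = queue.Queue()
result_queue = queue.Queue()


def worker(stop_event):
    # Loop until the main thread asks us to stop.
    while not stop_event.is_set():
        try:
            item = in_queue.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued yet; go back and re-check the flag
        result_queue.put(item.upper())  # placeholder for real work


def main():
    stop_event = threading.Event()  # hypothetical shutdown signal
    thread = threading.Thread(target=worker, args=(stop_event,))
    thread.start()

    in_queue.put("item to process")
    result = result_queue.get(timeout=5)

    stop_event.set()  # ask the worker to leave its loop
    thread.join()     # wait for it to return cleanly
    return result


if __name__ == "__main__":
    print(f"main got result {main()}")
```

Because the thread returns normally, it gets the chance to clean up that a daemon thread would not, at the cost of waking up every 0.1 seconds to check the flag.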

--
Mark.
--
https://mail.python.org/mailman/listinfo/python-list
