On Fri, Aug 12, 2016 at 10:08 AM, Steven D'Aprano
<steve+pyt...@pearwood.info> wrote:
> Let me simulate a slow function call:
>
>
> import random, time
>
> def work(id):
>     print("starting with id", id)
>     workload = random.randint(5, 15)
>     for i in range(workload):
>         time.sleep(0.2)  # pretend to do some real work
>         print("processing id", id)  # let the user see some progress
>     print("done with id", id)
>     return 10 + id
>
>
> pending = [1, 2, 3, 4]
>
> for i, n in enumerate(pending):
>     pending[i] = work(n)
>
>
> How do I write work() so that it cooperatively multi-tasks with other ...
> threads? processes? what the hell do we call these things? What does this
> example become in the asynchronous world?
>
> In this case, all the work is pure computation, so I don't expect to save
> any time by doing this, because the work is still all being done in the
> same process/thread, not in parallel. It may even take a little bit longer,
> due to the overhead of switching from one to another.
>
> (I presume that actual disk or network I/O may be better, because the OS
> will run the I/O in parallel if possible, but I don't expect that in this
> case.)
>
> Am I getting close?

Well, let's see. I can quickly tweak my select() demo to support time delays.

# Partially borrowed from example in Python docs:
# https://docs.python.org/3/library/selectors.html#examples
import selectors
import socket
import time

sel = selectors.DefaultSelector()
sleepers = {} # In a non-toy, this would probably be a heap, not a dict
def eventloop():
    while "loop forever":
        t = time.time()
        for gen, tm in list(sleepers.items()):
            if tm <= t:
                del sleepers[gen]
                run_task(gen)
        delay = min(sleepers.values(), default=t+3600) - t
        if delay < 0: continue
        for key, mask in sel.select(timeout=delay):
            sel.unregister(key.fileobj)
            run_task(key.data)

def run_task(gen):
    try:
        waitfor = next(gen)
        if isinstance(waitfor, float):
            sleepers[gen] = waitfor
        else:
            sel.register(waitfor, selectors.EVENT_READ, gen)
    except StopIteration:
        pass

def sleep(tm):
    yield time.time() + tm

def mainsock():
    sock = socket.socket()
    sock.bind(('localhost', 1234))
    sock.listen(100)
    sock.setblocking(False)
    print("Listening on port 1234.")
    while "moar sockets":
        yield sock
        conn, addr = sock.accept()  # Should be ready
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        run_task(client(conn))

def client(conn):
    while "moar data":
        yield conn
        data = conn.recv(1000)  # Should be ready
        if not data: break
        print("Got data")
        # At this point, you'd do something smart with the data.
        # But we don't. We just echo back, after a delay.
        yield from sleep(3)
        conn.send(data)  # Hope it won't block
        if b"quit" in data: break
    print('closing', conn)
    conn.close()

if __name__ == '__main__':
    run_task(mainsock())
    eventloop()
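
The comment on sleepers says a non-toy version would probably use a heap.
Here is a rough sketch of what that might look like, handling timers only
(no sockets) and stopping once nothing is left to wake up. The ticker()
task, the log list and the _counter tie-breaker are made up for this
illustration; they are not part of the demo above.

```python
import heapq
import itertools
import time

_counter = itertools.count()  # tie-breaker so two generators are never compared
sleepers = []                 # heap of (wake_time, sequence_number, generator)

def sleep(tm):
    yield time.time() + tm

def run_task(gen):
    """Advance gen to its next yield and queue it by wake-up time."""
    try:
        wake_at = next(gen)
    except StopIteration:
        return
    heapq.heappush(sleepers, (wake_at, next(_counter), gen))

def eventloop():
    """Timers-only loop: returns once no sleeping task remains."""
    while sleepers:
        wake_at, _, gen = heapq.heappop(sleepers)  # earliest wake-up first
        delay = wake_at - time.time()
        if delay > 0:
            time.sleep(delay)
        run_task(gen)

def ticker(name, interval, count, log):
    # Hypothetical task: record a tick after each delay.
    for i in range(count):
        yield from sleep(interval)
        log.append((name, i))

log = []
run_task(ticker("fast", 0.01, 3, log))
run_task(ticker("slow", 0.05, 2, log))
eventloop()
```

Popping the earliest wake-up time is O(log n), instead of scanning every
sleeper on each pass as the dict version does.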



So, if it used this tiny event loop, your code would look like this:

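import random

# Uses sleep(), run_task() and eventloop() from the demo above.
def work(id):
    print("starting with id", id)
    workload = random.randint(5, 15)
    for i in range(workload):
        yield from sleep(0.2)  # pretend to do some real work
        print("processing id", id)  # let the user see some progress
    print("done with id", id)
    return 10 + id  # run_task() as written discards this value

for n in [1, 2, 3, 4]:
    run_task(work(n))
eventloop()  # never returns; interrupt with Ctrl-C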
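
One wrinkle: run_task() swallows StopIteration, so the "return 10 + id"
goes nowhere, whereas your original loop stored it back into pending. One
possible way to get it back is a small wrapper generator, since "yield from"
evaluates to the inner generator's return value. The capture() and drive()
names below are invented for this sketch, and drive() is only a stand-in
for the event loop (it resumes the task immediately and ignores what it
asked to wait for).

```python
def capture(gen, results, key):
    """Forward everything gen yields, then store its return value."""
    results[key] = yield from gen

def drive(gen):
    """Stand-in for the event loop: resume gen until it is finished."""
    for _ in gen:
        pass

def work(id):
    for _ in range(3):
        yield 0.0  # placeholder for "wake me at this time"
    return 10 + id

results = {}
for n in [1, 2, 3, 4]:
    drive(capture(work(n), results, n))

pending = [results[n] for n in [1, 2, 3, 4]]
print(pending)  # [11, 12, 13, 14]
```

With the real loop you would presumably call run_task(capture(work(n),
results, n)) instead of drive(), and read results once the tasks finish.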

But crucially, this depends on having some kind of waitable work. That
generally means either non-blocking I/O (of some sort - a lot of
things in a Unix system end up being reads and writes to some
file-like thing, e.g. a socket, pipe, or device), or a time delay, or
maybe waiting on a signal. If the 'work' is a blocking CPU-bound
operation, it never yields control back to the event loop, so nothing
else gets to run until it finishes and there is nothing to multiplex.
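
To make that last point concrete, here is a toy round-robin scheduler
(deliberately simpler than the select() loop above, and not taken from it)
running two tasks that yield between steps alongside one that does all its
work in one go. The round_robin(), cooperative() and blocking() names are
invented for this sketch.

```python
from collections import deque

def cooperative(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        yield  # hand control back to the scheduler after every step

def blocking(name, steps, log):
    for i in range(steps):
        log.append((name, i))  # no yield here: nobody else can run
    yield  # control is only handed back once all the work is done

def round_robin(tasks):
    """Resume each task in turn until all of them have finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # task finished, drop it
        queue.append(task)

log = []
round_robin([
    cooperative("a", 2, log),
    blocking("hog", 3, log),
    cooperative("b", 2, log),
])
print(log)
# [('a', 0), ('hog', 0), ('hog', 1), ('hog', 2), ('b', 0), ('a', 1), ('b', 1)]
```

The "a" and "b" steps interleave, but all of "hog" runs as one
uninterrupted block, which is what a CPU-bound call does to an event loop.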

ChrisA
