The stack overflow link: https://stackoverflow.com/questions/56696562/running-long-blocking-calculations-in-parallel-in-twisted
On Mon, Jun 24, 2019 at 7:26 PM Chengi Liu <chengi.liu...@gmail.com> wrote:
> Thanks Moshe & Meejah & Gelin for the suggestions and advice. This is
> super helpful.
>
> I think I am able to move forward with this.
> Let me just summarize.
> My use case is: fetch the data, then assemble the data. Fetching
> data is network bound; assembling data is CPU bound.
>
> Can you guys confirm if what I am doing makes sense?
>
> import treq
> from twisted.internet import defer, reactor, threads
>
> def compute_heavy_function(x):
>     return x*x
>
> @defer.inlineCallbacks
> def network_get(x):
>     resp = yield treq.get('http://localhost:8081')
>     content = yield resp.content()
>     defer.returnValue(content)
>
> @defer.inlineCallbacks
> def twisted_do_your_magic():
>     nets, cpus = [], []
>     for i in range(10):
>         t = defer.ensureDeferred(network_get(i))
>         nets.append(t)
>         d = threads.deferToThread(compute_heavy_function, i)
>         cpus.append(d)
>
>     cpu_res = yield defer.gatherResults(cpus)
>     network_res = yield defer.gatherResults(nets)
>     defer.returnValue({'cpu': cpu_res, 'network': network_res})
>
> if __name__ == '__main__':
>     twisted_do_your_magic()
>     reactor.callLater(2, reactor.stop)
>     reactor.run()
>
> I ran it locally, and it seems to be running fine. But I just want to
> make sure that I got the concept of what to deferToThread and what to
> "ensureDeferred". From the SO thread, I got the impression that
> network-based IO benefits from `deferToThread`, but from the video
> tutorial I got the impression that ensureDeferred followed by
> gatherResults is the right way to go?
>
> Moshe, one last question:
> I was trying to follow the tutorial in the video lecture, but I wasn't
> able to make it run on Python 3.
>
> Say I have an async function:
>
> import json
>
> import treq
> from twisted.internet import defer, reactor
>
> async def foo():
>     resp = await treq.get("http://localhost:1234/foo")
>     content = await resp.content()
>     return json.loads(content.decode("utf-8"))
>
> async def func():
>     d1 = defer.ensureDeferred(foo())
>     d2 = defer.ensureDeferred(foo())
>     res = await defer.gatherResults([d1, d2])
>     return res
>
> if __name__ == '__main__':
>     x = func()
>     reactor.callLater(2, reactor.stop)
>     reactor.run()
>
> In this case, I get this warning from `x = func()` in the main block:
> RuntimeWarning: coroutine 'func' was never awaited
> How do I fix this?
>
> Again, thanks for all the help, support and advice in getting me started
> with Twisted.
>
> On Mon, Jun 24, 2019 at 3:49 PM meejah <mee...@meejah.ca> wrote:
>
>> As a clarification to the above, the difficulty of parallelizing
>> Python code across cores is not unique to Twisted; all Python code has
>> this same limitation.
>>
>> To use multiple cores with Python code, you need multiple Python
>> processes (as has been pointed out). One way to achieve this is to have
>> multiple processes talk to each other (using some kind of RPC
>> protocol).
>>
>> Another way is to simply spawn some number of subprocesses (and Twisted
>> has good support for running subprocesses). So, for example, if you
>> write a CLI tool that can be told to run "part of your problem", then
>> your parent Twisted process can simply spawn some number of those with
>> appropriate arguments to split up the problem (e.g. give each process
>> 1/num_cores of the problem). This will incur some startup penalty as
>> each process starts up (especially if you're using PyPy, which you
>> should be if you care about speed) but is way simpler.
>>
>> Obviously, an RPC-style communication system avoids the startup penalty
>> (but can be more complex).
>>
>> --
>> meejah
>>
>> _______________________________________________
>> Twisted-Python mailing list
>> Twisted-Python@twistedmatrix.com
>> https://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>>
>