Author: Raffael Tfirst <[email protected]>
Branch: py3.5-async
Changeset: r86287:5f146fc69289
Date: 2016-08-18 20:17 +0200
http://bitbucket.org/pypy/pypy/changeset/5f146fc69289/
Log: Check for correct values in 'async for' test
diff --git a/pypy/module/_asyncio/test/test_asyncio.py
b/pypy/module/_asyncio/test/test_asyncio.py
--- a/pypy/module/_asyncio/test/test_asyncio.py
+++ b/pypy/module/_asyncio/test/test_asyncio.py
@@ -19,17 +19,14 @@
loop = asyncio.get_event_loop()
loop.run_until_complete(f())
-print("done with async loop")
"""
def test_async_for(self):
- # temporary test from
- #
http://blog.idego.pl/2015/12/05/back-to-the-future-with-async-and-await-in-python-3-5/
+ # tests if async for receives all stores values in the right order
+ # and if the correct methods __aiter__ and __anext__ get called
+ # and if the end results of run_until_complete are None (in a tuple)
"""
import asyncio
-import logging
-import sys
-logging.basicConfig(level=logging.INFO, stream=sys.stdout,
format="%(asctime)s: %(message)s")
class AsyncIter:
def __init__(self):
@@ -46,13 +43,32 @@
return self._data[self._index-1]
raise StopAsyncIteration
-async def do_loop():
- async for x in AsyncIter():
- logging.info(x)
+class Corotest(object):
+ def __init__(self):
+ self.res = "-"
+
+ async def do_loop(self):
+ async for x in AsyncIter():
+ self.res += str(x)
+ self.res += "-"
+cor = Corotest()
loop = asyncio.get_event_loop()
-futures = [asyncio.ensure_future(do_loop()), asyncio.ensure_future(do_loop())]
-loop.run_until_complete(asyncio.wait(futures))
+futures = [asyncio.ensure_future(cor.do_loop()),
asyncio.ensure_future(cor.do_loop())]
+taskres = loop.run_until_complete(asyncio.wait(futures))
+assert cor.res.count('0') == 2
+assert cor.res.count('1') == 2
+assert cor.res.count('2') == 2
+assert cor.res.count('3') == 2
+assert cor.res.count('4') == 2
+assert cor.res.find("0") < cor.res.find("1")
+assert cor.res.find("1") < cor.res.find("2")
+assert cor.res.find("2") < cor.res.find("3")
+assert cor.res.find("3") < cor.res.find("4")
+assert isinstance(taskres, tuple)
+assert len(taskres) == 2
+assert "result=None" in repr(taskres[0].pop())
+assert "result=None" in repr(taskres[0].pop())
"""
def test_asynchronous_context_managers(self):
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit