4 new commits in pytest-xdist:
https://bitbucket.org/hpk42/pytest-xdist/commits/2b3f7baf4fa0/
Changeset: 2b3f7baf4fa0
User: hpk42
Date: 2014-01-27 11:37:26
Summary: a failing test for pytest issue419
Affected #: 2 files
diff -r 1138fbd617e138dad6c72a6492d0b7753e61eae9 -r
2b3f7baf4fa00f49b6300e56b7c134167ab0f24d testing/test_dsession.py
--- a/testing/test_dsession.py
+++ b/testing/test_dsession.py
@@ -6,6 +6,7 @@
)
from _pytest import main as outcome
import py
+import pytest
import execnet
XSpec = execnet.XSpec
@@ -209,3 +210,16 @@
report_collection_diff(from_collection, to_collection, 1, 2)
except AssertionError as e:
assert py.builtin._totext(e) == error_message
+
+@pytest.mark.xfail(reason="duplicate test ids not supported yet")
+def test_pytest_issue419(testdir):
+ testdir.makepyfile("""
+ import pytest
+
+ @pytest.mark.parametrize('birth_year', [1988, 1988, ])
+ def test_2011_table(birth_year):
+ pass
+ """)
+ reprec = testdir.inline_run("-n1")
+ reprec.assertoutcome(passed=2)
+ assert 0
diff -r 1138fbd617e138dad6c72a6492d0b7753e61eae9 -r
2b3f7baf4fa00f49b6300e56b7c134167ab0f24d xdist/dsession.py
--- a/xdist/dsession.py
+++ b/xdist/dsession.py
@@ -142,6 +142,8 @@
node.gateway.id,
)
+ # all collections are the same, good.
+ # we now create an index
self.pending = col
if not col:
return
https://bitbucket.org/hpk42/pytest-xdist/commits/d3a9df2d81a1/
Changeset: d3a9df2d81a1
User: hpk42
Date: 2014-01-27 11:37:27
Summary: remove unnecessary item2nodes data structure for load scheduling
Affected #: 1 file
diff -r 2b3f7baf4fa00f49b6300e56b7c134167ab0f24d -r
d3a9df2d81a14b7a33b067ff64f28ebfa7df146b xdist/dsession.py
--- a/xdist/dsession.py
+++ b/xdist/dsession.py
@@ -95,43 +95,27 @@
self.collection_is_completed = True
def remove_item(self, node, item):
- if item not in self.item2nodes:
- raise AssertionError(item, self.item2nodes)
- nodes = self.item2nodes[item]
- if node in nodes: # the node might have gone down already
- nodes.remove(node)
- #if not nodes:
- # del self.item2nodes[item]
- pending = self.node2pending[node]
- pending.remove(item)
+ node_pending = self.node2pending[node]
+ node_pending.remove(item)
# pre-load items-to-test if the node may become ready
- if self.pending and len(pending) < self.LOAD_THRESHOLD_NEWITEMS:
+ if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS:
item = self.pending.pop(0)
- pending.append(item)
- self.item2nodes.setdefault(item, []).append(node)
+ node_pending.append(item)
node.send_runtest(item)
self.log("items waiting for node: %d" %(len(self.pending)))
- #self.log("item2pending still executing: %s" %(self.item2nodes,))
#self.log("node2pending: %s" %(self.node2pending,))
def remove_node(self, node):
pending = self.node2pending.pop(node)
- # KeyError if we didn't get an addnode() yet
- for item in pending:
- l = self.item2nodes[item]
- l.remove(node)
- if not l:
- del self.item2nodes[item]
if not pending:
return
+ # the node must have crashed on the item if there are pending ones
crashitem = pending.pop(0)
self.pending.extend(pending)
return crashitem
def init_distribute(self):
assert self.collection_is_completed
- assert not hasattr(self, 'item2nodes')
- self.item2nodes = {}
# XXX allow nodes to have different collections
first_node, col = list(self.node2collection.items())[0]
for node, collection in self.node2collection.items():
@@ -154,7 +138,6 @@
nodeindex = i % num_available
node, pending = available[nodeindex]
node.send_runtest(item)
- self.item2nodes.setdefault(item, []).append(node)
pending.append(item)
if i >= max_one_round:
break
https://bitbucket.org/hpk42/pytest-xdist/commits/fe461fefe08f/
Changeset: fe461fefe08f
User: hpk42
Date: 2014-01-27 11:37:33
Summary: fix issue419: work with collection indices instead of node ids.
This reduces network message size.
Affected #: 6 files
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 CHANGELOG
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -7,6 +7,10 @@
- fix pytest issue382 - produce "pytest_runtest_logstart" event again
in master. Thanks Aron Curzon.
+- fix pytest issue419 by sending/receiving indices into the test
+ collection instead of node ids (which are not necessarily unique
+ for functions parametrized with duplicate values)
+
1.9
-------------------------
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 testing/test_dsession.py
--- a/testing/test_dsession.py
+++ b/testing/test_dsession.py
@@ -64,9 +64,9 @@
assert sched.tests_finished()
assert node1.sent == ['ALL']
assert node2.sent == ['ALL']
- sched.remove_item(node1, collection[0])
+ sched.remove_item(node1, 0)
assert sched.tests_finished()
- sched.remove_item(node2, collection[0])
+ sched.remove_item(node2, 0)
assert sched.tests_finished()
def test_schedule_remove_node(self):
@@ -105,7 +105,7 @@
assert len(node1.sent) == 1
assert len(node2.sent) == 1
x = sorted(node1.sent + node2.sent)
- assert x == collection
+ assert x == [0, 1]
sched.remove_item(node1, node1.sent[0])
sched.remove_item(node2, node2.sent[0])
assert sched.tests_finished()
@@ -126,14 +126,14 @@
sent1 = node1.sent
sent2 = node2.sent
chunkitems = col[:sched.ITEM_CHUNKSIZE]
- assert sent1 == chunkitems
- assert sent2 == chunkitems
+ assert (sent1 == [0,2] and sent2 == [1,3]) or (
+ sent1 == [1,3] and sent2 == [0,2])
assert sched.node2pending[node1] == sent1
assert sched.node2pending[node2] == sent2
assert len(sched.pending) == 1
for node in (node1, node2):
- for i in range(sched.ITEM_CHUNKSIZE):
- sched.remove_item(node, "xyz")
+ for i in sched.node2pending[node]:
+ sched.remove_item(node, i)
assert not sched.pending
def test_add_remove_node(self):
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 testing/test_remote.py
--- a/testing/test_remote.py
+++ b/testing/test_remote.py
@@ -154,7 +154,7 @@
assert ev.kwargs['topdir'] == slave.testdir.tmpdir
ids = ev.kwargs['ids']
assert len(ids) == 1
- slave.sendcommand("runtests", ids=ids)
+ slave.sendcommand("runtests", indices=range(len(ids)))
slave.sendcommand("shutdown")
ev = slave.popevent("logstart")
assert ev.kwargs["nodeid"].endswith("test_func")
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/dsession.py
--- a/xdist/dsession.py
+++ b/xdist/dsession.py
@@ -40,15 +40,15 @@
if len(self.node2pending) >= self.numnodes:
self.collection_is_completed = True
- def remove_item(self, node, item):
- self.node2pending[node].remove(item)
+ def remove_item(self, node, item_index):
+ self.node2pending[node].remove(item_index)
def remove_node(self, node):
# KeyError if we didn't get an addnode() yet
pending = self.node2pending.pop(node)
if not pending:
return
- crashitem = pending.pop(0)
+ crashitem = self.node2collection[node][pending.pop(0)]
# XXX what about the rest of pending?
return crashitem
@@ -56,7 +56,7 @@
assert self.collection_is_completed
for node, pending in self.node2pending.items():
node.send_runtest_all()
- pending[:] = self.node2collection[node]
+ pending[:] = range(len(self.node2collection[node]))
class LoadScheduling:
LOAD_THRESHOLD_NEWITEMS = 5
@@ -94,14 +94,15 @@
if len(self.node2collection) >= self.numnodes:
self.collection_is_completed = True
- def remove_item(self, node, item):
+ def remove_item(self, node, item_index):
node_pending = self.node2pending[node]
- node_pending.remove(item)
+ assert item_index in node_pending, (item_index, node_pending)
+ node_pending.remove(item_index)
# pre-load items-to-test if the node may become ready
if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS:
- item = self.pending.pop(0)
- node_pending.append(item)
- node.send_runtest(item)
+ item_index = self.pending.pop(0)
+ node_pending.append(item_index)
+ node.send_runtest(item_index)
self.log("items waiting for node: %d" %(len(self.pending)))
#self.log("node2pending: %s" %(self.node2pending,))
@@ -110,7 +111,7 @@
if not pending:
return
# the node must have crashed on the item if there are pending ones
- crashitem = pending.pop(0)
+ crashitem = self.collection[pending.pop(0)]
self.pending.extend(pending)
return crashitem
@@ -128,17 +129,18 @@
# all collections are the same, good.
# we now create an index
- self.pending = col
+ self.collection = col
+ self.pending = range(len(col))
if not col:
return
available = list(self.node2pending.items())
num_available = self.numnodes
max_one_round = num_available * self.ITEM_CHUNKSIZE - 1
- for i, item in enumerate(self.pending):
+ for i, item_index in enumerate(self.pending):
nodeindex = i % num_available
node, pending = available[nodeindex]
- node.send_runtest(item)
- pending.append(item)
+ node.send_runtest(item_index)
+ pending.append(item_index)
if i >= max_one_round:
break
del self.pending[:i + 1]
@@ -304,7 +306,7 @@
def slave_testreport(self, node, rep):
if not (rep.passed and rep.when != "call"):
if rep.when in ("setup", "call"):
- self.sched.remove_item(node, rep.nodeid)
+ self.sched.remove_item(node, rep.item_index)
#self.report_line("testreport %s: %s" %(rep.id, rep.status))
rep.node = node
self.config.hook.pytest_runtest_logreport(report=rep)
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/remote.py
--- a/xdist/remote.py
+++ b/xdist/remote.py
@@ -47,39 +47,44 @@
name, kwargs = self.channel.receive()
self.log("received command %s(**%s)" % (name, kwargs))
if name == "runtests":
- ids = kwargs['ids']
- for nodeid in ids:
- torun.append(self._id2item[nodeid])
+ torun.extend(kwargs['indices'])
elif name == "runtests_all":
- torun.extend(session.items)
- self.log("items to run: %s" %(len(torun)))
+ torun.extend(range(len(session.items)))
+ self.log("items to run: %s" % (torun,))
while len(torun) >= 2:
- item = torun.pop(0)
- nextitem = torun[0]
- self.config.hook.pytest_runtest_protocol(item=item,
- nextitem=nextitem)
+ # we store item_index so that we can pick it up from the
+ # runtest hooks
+ self.run_one_test(torun)
+
if name == "shutdown":
while torun:
- self.config.hook.pytest_runtest_protocol(
- item=torun.pop(0), nextitem=None)
+ self.run_one_test(torun)
break
return True
+ def run_one_test(self, torun):
+ items = self.session.items
+ self.item_index = torun.pop(0)
+ if torun:
+ nextitem = items[torun[0]]
+ else:
+ nextitem = None
+ self.config.hook.pytest_runtest_protocol(
+ item=items[self.item_index],
+ nextitem=nextitem)
+
def pytest_collection_finish(self, session):
- self._id2item = {}
- ids = []
- for item in session.items:
- self._id2item[item.nodeid] = item
- ids.append(item.nodeid)
self.sendevent("collectionfinish",
topdir=str(session.fspath),
- ids=ids)
+ ids=[item.nodeid for item in session.items])
def pytest_runtest_logstart(self, nodeid, location):
self.sendevent("logstart", nodeid=nodeid, location=location)
def pytest_runtest_logreport(self, report):
data = serialize_report(report)
+ data["item_index"] = self.item_index
+ assert self.session.items[self.item_index].nodeid == report.nodeid
self.sendevent("testreport", data=data)
def pytest_collectreport(self, report):
diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r
fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/slavemanage.py
--- a/xdist/slavemanage.py
+++ b/xdist/slavemanage.py
@@ -241,8 +241,8 @@
self.gateway.exit()
#del self.gateway
- def send_runtest(self, nodeid):
- self.sendcommand("runtests", ids=[nodeid])
+ def send_runtest(self, index):
+ self.sendcommand("runtests", indices=[index])
def send_runtest_all(self):
self.sendcommand("runtests_all",)
@@ -292,7 +292,10 @@
elif eventname == "logstart":
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname in ("testreport", "collectreport",
"teardownreport"):
+ item_index = kwargs.pop("item_index", None)
rep = unserialize_report(eventname, kwargs['data'])
+ if item_index is not None:
+ rep.item_index = item_index
self.notify_inproc(eventname, node=self, rep=rep)
elif eventname == "collectionfinish":
self.notify_inproc(eventname, node=self, ids=kwargs['ids'])
@@ -307,8 +310,7 @@
self.config.pluginmanager.notify_exception(excinfo)
def unserialize_report(name, reportdict):
- d = reportdict
if name == "testreport":
- return runner.TestReport(**d)
+ return runner.TestReport(**reportdict)
elif name == "collectreport":
- return runner.CollectReport(**d)
+ return runner.CollectReport(**reportdict)
https://bitbucket.org/hpk42/pytest-xdist/commits/05aa4f48306c/
Changeset: 05aa4f48306c
User: hpk42
Date: 2014-01-27 11:37:40
Summary: send multiple "to test" indices in one network message to a slave
and improve heuristics for sending chunks where the chunksize
depends on the number of remaining tests rather than fixed numbers.
This reduces the number of master -> node messages (but not the
reverse direction)
Affected #: 8 files
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 CHANGELOG
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,13 @@
collection instead of node ids (which are not necessarily unique
for functions parametrized with duplicate values)
+- send multiple "to test" indices in one network message to a slave
+ and improve heuristics for sending chunks where the chunksize
+ depends on the number of remaining tests rather than fixed numbers.
+ This reduces the number of master -> node messages (but not the
+ reverse direction)
+
+
1.9
-------------------------
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 setup.py
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@
packages = ['xdist'],
entry_points = {'pytest11': ['xdist = xdist.plugin'],},
zip_safe=False,
- install_requires = ['execnet>=1.1', 'pytest>=2.3.5'],
+ install_requires = ['execnet>=1.1', 'pytest>=2.4.2'],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 testing/test_dsession.py
--- a/testing/test_dsession.py
+++ b/testing/test_dsession.py
@@ -29,15 +29,12 @@
self.sent = []
self.gateway = MockGateway()
- def send_runtest(self, nodeid):
- self.sent.append(nodeid)
+ def send_runtest_some(self, indices):
+ self.sent.extend(indices)
def send_runtest_all(self):
self.sent.append("ALL")
- def sendlist(self, items):
- self.sent.extend(items)
-
def shutdown(self):
self._shutdown=True
@@ -117,17 +114,16 @@
node2 = MockNode()
sched.addnode(node1)
sched.addnode(node2)
- sched.ITEM_CHUNKSIZE = 2
- col = ["xyz"] * (2*sched.ITEM_CHUNKSIZE +1)
+ col = ["xyz"] * (3)
sched.addnode_collection(node1, col)
sched.addnode_collection(node2, col)
sched.init_distribute()
#assert not sched.tests_finished()
sent1 = node1.sent
sent2 = node2.sent
- chunkitems = col[:sched.ITEM_CHUNKSIZE]
- assert (sent1 == [0,2] and sent2 == [1,3]) or (
- sent1 == [1,3] and sent2 == [0,2])
+ chunkitems = col[:1]
+ assert (sent1 == [0] and sent2 == [1]) or (
+ sent1 == [1] and sent2 == [0])
assert sched.node2pending[node1] == sent1
assert sched.node2pending[node2] == sent2
assert len(sched.pending) == 1
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 testing/test_remote.py
--- a/testing/test_remote.py
+++ b/testing/test_remote.py
@@ -154,7 +154,7 @@
assert ev.kwargs['topdir'] == slave.testdir.tmpdir
ids = ev.kwargs['ids']
assert len(ids) == 1
- slave.sendcommand("runtests", indices=range(len(ids)))
+ slave.sendcommand("runtests", indices=list(range(len(ids))))
slave.sendcommand("shutdown")
ev = slave.popevent("logstart")
assert ev.kwargs["nodeid"].endswith("test_func")
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -3,7 +3,7 @@
[testenv]
changedir=testing
-deps=pytest>=2.4.2
+deps=pytest>=2.5.1
commands= py.test --junitxml={envlogdir}/junit-{envname}.xml []
[testenv:py27]
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/dsession.py
--- a/xdist/dsession.py
+++ b/xdist/dsession.py
@@ -59,9 +59,6 @@
pending[:] = range(len(self.node2collection[node]))
class LoadScheduling:
- LOAD_THRESHOLD_NEWITEMS = 5
- ITEM_CHUNKSIZE = 10
-
def __init__(self, numnodes, log=None):
self.numnodes = numnodes
self.node2pending = {}
@@ -96,15 +93,24 @@
def remove_item(self, node, item_index):
node_pending = self.node2pending[node]
- assert item_index in node_pending, (item_index, node_pending)
node_pending.remove(item_index)
# pre-load items-to-test if the node may become ready
- if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS:
- item_index = self.pending.pop(0)
- node_pending.append(item_index)
- node.send_runtest(item_index)
- self.log("items waiting for node: %d" %(len(self.pending)))
- #self.log("node2pending: %s" %(self.node2pending,))
+
+ if self.pending:
+ # how many items do we have remaining per node roughly?
+ num_nodes = len(self.node2pending)
+ # if our node goes below a heuristic minimum, fill it out to
+ # heuristic maximum
+ items_per_node_min = max(
+ 1, len(self.pending) // num_nodes // 4)
+ items_per_node_max = max(
+ 1, len(self.pending) // num_nodes // 2)
+ if len(node_pending) <= items_per_node_min:
+ num_send = items_per_node_max - len(node_pending) + 1
+ self._send_tests(node, num_send)
+
+ self.log("num items waiting for node:", len(self.pending))
+ #self.log("node2pending:", self.node2pending)
def remove_node(self, node):
pending = self.node2pending.pop(node)
@@ -118,8 +124,9 @@
def init_distribute(self):
assert self.collection_is_completed
# XXX allow nodes to have different collections
- first_node, col = list(self.node2collection.items())[0]
- for node, collection in self.node2collection.items():
+ node_collection_items = list(self.node2collection.items())
+ first_node, col = node_collection_items[0]
+ for node, collection in node_collection_items[1:]:
report_collection_diff(
col,
collection,
@@ -130,21 +137,25 @@
# all collections are the same, good.
# we now create an index
self.collection = col
- self.pending = range(len(col))
+ self.pending[:] = range(len(col))
if not col:
return
- available = list(self.node2pending.items())
- num_available = self.numnodes
- max_one_round = num_available * self.ITEM_CHUNKSIZE - 1
- for i, item_index in enumerate(self.pending):
- nodeindex = i % num_available
- node, pending = available[nodeindex]
- node.send_runtest(item_index)
- pending.append(item_index)
- if i >= max_one_round:
- break
- del self.pending[:i + 1]
+ # how many items per node do we have about?
+ items_per_node = len(self.collection) // len(self.node2pending)
+ # take half of it for initial distribution, at least 1
+ node_chunksize = max(items_per_node // 2, 1)
+ # and initialize each node with a chunk of tests
+ for node in self.node2pending:
+ self._send_tests(node, node_chunksize)
+ #f = open("/tmp/sent", "w")
+ def _send_tests(self, node, num):
+ tests_per_node = self.pending[:num]
+ #print >>self.f, "sent", node, tests_per_node
+ if tests_per_node:
+ del self.pending[:num]
+ self.node2pending[node].extend(tests_per_node)
+ node.send_runtest_some(tests_per_node)
def report_collection_diff(from_collection, to_collection, from_id, to_id):
"""Report the collected test difference between two nodes.
@@ -243,7 +254,7 @@
assert callname, kwargs
method = "slave_" + callname
call = getattr(self, method)
- self.log("calling method: %s(**%s)" % (method, kwargs))
+ self.log("calling method", method, kwargs)
call(**kwargs)
if self.sched.tests_finished():
self.triggershutdown()
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/remote.py
--- a/xdist/remote.py
+++ b/xdist/remote.py
@@ -24,7 +24,7 @@
def pytest_internalerror(self, excrepr):
for line in str(excrepr).split("\n"):
- self.log("IERROR> " + line)
+ self.log("IERROR>", line)
def pytest_sessionstart(self, session):
self.session = session
@@ -45,24 +45,19 @@
torun = []
while 1:
name, kwargs = self.channel.receive()
- self.log("received command %s(**%s)" % (name, kwargs))
+ self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs['indices'])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
- self.log("items to run: %s" % (torun,))
- while len(torun) >= 2:
- # we store item_index so that we can pick it up from the
- # runtest hooks
- self.run_one_test(torun)
-
+ self.log("items to run:", torun)
+ while torun:
+ self.run_tests(torun)
if name == "shutdown":
- while torun:
- self.run_one_test(torun)
break
return True
- def run_one_test(self, torun):
+ def run_tests(self, torun):
items = self.session.items
self.item_index = torun.pop(0)
if torun:
diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r
05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/slavemanage.py
--- a/xdist/slavemanage.py
+++ b/xdist/slavemanage.py
@@ -241,8 +241,8 @@
self.gateway.exit()
#del self.gateway
- def send_runtest(self, index):
- self.sendcommand("runtests", indices=[index])
+ def send_runtest_some(self, indices):
+ self.sendcommand("runtests", indices=indices)
def send_runtest_all(self):
self.sendcommand("runtests_all",)
Repository URL: https://bitbucket.org/hpk42/pytest-xdist/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
_______________________________________________
pytest-commit mailing list
pytest-commit@python.org
https://mail.python.org/mailman/listinfo/pytest-commit