06.12.2021 21:35, John Snow wrote:


On Fri, Dec 3, 2021 at 7:22 AM Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> wrote:

    Add a -j <JOBS> parameter to run tests in several jobs simultaneously.
    The implementation simply uses the multiprocessing.Pool class.

    Notes:

    1. Of course, tests can't run simultaneously in the same TEST_DIR. So,
        use per-test subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/
        instead of plain TEST_DIR and SOCK_DIR


SOCK_DIR was introduced because TEST_DIR was getting too long, and the length 
of the filepath was causing problems on some platforms. I hope that we aren't 
pushing our luck by making the directory longer here. Using the test name is 
nice because a human operator can quickly correlate directories to the tests 
that populated them, but if test names get kind of long, I wonder if we'll 
cause problems again.

Just a stray thought.
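For context on the length concern: AF_UNIX socket paths are limited by the size of sun_path in struct sockaddr_un, which is 108 bytes on Linux (104 on the BSDs/macOS), so appending a long test name can indeed push a socket path over the limit. A quick sanity check of the idea, with illustrative names (this is not the real harness code, and "qmp.sock" is just an example socket name):

```python
import os

# Linux caps AF_UNIX socket paths at 108 bytes (the sun_path field of
# struct sockaddr_un); BSD/macOS use 104. bind() fails for longer paths.
SUN_PATH_MAX = 108  # Linux value; platform-dependent

def sock_path_fits(sock_dir: str, test_name: str,
                   sock_name: str = "qmp.sock") -> bool:
    """Would SOCK_DIR/<testname>/<sock_name> still fit into sun_path?"""
    path = os.path.join(sock_dir, test_name, sock_name)
    return len(path.encode()) < SUN_PATH_MAX

print(sock_path_fits("/tmp/iotests-sock", "040"))     # short test name fits
print(sock_path_fits("/tmp/iotests-sock", "x" * 90))  # long test name overflows
```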

    2. The multiprocessing.Pool.starmap function doesn't support passing
        context managers, so we can't simply pass "self". Happily, we need
        self only for read-only access, and it just works if it is reachable
        from global scope. So, add a temporary reference TestRunner.shared_self
        during run_tests().


I'm a little confused on this point, see below

    Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com 
<mailto:vsement...@virtuozzo.com>>
    ---
      tests/qemu-iotests/check         |  4 +-
      tests/qemu-iotests/testrunner.py | 69 ++++++++++++++++++++++++++++----
      2 files changed, 64 insertions(+), 9 deletions(-)

    diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
    index 43a4b694cc..0c27721a41 100755
    --- a/tests/qemu-iotests/check
    +++ b/tests/qemu-iotests/check
    @@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser:
                         help='show me, do not run tests')
          p.add_argument('-makecheck', action='store_true',
                         help='pretty print output for make check')
    +    p.add_argument('-j', dest='jobs', type=int, default=1,
    +                   help='run tests in multiple parallel jobs')

          p.add_argument('-d', dest='debug', action='store_true', help='debug')
          p.add_argument('-p', dest='print', action='store_true',
    @@ -165,6 +167,6 @@ if __name__ == '__main__':
              with TestRunner(env, makecheck=args.makecheck,
                              color=args.color) as tr:
                  paths = [os.path.join(env.source_iotests, t) for t in tests]
    -            ok = tr.run_tests(paths)
    +            ok = tr.run_tests(paths, args.jobs)
                  if not ok:
                      sys.exit(1)


(OK)

    diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
    index a9f2feb58c..0feaa396d0 100644
    --- a/tests/qemu-iotests/testrunner.py
    +++ b/tests/qemu-iotests/testrunner.py
    @@ -26,6 +26,7 @@
      import json
      import termios
      import sys
    +from multiprocessing import Pool
      from contextlib import contextmanager
      from typing import List, Optional, Iterator, Any, Sequence, Dict, \
              ContextManager
    @@ -126,6 +127,31 @@ def __init__(self, status: str, description: str = '',


      class TestRunner(ContextManager['TestRunner']):
    +    shared_self = None
    +
    +    @staticmethod
    +    def proc_run_test(test: str, test_field_width: int) -> TestResult:
    +        # We are in a subprocess, we can't change the runner object!


*can't*, or shouldn't? I thought changing anything would just result in the 
child process diverging, but nothing harmful overall. Am I mistaken?

Yes you are right. "Shouldn't" is OK


    +        runner = TestRunner.shared_self
    +        assert runner is not None
    +        return runner.run_test(test, test_field_width, mp=True)
    +
    +    def run_tests_pool(self, tests: List[str],
    +                       test_field_width: int, jobs: int) -> List[TestResult]:
    +
    +        # passing self directly to Pool.starmap() just doesn't work,
    +        # because it's a context manager.


Why, what happens? (Or what doesn't happen?)

(I believe you that it doesn't work, but it's not immediately obvious to me 
why.)

Simple check:

diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
index 0feaa396d0..49c1780697 100644
--- a/tests/qemu-iotests/testrunner.py
+++ b/tests/qemu-iotests/testrunner.py
@@ -130,7 +130,7 @@ class TestRunner(ContextManager['TestRunner']):
     shared_self = None
 
     @staticmethod
-    def proc_run_test(test: str, test_field_width: int) -> TestResult:
+    def proc_run_test(x, test: str, test_field_width: int) -> TestResult:
         # We are in a subprocess, we can't change the runner object!
         runner = TestRunner.shared_self
         assert runner is not None
@@ -146,7 +146,7 @@ def run_tests_pool(self, tests: List[str],
 
         with Pool(jobs) as p:
             results = p.starmap(self.proc_run_test,
-                                zip(tests, [test_field_width] * len(tests)))
+                                [(self, t, test_field_width) for t in tests])
 
         TestRunner.shared_self = None


Something like this happens:

  File "/work/src/qemu/up/up-iotests-multiprocessing/build/tests/qemu-iotests/./check", line 170, in <module>
    ok = tr.run_tests(paths, args.jobs)
  File "/work/src/qemu/up/up-iotests-multiprocessing/tests/qemu-iotests/testrunner.py", line 383, in run_tests
    results = self.run_tests_pool(tests, test_field_width, jobs)
  File "/work/src/qemu/up/up-iotests-multiprocessing/tests/qemu-iotests/testrunner.py", line 149, in run_tests_pool
    results = p.starmap(self.proc_run_test,
  File "/usr/lib64/python3.9/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/lib64/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib64/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib64/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'generator' object


Hmm, I remember it was "cannot pickle context manager".. Probably I remember wrong :)
Honestly, I didn't dig into it beyond finding that not passing "self" fixes the
problem.
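The two error messages are likely the same thing: a context manager built with @contextmanager wraps a live generator, and pickle refuses generators. Since starmap() pickles every argument to send it to the workers, an entered context-manager object that holds such a helper can't be passed. A minimal reproduction with illustrative names (assuming nothing about the real TestRunner internals):

```python
import pickle
from contextlib import contextmanager

@contextmanager
def resource():
    yield "handle"

class Runner:
    """A context manager that keeps a @contextmanager helper alive while entered."""
    def __enter__(self):
        self._cm = resource()   # a _GeneratorContextManager wrapping a generator
        self._cm.__enter__()
        return self

    def __exit__(self, *exc):
        self._cm.__exit__(*exc)

r = Runner().__enter__()
try:
    # multiprocessing pickles starmap arguments exactly like this
    pickle.dumps(r)
except TypeError as e:
    print(e)                    # cannot pickle 'generator' object
finally:
    r.__exit__(None, None, None)
```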


    +        assert TestRunner.shared_self is None
    +        TestRunner.shared_self = self
    +
    +        with Pool(jobs) as p:
    +            results = p.starmap(self.proc_run_test,
    +                                zip(tests, [test_field_width] * len(tests)))
    +
    +        TestRunner.shared_self = None
    +
    +        return results
    +
          def __init__(self, env: TestEnv, makecheck: bool = False,
                       color: str = 'auto') -> None:
              self.env = env
    @@ -219,11 +245,16 @@ def find_reference(self, test: str) -> str:

              return f'{test}.out'

    -    def do_run_test(self, test: str) -> TestResult:
    +    def do_run_test(self, test: str, mp: bool) -> TestResult:
              """
              Run one test

              :param test: test file path
    +        :param mp: if true, we are in a multiprocessing environment, use
    +                   personal subdirectories for test run
    +
    +        Note: this method may be called from subprocess, so it does not
    +        change ``self`` object in any way!
              """


Maybe worth mentioning that it *does* change environment variables, but because this is 
"mp", it won't affect the parent execution environment.


Hmm, actually it does not change them: env here is a local dict prepared for the
subprocess, not the parent's environment. And yes, any logic that changed the
runner object in the hope of having some effect would be wrong anyway, since we
never modify the original object.




              f_test = Path(test)
    @@ -249,6 +280,12 @@ def do_run_test(self, test: str) -> TestResult:

              args = [str(f_test.resolve())]
              env = self.env.prepare_subprocess(args)
    +        if mp:
    +            # Split test directories, so that tests running in parallel
    +            # don't break each other.
    +            for d in ['TEST_DIR', 'SOCK_DIR']:
    +                env[d] = os.path.join(env[d], f_test.name)
    +                Path(env[d]).mkdir(parents=True, exist_ok=True)

              t0 = time.time()
              with f_bad.open('w', encoding="utf-8") as f:
    @@ -291,23 +328,32 @@ def do_run_test(self, test: str) -> TestResult:
                                    casenotrun=casenotrun)

          def run_test(self, test: str,
    -                 test_field_width: Optional[int] = None) -> TestResult:
    +                 test_field_width: Optional[int] = None,
    +                 mp: bool = False) -> TestResult:
              """
              Run one test and print short status

              :param test: test file path
              :param test_field_width: width for first field of status format
    +        :param mp: if true, we are in a multiprocessing environment, don't try
    +                   to rewrite things in stdout
    +
    +        Note: this method may be called from subprocess, so it does not
    +        change ``self`` object in any way!
              """

              last_el = self.last_elapsed.get(test)
              start = datetime.datetime.now().strftime('%H:%M:%S')

              if not self.makecheck:
    -            self.test_print_one_line(test=test, starttime=start,
    -                                     lasttime=last_el, end='\r',
    +            self.test_print_one_line(test=test,
    +                                     status = 'started' if mp else '...',
    +                                     starttime=start,
    +                                     lasttime=last_el,
    +                                     end = '\n' if mp else '\r',
                                           test_field_width=test_field_width)

    -        res = self.do_run_test(test)
    +        res = self.do_run_test(test, mp)

              end = datetime.datetime.now().strftime('%H:%M:%S')
              self.test_print_one_line(test=test, status=res.status,

    @@ -321,7 +367,7 @@ def run_test(self, test: str,

              return res

    -    def run_tests(self, tests: List[str]) -> bool:
    +    def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
              n_run = 0
              failed = []
              notrun = []
    @@ -332,9 +378,16 @@ def run_tests(self, tests: List[str]) -> bool:

          test_field_width = max(len(os.path.basename(t)) for t in tests) + 2

    -        for t in tests:
    +        if jobs > 1:
    +            results = self.run_tests_pool(tests, test_field_width, jobs)
    +
    +        for i, t in enumerate(tests):
                  name = os.path.basename(t)
    -            res = self.run_test(t, test_field_width=test_field_width)
    +
    +            if jobs > 1:
    +                res = results[i]
    +            else:
    +                res = self.run_test(t, test_field_width)

                  assert res.status in ('pass', 'fail', 'not run')


Looks good and surprisingly minimal; I'm just curious about the nature of the
workaround here.

Either way, I believe this will probably work as written, so I can give it an 
ACK at a minimum while I wait for answers.

Acked-by: John Snow <js...@redhat.com>


Thanks!

Yes, the workaround is ugly.. But it's small, so I think we can live with it.

I don't think refactoring TestRunner to move everything it needs into some
simple structure supported by Pool is a good idea: we don't actually want to
copy that data for each subprocess, we are OK with read-only access to a shared
object. And we do call methods on self and on self.env, so such a refactoring
would not be simple.

But regarding the shared object: I didn't find any way to pass a reference to a
shared object to Pool.map().. Something like Pool.map(..., shared_state=self)
would be good. But where is such an option? Note that this is my first
experience with multiprocessing.

The only thing I found is passing it through a global variable. I started with a
real global variable, but then thought that hiding it inside the class would be
a bit better.

--
Best regards,
Vladimir
