Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2019-01-08 12:29:06

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.28833 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Tue Jan 8 12:29:06 2019 rev:14 rq:663294 version:1.25.2 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2018-12-31 09:47:20.278169858 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.28833/python-distributed.changes 2019-01-08 12:31:21.972083379 +0100 @@ -1,0 +2,25 @@ +Sun Jan 6 22:33:33 UTC 2019 - Arun Persaud <a...@gmx.de> + +- specfile: + * update copyright year + +- update to version 1.25.2: + * Clean up LocalCluster logging better in async mode (:pr:`2448`) + Matthew Rocklin + * Add short error message if bokeh cannot be imported (:pr:`2444`) + Dirk Petersen + * Add optional environment variables to Nanny (:pr:`2431`) Matthew + Rocklin + * Make the direct keyword docstring entries uniform (:pr:`2441`) + Matthew Rocklin + * Make LocalCluster.close async friendly (:pr:`2437`) Matthew + Rocklin + * gather_dep: don't request dependencies we already found out we + don't want (:pr:`2428`) tjb900 + * Add parameters to Client.run docstring (:pr:`2429`) Matthew + Rocklin + * Support coroutines and async-def functions in run/run_scheduler + (:pr:`2427`) Matthew Rocklin + * Name threads in ThreadPoolExecutors (:pr:`2408`) Matthew Rocklin + +------------------------------------------------------------------- Old: ---- distributed-1.25.1.tar.gz New: ---- distributed-1.25.2.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.sW5SKP/_old 2019-01-08 12:31:22.448082861 +0100 +++ /var/tmp/diff_new_pack.sW5SKP/_new 2019-01-08 12:31:22.448082861 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-distributed # -# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany. +# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany. 
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -20,7 +20,7 @@
 # Test requires network connection
 %bcond_with test
 Name:           python-distributed
-Version:        1.25.1
+Version:        1.25.2
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-1.25.1.tar.gz -> distributed-1.25.2.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/PKG-INFO new/distributed-1.25.2/PKG-INFO
--- old/distributed-1.25.1/PKG-INFO     2018-12-15 17:59:09.000000000 +0100
+++ new/distributed-1.25.2/PKG-INFO     2019-01-04 23:14:54.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 1.25.1
+Version: 1.25.2
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.readthedocs.io/en/latest/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/_version.py new/distributed-1.25.2/distributed/_version.py
--- old/distributed-1.25.1/distributed/_version.py      2018-12-15 17:59:09.000000000 +0100
+++ new/distributed-1.25.2/distributed/_version.py      2019-01-04 23:14:54.000000000 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2018-12-15T11:58:40-0500",
+ "date": "2019-01-04T14:14:00-0800",
  "dirty": false,
  "error": null,
- "full-revisionid": "19a393b5ea2b8dc10bd6d18a3a38c2f1d67655d0",
- "version": "1.25.1"
+ "full-revisionid": "4e38022ed91b7d90ffe54703e9975d94a37fb9c3",
+ "version": "1.25.2"
 }
 '''  # END VERSION_JSON
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/cli/dask_scheduler.py new/distributed-1.25.2/distributed/cli/dask_scheduler.py
--- old/distributed-1.25.1/distributed/cli/dask_scheduler.py   2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/cli/dask_scheduler.py   2019-01-03 18:30:24.000000000 +0100
@@ -14,7 +14,7 @@
 
 from distributed import Scheduler
 from distributed.security import Security
-from distributed.utils import get_ip_interface, ignoring
+from distributed.utils import get_ip_interface
 from distributed.cli.utils import (check_python_3, install_signal_handlers,
                                    uri_from_host_port)
 from distributed.preloading import preload_modules, validate_preload_argv
@@ -118,10 +118,16 @@
 
     services = {}
     if _bokeh:
-        with ignoring(ImportError):
+        try:
             from distributed.bokeh.scheduler import BokehScheduler
             services[('bokeh', bokeh_port)] = (BokehScheduler,
                                                {'prefix': bokeh_prefix})
+        except ImportError as error:
+            if str(error).startswith('No module named'):
+                logger.info('Web dashboard not loaded.  Unable to import bokeh')
+            else:
+                logger.info('Unable to import bokeh: %s' % str(error))
+
     scheduler = Scheduler(loop=loop, services=services,
                           scheduler_file=scheduler_file,
                           security=sec)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/client.py new/distributed-1.25.2/distributed/client.py
--- old/distributed-1.25.1/distributed/client.py        2018-12-15 00:02:08.000000000 +0100
+++ new/distributed-1.25.2/distributed/client.py        2019-01-03 18:30:24.000000000 +0100
@@ -250,7 +250,10 @@
         """
         cls = Future
         if cls._cb_executor is None or cls._cb_executor_pid != os.getpid():
-            cls._cb_executor = ThreadPoolExecutor(1)
+            try:
+                cls._cb_executor = ThreadPoolExecutor(1, thread_name_prefix="Dask-Callback-Thread")
+            except TypeError:
+                cls._cb_executor = ThreadPoolExecutor(1)
             cls._cb_executor_pid = os.getpid()
 
         def execute_callback(fut):
@@ -486,8 +489,8 @@
         Gives the client a name that will be included in logs generated on
         the scheduler for matters relating to this client
     direct_to_workers: bool (optional)
-        Can this client connect directly to workers or should it proxy through
-        the scheduler?
+        Whether or not to connect directly to the workers, or to ask
+        the scheduler to serve as intermediary.
     heartbeat_interval: int
         Time in milliseconds between heartbeats to scheduler
 
@@ -1606,6 +1609,10 @@
         errors: string
             Either 'raise' or 'skip' if we should raise if a future has erred
             or skip its inclusion in the output collection
+        direct: boolean
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
         maxsize: int
             If the input is a queue then this produces an output queue with a
             maximum size.
@@ -1792,9 +1799,9 @@
             Whether to send each data element to all workers.
             By default we round-robin based on number of cores.
         direct: bool (defaults to automatically check)
-            Send data directly to workers, bypassing the central scheduler
-            This avoids burdening the scheduler but assumes that the client is
-            able to talk directly with the workers.
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
         maxsize: int (optional)
             Maximum size of queue if using queues, 0 implies infinite
         hash: bool (optional)
@@ -2045,9 +2052,11 @@
 
     @gen.coroutine
     def _run_on_scheduler(self, function, *args, **kwargs):
+        wait = kwargs.pop('wait', True)
         response = yield self.scheduler.run_function(function=dumps(function),
                                                      args=dumps(args),
-                                                     kwargs=dumps(kwargs))
+                                                     kwargs=dumps(kwargs),
+                                                     wait=wait)
         if response['status'] == 'error':
             six.reraise(*clean_exception(**response))
         else:
@@ -2069,6 +2078,15 @@
         >>> client.run_on_scheduler(get_number_of_tasks)  # doctest: +SKIP
         100
 
+        Run asynchronous functions in the background:
+
+        >>> async def print_state(dask_scheduler):  # doctest: +SKIP
+        ...     while True:
+        ...         print(dask_scheduler.status)
+        ...         await gen.sleep(1)
+
+        >>> c.run(print_state, wait=False)  # doctest: +SKIP
+
         See Also
         --------
         Client.run: Run a function on all workers
@@ -2081,9 +2099,11 @@
     def _run(self, function, *args, **kwargs):
         nanny = kwargs.pop('nanny', False)
         workers = kwargs.pop('workers', None)
+        wait = kwargs.pop('wait', True)
         responses = yield self.scheduler.broadcast(msg=dict(op='run',
                                                             function=dumps(function),
                                                             args=dumps(args),
+                                                            wait=wait,
                                                             kwargs=dumps(kwargs)),
                                                    workers=workers, nanny=nanny)
         results = {}
@@ -2092,7 +2112,8 @@
                 results[key] = resp['result']
             elif resp['status'] == 'error':
                 six.reraise(*clean_exception(**resp))
-        raise gen.Return(results)
+        if wait:
+            raise gen.Return(results)
 
     def run(self, function, *args, **kwargs):
         """
@@ -2114,6 +2135,9 @@
         **kwargs: keyword arguments for remote function
         workers: list
             Workers on which to run the function. Defaults to all known workers.
+        wait: boolean (optional)
+            If the function is asynchronous whether or not to wait until that
+            function finishes.
 
         Examples
         --------
@@ -2136,30 +2160,18 @@
 
         >>> c.run(get_hostname)  # doctest: +SKIP
         {'192.168.0.100:9000': 'running',
          '192.168.0.101:9000': 'running}
+
+        Run asynchronous functions in the background:
+
+        >>> async def print_state(dask_worker):  # doctest: +SKIP
+        ...     while True:
+        ...         print(dask_worker.status)
+        ...         await gen.sleep(1)
+
+        >>> c.run(print_state, wait=False)  # doctest: +SKIP
         """
         return self.sync(self._run, function, *args, **kwargs)
 
-    @gen.coroutine
-    def _run_coroutine(self, function, *args, **kwargs):
-        workers = kwargs.pop('workers', None)
-        wait = kwargs.pop('wait', True)
-        responses = yield self.scheduler.broadcast(msg=dict(op='run_coroutine',
-                                                            function=dumps(function),
-                                                            args=dumps(args),
-                                                            kwargs=dumps(kwargs),
-                                                            wait=wait),
-                                                   workers=workers)
-        if not wait:
-            raise gen.Return(None)
-        else:
-            results = {}
-            for key, resp in responses.items():
-                if resp['status'] == 'OK':
-                    results[key] = resp['result']
-                elif resp['status'] == 'error':
-                    six.reraise(*clean_exception(**resp))
-            raise gen.Return(results)
-
     def run_coroutine(self, function, *args, **kwargs):
         """
         Spawn a coroutine on all workers.
@@ -2181,7 +2193,10 @@
             Workers on which to run the function. Defaults to all
             known workers.
         """
-        return self.sync(self._run_coroutine, function, *args, **kwargs)
+        warnings.warn("This method has been deprecated. "
+                      "Instead use Client.run which detects async functions "
+                      "automatically")
+        return self.run(function, *args, **kwargs)
 
     def _graph_to_futures(self, dsk, keys, restrictions=None,
                           loose_restrictions=None, priority=None,
@@ -2284,7 +2299,9 @@
         sync: bool (optional)
             Returns Futures if False or concrete values if True (default).
         direct: bool
-            Gather results directly from workers
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
 
         Examples
         --------
@@ -3023,10 +3040,10 @@
 
         Examples
         --------
-        >>> client = Client()
-        >>> client.write_scheduler_file('scheduler.json')
+        >>> client = Client()  # doctest: +SKIP
+        >>> client.write_scheduler_file('scheduler.json')  # doctest: +SKIP
 
         # connect to previous client's scheduler
-        >>> client2 = Client(scheduler_file='scheduler.json')
+        >>> client2 = Client(scheduler_file='scheduler.json')  # doctest: +SKIP
         """
         if self.scheduler_file:
             raise ValueError('Scheduler file already set')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/comm/utils.py new/distributed-1.25.2/distributed/comm/utils.py
--- old/distributed-1.25.1/distributed/comm/utils.py    2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/comm/utils.py    2019-01-03 18:30:24.000000000 +0100
@@ -19,7 +19,10 @@
 FRAME_OFFLOAD_THRESHOLD = 10 * 1024 ** 2   # 10 MB
 
 
-_offload_executor = ThreadPoolExecutor(max_workers=1)
+try:
+    _offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix='Dask-Offload')
+except TypeError:
+    _offload_executor = ThreadPoolExecutor(max_workers=1)
 finalize(_offload_executor, _offload_executor.shutdown)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/local.py new/distributed-1.25.2/distributed/deploy/local.py
--- old/distributed-1.25.1/distributed/deploy/local.py  2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/local.py  2019-01-03 18:30:24.000000000 +0100
@@ -4,7 +4,7 @@
 from datetime import timedelta
 import logging
 import math
-from time import sleep
+import warnings
 import weakref
 
 import toolz
@@ -13,7 +13,7 @@
 from .cluster import Cluster
 from ..core import CommClosedError
 from ..utils import (sync, ignoring, All, silence_logging, LoopRunner,
-                     log_errors, thread_state)
+                     log_errors, thread_state, parse_timedelta)
 from ..nanny import Nanny
 from ..scheduler import Scheduler
 from ..worker import Worker, _ncores
@@ -151,9 +151,12 @@
     def __await__(self):
         return self._started.__await__()
 
+    @property
+    def asynchronous(self):
+        return self._asynchronous or getattr(thread_state, 'asynchronous', False)
+
     def sync(self, func, *args, **kwargs):
-        asynchronous = kwargs.pop('asynchronous', None)
-        if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False):
+        if kwargs.pop('asynchronous', None) or self.asynchronous:
             callback_timeout = kwargs.pop('callback_timeout', None)
             future = func(*args, **kwargs)
             if callback_timeout is not None:
@@ -196,6 +199,10 @@
 
     @gen.coroutine
     def _start_worker(self, death_timeout=60, **kwargs):
+        if self.status and self.status.startswith('clos'):
+            warnings.warn("Tried to start a worker while status=='%s'" % self.status)
+            return
+
         if self.processes:
             W = Nanny
             kwargs['quiet'] = True
@@ -257,15 +264,23 @@
             self.sync(self._stop_worker, w)
 
     @gen.coroutine
-    def _close(self):
+    def _close(self, timeout='2s'):
         # Can be 'closing' as we're called by close() below
         if self.status == 'closed':
             return
+        self.status = 'closing'
+
+        self.scheduler.clear_task_state()
+
+        with ignoring(gen.TimeoutError):
+            yield gen.with_timeout(
+                timedelta(seconds=parse_timedelta(timeout)),
+                All([self._stop_worker(w) for w in self.workers]),
+            )
+        del self.workers[:]
 
         try:
             with ignoring(gen.TimeoutError, CommClosedError, OSError):
-                yield All([w._close() for w in self.workers])
-            with ignoring(gen.TimeoutError, CommClosedError, OSError):
                 yield self.scheduler.close(fast=True)
             del self.workers[:]
         finally:
@@ -277,25 +292,20 @@
             return
 
         try:
-            self.scheduler.clear_task_state()
+            result = self.sync(self._close, callback_timeout=timeout)
+        except RuntimeError:  # IOLoop is closed
+            result = None
+
+        if hasattr(self, '_old_logging_level'):
+            if self.asynchronous:
+                result.add_done_callback(lambda _: silence_logging(self._old_logging_level))
+            else:
+                silence_logging(self._old_logging_level)
 
-            for w in self.workers:
-                self.loop.add_callback(self._stop_worker, w)
-            for i in range(10):
-                if not self.workers:
-                    break
-                else:
-                    sleep(0.01)
-            del self.workers[:]
-            try:
-                self._loop_runner.run_sync(self._close, callback_timeout=timeout)
-            except RuntimeError:  # IOLoop is closed
-                pass
+        if not self.asynchronous:
             self._loop_runner.stop()
-        finally:
-            self.status = 'closed'
-            with ignoring(AttributeError):
-                silence_logging(self._old_logging_level)
+
+        return result
 
     @gen.coroutine
     def scale_up(self, n, **kwargs):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/tests/test_adaptive.py new/distributed-1.25.2/distributed/deploy/tests/test_adaptive.py
--- old/distributed-1.25.1/distributed/deploy/tests/test_adaptive.py   2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/tests/test_adaptive.py   2019-01-03 18:30:24.000000000 +0100
@@ -91,7 +91,7 @@
                          asynchronous=True)
     try:
         cluster.scheduler.allowed_failures = 1000
-        alc = Adaptive(cluster.scheduler, cluster, interval=100)
+        alc = cluster.adapt(interval=100)
         c = yield Client(cluster, asynchronous=True)
 
         futures = c.map(slowinc, range(100), delay=0.01)
@@ -120,8 +120,8 @@
 
         yield c.gather(futures)
     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 10,
              active_rpc_timeout=10)
@@ -199,8 +199,8 @@
         assert time() < start + 2
         assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1}
     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()
 
 
 @gen_test()
@@ -223,8 +223,8 @@
 
         assert frequencies(pluck(1, adapt.log)) == {'up': 1}
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -270,8 +270,8 @@
             yield gen.sleep(0.1)
         assert len(cluster.scheduler.workers) == 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -295,8 +295,8 @@
             yield gen.sleep(0.1)
             assert time() < start + 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks")
@@ -317,8 +317,8 @@
 
         assert len(cluster.scheduler.workers) <= 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 def test_basic_no_loop():
@@ -358,8 +358,8 @@
         assert adaptive.log[1][1:] == ('up', {'n': 20})
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -391,7 +391,7 @@
         names = {ws.name for ws in cluster.scheduler.workers.values()}
         assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'}
     finally:
-        yield cluster._close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True, ncores=[])
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/tests/test_local.py new/distributed-1.25.2/distributed/deploy/tests/test_local.py
--- old/distributed-1.25.1/distributed/deploy/tests/test_local.py      2018-12-15 17:51:37.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/tests/test_local.py      2019-01-03 18:30:24.000000000 +0100
@@ -50,7 +50,6 @@
         cluster.close()
     sleep(0.5)
     log = log.getvalue()
-    print(log)
     assert not log
 
 
@@ -291,8 +290,8 @@
     assert len(cluster.workers) == 1
     assert addr not in cluster.scheduler.ncores
 
-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()
 
 
 def test_silent_startup():
@@ -511,8 +510,8 @@
         yield gen.sleep(0.01)
         assert time() < start + 3
 
-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()
 
 
 def test_local_tls_restart(loop):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/nanny.py new/distributed-1.25.2/distributed/nanny.py
--- old/distributed-1.25.1/distributed/nanny.py 2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/nanny.py 2019-01-03 18:30:24.000000000 +0100
@@ -43,7 +43,7 @@
                  memory_limit='auto', reconnect=True, validate=False,
                  quiet=False, resources=None, silence_logs=None,
                  death_timeout=None, preload=(), preload_argv=[],
                  security=None, contact_address=None,
-                 listen_address=None, worker_class=None, **kwargs):
+                 listen_address=None, worker_class=None, env=None, **kwargs):
         if scheduler_file:
             cfg = json_load_robust(scheduler_file)
@@ -63,6 +63,7 @@
         self.preload = preload
         self.preload_argv = preload_argv
         self.Worker = Worker if worker_class is None else worker_class
+        self.env = env or {}
         self.contact_address = contact_address
         self.memory_terminate_fraction = dask.config.get('distributed.worker.memory.terminate')
 
@@ -215,7 +216,8 @@
                 worker_start_args=(start_arg,),
                 silence_logs=self.silence_logs,
                 on_exit=self._on_exit,
-                worker=self.Worker
+                worker=self.Worker,
+                env=self.env,
             )
 
         self.auto_restart = True
@@ -322,7 +324,7 @@
 class WorkerProcess(object):
 
     def __init__(self, worker_args, worker_kwargs, worker_start_args,
-                 silence_logs, on_exit, worker):
+                 silence_logs, on_exit, worker, env):
         self.status = 'init'
         self.silence_logs = silence_logs
         self.worker_args = worker_args
@@ -331,6 +333,7 @@
         self.on_exit = on_exit
         self.process = None
         self.Worker = worker
+        self.env = env
 
         # Initialized when worker is ready
         self.worker_dir = None
@@ -360,7 +363,9 @@
                         silence_logs=self.silence_logs,
                         init_result_q=self.init_result_q,
                         child_stop_q=self.child_stop_q,
-                        uid=uid, Worker=self.Worker),
+                        uid=uid,
+                        Worker=self.Worker,
+                        env=self.env),
         )
         self.process.daemon = True
         self.process.set_exit_callback(self._on_exit)
@@ -488,7 +493,8 @@
 
     @classmethod
     def _run(cls, worker_args, worker_kwargs, worker_start_args,
-             silence_logs, init_result_q, child_stop_q, uid, Worker):  # pragma: no cover
+             silence_logs, init_result_q, child_stop_q, uid, env, Worker):  # pragma: no cover
+        os.environ.update(env)
         try:
             from dask.multiprocessing import initialize_worker_process
         except ImportError:   # old Dask version
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/scheduler.py new/distributed-1.25.2/distributed/scheduler.py
--- old/distributed-1.25.1/distributed/scheduler.py     2018-12-06 21:02:46.000000000 +0100
+++ new/distributed-1.25.2/distributed/scheduler.py     2019-01-03 18:30:24.000000000 +0100
@@ -3068,7 +3068,7 @@
             self.unknown_durations[prefix].add(ts)
             return default
 
-    def run_function(self, stream, function, args=(), kwargs={}):
+    def run_function(self, stream, function, args=(), kwargs={}, wait=True):
         """ Run a function within this process
 
         See Also
         --------
@@ -3077,7 +3077,7 @@
         """
         from .worker import run
         self.log_event('all', {'action': 'run-function', 'function': function})
-        return run(self, stream, function=function, args=args, kwargs=kwargs)
+        return run(self, stream, function=function, args=args, kwargs=kwargs, wait=wait)
 
     def set_metadata(self, stream=None, keys=None, value=None):
         try:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/py3_test_client.py new/distributed-1.25.2/distributed/tests/py3_test_client.py
--- old/distributed-1.25.1/distributed/tests/py3_test_client.py        2018-12-11 01:24:58.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/py3_test_client.py        2019-01-03 18:30:24.000000000 +0100
@@ -158,3 +158,47 @@
             pytest.fail("array should have been destroyed")
 
         await gen.sleep(0.200)
+
+
+@gen_cluster(client=True)
+async def test_run_scheduler_async_def(c, s, a, b):
+    async def f(dask_scheduler):
+        await gen.sleep(0.01)
+        dask_scheduler.foo = 'bar'
+
+    await c.run_on_scheduler(f)
+
+    assert s.foo == 'bar'
+
+    async def f(dask_worker):
+        await gen.sleep(0.01)
+        dask_worker.foo = 'bar'
+
+    await c.run(f)
+
+    assert a.foo == 'bar'
+    assert b.foo == 'bar'
+
+
+@gen_cluster(client=True)
+async def test_run_scheduler_async_def_wait(c, s, a, b):
+    async def f(dask_scheduler):
+        await gen.sleep(0.01)
+        dask_scheduler.foo = 'bar'
+
+    await c.run_on_scheduler(f, wait=False)
+
+    while not hasattr(s, 'foo'):
+        await gen.sleep(0.01)
+    assert s.foo == 'bar'
+
+    async def f(dask_worker):
+        await gen.sleep(0.01)
+        dask_worker.foo = 'bar'
+
+    await c.run(f, wait=False)
+
+    while not hasattr(a, 'foo') or not hasattr(b, 'foo'):
+        await gen.sleep(0.01)
+
+    assert a.foo == 'bar'
+    assert b.foo == 'bar'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_client.py new/distributed-1.25.2/distributed/tests/test_client.py
--- old/distributed-1.25.1/distributed/tests/test_client.py    2018-12-15 17:51:37.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_client.py    2019-01-03 18:30:24.000000000 +0100
@@ -1871,7 +1871,7 @@
             assert cluster.scheduler.address in text
     finally:
         yield client.close()
-        yield cluster._close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True)
@@ -2616,35 +2616,34 @@
 
 @gen_cluster(client=True)
 def test_run_coroutine(c, s, a, b):
-    results = yield c.run_coroutine(geninc, 1, delay=0.05)
+    results = yield c.run(geninc, 1, delay=0.05)
     assert results == {a.address: 2, b.address: 2}
 
-    results = yield c.run_coroutine(geninc, 1, delay=0.05, workers=[a.address])
+    results = yield c.run(geninc, 1, delay=0.05, workers=[a.address])
    assert results == {a.address: 2}
 
-    results = yield c.run_coroutine(geninc, 1, workers=[])
+    results = yield c.run(geninc, 1, workers=[])
     assert results == {}
 
     with pytest.raises(RuntimeError) as exc_info:
-        yield c.run_coroutine(throws, 1)
+        yield c.run(throws, 1)
     assert "hello" in str(exc_info)
 
     if sys.version_info >= (3, 5):
-        results = yield c.run_coroutine(asyncinc, 2, delay=0.01)
+        results = yield c.run(asyncinc, 2, delay=0.01)
         assert results == {a.address: 3, b.address: 3}
 
 
 def test_run_coroutine_sync(c, s, a, b):
-    result = c.run_coroutine(geninc, 2, delay=0.01)
+    result = c.run(geninc, 2, delay=0.01)
     assert result == {a['address']: 3, b['address']: 3}
 
-    result = c.run_coroutine(geninc, 2,
-                             workers=[a['address']])
+    result = c.run(geninc, 2, workers=[a['address']])
     assert result == {a['address']: 3}
 
     t1 = time()
-    result = c.run_coroutine(geninc, 2, delay=10, wait=False)
+    result = c.run(geninc, 2, delay=10, wait=False)
     t2 = time()
     assert result is None
     assert t2 - t1 <= 1.0
@@ -4561,7 +4560,6 @@
         assert time() < start + 1
 
 
-@pytest.mark.xfail(reason='Other tests bleed into the logs of this one')
 def test_quiet_client_close(loop):
     with captured_logger(logging.getLogger('distributed')) as logger:
         with Client(loop=loop, processes=False, threads_per_worker=4) as c:
@@ -4573,8 +4571,12 @@
     lines = out.strip().split('\n')
     assert len(lines) <= 2
     for line in lines:
-        assert not line or 'Reconnecting' in line or set(line) == {'-'}
-    # assert not out
+        assert (
+            not line or
+            'Reconnecting' in line or
+            'garbage' in line or
+            set(line) == {'-'}
+        ), line
 
 
 @gen_cluster()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_nanny.py new/distributed-1.25.2/distributed/tests/test_nanny.py
--- old/distributed-1.25.1/distributed/tests/test_nanny.py     2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_nanny.py     2019-01-03 18:30:24.000000000 +0100
@@ -333,3 +333,13 @@
     log = log.getvalue()
     assert 'error' not in log.lower(), log
     assert 'restart' not in log.lower(), log
+
+
+@gen_cluster(ncores=[], client=True)
+def test_environment_variable(c, s):
+    a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"})
+    b = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "456"})
+    yield [a._start(), b._start()]
+    results = yield c.run(lambda: os.environ['FOO'])
+    assert results == {a.worker_address: "123", b.worker_address: "456"}
+    yield [a._close(), b._close()]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_resources.py new/distributed-1.25.2/distributed/tests/test_resources.py
--- old/distributed-1.25.1/distributed/tests/test_resources.py 2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_resources.py 2019-01-03 18:30:24.000000000 +0100
@@ -262,6 +262,7 @@
     assert 'executing' in str(a.story(key))
 
 
+@pytest.mark.xfail(reason="atop fusion seemed to break this")
 @gen_cluster(client=True, ncores=[('127.0.0.1', 1, {'resources': {'A': 1}}),
                                   ('127.0.0.1', 1, {'resources': {'B': 1}})])
 def test_full_collections(c, s, a, b):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_worker.py new/distributed-1.25.2/distributed/tests/test_worker.py
--- old/distributed-1.25.1/distributed/tests/test_worker.py    2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_worker.py    2019-01-03 18:30:24.000000000 +0100
@@ -399,7 +399,7 @@
             yield gen.sleep(0.001)
         raise gen.Return(dask_worker.id)
 
-    response = yield c._run_coroutine(f)
+    response = yield c.run(f)
     assert response == {a.address: a.id, b.address: b.id}
 
 
@@ -866,7 +866,7 @@
         result = yield future
         raise gen.Return(result)
 
-    results = yield c.run_coroutine(f)
+    results = yield c.run(f)
     assert results == {a.address: 11, b.address: 11}
 
 
@@ -879,7 +879,7 @@
         result = yield future
         raise gen.Return(result)
 
-    results = client.run_coroutine(f)
+    results = client.run(f)
     assert results == {a['address']: 11, b['address']: 11}
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/threadpoolexecutor.py new/distributed-1.25.2/distributed/threadpoolexecutor.py
--- old/distributed-1.25.1/distributed/threadpoolexecutor.py   2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/threadpoolexecutor.py   2019-01-03 18:30:24.000000000 +0100
@@ -75,11 +75,12 @@
         super(ThreadPoolExecutor, self).__init__(*args, **kwargs)
         self._rejoin_list = []
         self._rejoin_lock = threading.Lock()
+        self._thread_name_prefix = kwargs.get('thread_name_prefix', 'DaskThreadPoolExecutor')
 
     def _adjust_thread_count(self):
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 name="ThreadPoolExecutor-%d-%d" % (os.getpid(), next(self._counter)),
+                                 name=self._thread_name_prefix + "-%d-%d" % (os.getpid(), next(self._counter)),
                                  args=(self, self._work_queue))
             t.daemon = True
             self._threads.add(t)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/worker.py new/distributed-1.25.2/distributed/worker.py
--- old/distributed-1.25.1/distributed/worker.py        2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/worker.py        2019-01-03 18:30:24.000000000 +0100
@@ -424,8 +424,8 @@
         self.status = None
         self._closed = Event()
         self.reconnect = reconnect
-        self.executor = executor or ThreadPoolExecutor(self.ncores)
-        self.actor_executor = ThreadPoolExecutor(1)
+        self.executor = executor or ThreadPoolExecutor(self.ncores, thread_name_prefix="Dask-Worker-Threads'")
+        self.actor_executor = ThreadPoolExecutor(1, thread_name_prefix="Dask-Actor-Threads")
         self.name = name
         self.scheduler_delay = 0
         self.stream_comms = dict()
@@ -1594,8 +1594,10 @@
         if self.validate:
             self.validate_state()
 
+        # dep states may have changed before gather_dep runs
+        # if a dep is no longer in-flight then don't fetch it
         deps = tuple(dep for dep in deps
-                     if self.dep_state.get(dep) in ('waiting', 'flight'))
+                     if self.dep_state.get(dep) == 'flight')
 
         self.log.append(('request-dep', dep, worker, deps))
         logger.debug("Request %d keys", len(deps))
@@ -1960,13 +1962,14 @@
         #     logger.info("Finish job %d, %s", i, key)
         raise gen.Return(result)
 
-    def run(self, comm, function, args=(), kwargs=None):
+    def run(self, comm, function, args=(), wait=True, kwargs=None):
         kwargs = kwargs or {}
-        return run(self, comm, function=function, args=args, kwargs=kwargs)
+        return run(self, comm, function=function, args=args, kwargs=kwargs,
+                   wait=wait)
 
     def run_coroutine(self, comm, function, args=(), kwargs=None, wait=True):
         return run(self, comm, function=function, args=args, kwargs=kwargs,
-                   is_coro=True, wait=wait)
+                   wait=wait)
 
     @gen.coroutine
     def actor_execute(self, comm=None, actor=None, function=None, args=(), kwargs={}):
@@ -2915,9 +2918,14 @@
 
 
 @gen.coroutine
-def run(server, comm, function, args=(), kwargs={}, is_coro=False, wait=True):
-    assert wait or is_coro, "Combination not supported"
+def run(server, comm, function, args=(), kwargs={}, is_coro=None, wait=True):
     function = pickle.loads(function)
+    if is_coro is None:
+        is_coro = iscoroutinefunction(function)
+    else:
+        warnings.warn("The is_coro= parameter is deprecated. "
" + "We now automatically detect coroutines/async functions") + assert wait or is_coro, "Combination not supported" if args: args = pickle.loads(args) if kwargs: @@ -2928,9 +2936,15 @@ kwargs['dask_scheduler'] = server logger.info("Run out-of-band function %r", funcname(function)) try: - result = function(*args, **kwargs) - if is_coro: - result = (yield result) if wait else None + if not is_coro: + result = function(*args, **kwargs) + else: + if wait: + result = yield function(*args, **kwargs) + else: + server.loop.add_callback(function, *args, **kwargs) + result = None + except Exception as e: logger.warning(" Run Failed\n" "Function: %s\n" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed.egg-info/PKG-INFO new/distributed-1.25.2/distributed.egg-info/PKG-INFO --- old/distributed-1.25.1/distributed.egg-info/PKG-INFO 2018-12-15 17:59:09.000000000 +0100 +++ new/distributed-1.25.2/distributed.egg-info/PKG-INFO 2019-01-04 23:14:53.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 1.25.1 +Version: 1.25.2 Summary: Distributed scheduler for Dask Home-page: https://distributed.readthedocs.io/en/latest/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/docs/source/changelog.rst new/distributed-1.25.2/docs/source/changelog.rst --- old/distributed-1.25.1/docs/source/changelog.rst 2018-12-15 17:57:26.000000000 +0100 +++ new/distributed-1.25.2/docs/source/changelog.rst 2019-01-04 23:13:19.000000000 +0100 @@ -1,6 +1,21 @@ Changelog ========= +1.25.2 - 2019-01-04 +------------------- + +- Clean up LocalCluster logging better in async mode (:pr:`2448`) `Matthew Rocklin`_ +- Add short error message if bokeh cannot be imported (:pr:`2444`) `Dirk Petersen`_ +- Add optional environment variables to Nanny (:pr:`2431`) `Matthew Rocklin`_ +- Make the direct keyword docstring entries uniform (:pr:`2441`) `Matthew Rocklin`_ +- Make LocalCluster.close async friendly (:pr:`2437`) `Matthew Rocklin`_ +- gather_dep: don't request dependencies we already found out we don't want (:pr:`2428`) `tjb900`_ +- Add parameters to Client.run docstring (:pr:`2429`) `Matthew Rocklin`_ +- Support coroutines and async-def functions in run/run_scheduler (:pr:`2427`) `Matthew Rocklin`_ +- Name threads in ThreadPoolExecutors (:pr:`2408`) `Matthew Rocklin`_ + + + 1.25.1 - 2018-12-15 ------------------- @@ -880,3 +895,5 @@ .. _`Diane Trout`: https://github.com/detrout .. _`tjb900`: https://github.com/tjb900 .. _`Stephan Hoyer`: https://github.com/shoyer +.. _`tjb900`: https://github.com/tjb900 +.. _`Dirk Petersen`: https://github.com/dirkpetersen