Hello community,

here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2019-01-08 12:29:06
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.28833 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Tue Jan  8 12:29:06 2019 rev:14 rq:663294 version:1.25.2

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    2018-12-31 09:47:20.278169858 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.28833/python-distributed.changes    2019-01-08 12:31:21.972083379 +0100
@@ -1,0 +2,25 @@
+Sun Jan  6 22:33:33 UTC 2019 - Arun Persaud <a...@gmx.de>
+
+- specfile:
+  * update copyright year
+
+- update to version 1.25.2:
+  * Clean up LocalCluster logging better in async mode (:pr:`2448`)
+    Matthew Rocklin
+  * Add short error message if bokeh cannot be imported (:pr:`2444`)
+    Dirk Petersen
+  * Add optional environment variables to Nanny (:pr:`2431`) Matthew
+    Rocklin
+  * Make the direct keyword docstring entries uniform (:pr:`2441`)
+    Matthew Rocklin
+  * Make LocalCluster.close async friendly (:pr:`2437`) Matthew
+    Rocklin
+  * gather_dep: don't request dependencies we already found out we
+    don't want (:pr:`2428`) tjb900
+  * Add parameters to Client.run docstring (:pr:`2429`) Matthew
+    Rocklin
+  * Support coroutines and async-def functions in run/run_scheduler
+    (:pr:`2427`) Matthew Rocklin
+  * Name threads in ThreadPoolExecutors (:pr:`2408`) Matthew Rocklin
+
+-------------------------------------------------------------------
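
A quick illustration of the headline change above (coroutine support in
Client.run / run_on_scheduler): the sketch below mirrors the new docstring
examples further down in this diff; the cluster setup itself is
illustrative, not part of the patch.

    from tornado import gen
    from distributed import Client

    client = Client()  # assumes a local scheduler/workers can start

    async def print_state(dask_worker):
        while True:
            print(dask_worker.status)
            await gen.sleep(1)

    # New in 1.25.2: async-def functions are detected automatically,
    # and wait=False runs them in the background on every worker.
    client.run(print_state, wait=False)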

Old:
----
  distributed-1.25.1.tar.gz

New:
----
  distributed-1.25.2.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.sW5SKP/_old  2019-01-08 12:31:22.448082861 +0100
+++ /var/tmp/diff_new_pack.sW5SKP/_new  2019-01-08 12:31:22.448082861 +0100
@@ -1,7 +1,7 @@
 #
 # spec file for package python-distributed
 #
-# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -20,7 +20,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        1.25.1
+Version:        1.25.2
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-1.25.1.tar.gz -> distributed-1.25.2.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/PKG-INFO new/distributed-1.25.2/PKG-INFO
--- old/distributed-1.25.1/PKG-INFO     2018-12-15 17:59:09.000000000 +0100
+++ new/distributed-1.25.2/PKG-INFO     2019-01-04 23:14:54.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 1.25.1
+Version: 1.25.2
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.readthedocs.io/en/latest/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/_version.py new/distributed-1.25.2/distributed/_version.py
--- old/distributed-1.25.1/distributed/_version.py      2018-12-15 17:59:09.000000000 +0100
+++ new/distributed-1.25.2/distributed/_version.py      2019-01-04 23:14:54.000000000 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2018-12-15T11:58:40-0500",
+ "date": "2019-01-04T14:14:00-0800",
  "dirty": false,
  "error": null,
- "full-revisionid": "19a393b5ea2b8dc10bd6d18a3a38c2f1d67655d0",
- "version": "1.25.1"
+ "full-revisionid": "4e38022ed91b7d90ffe54703e9975d94a37fb9c3",
+ "version": "1.25.2"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/cli/dask_scheduler.py new/distributed-1.25.2/distributed/cli/dask_scheduler.py
--- old/distributed-1.25.1/distributed/cli/dask_scheduler.py    2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/cli/dask_scheduler.py    2019-01-03 18:30:24.000000000 +0100
@@ -14,7 +14,7 @@
 
 from distributed import Scheduler
 from distributed.security import Security
-from distributed.utils import get_ip_interface, ignoring
+from distributed.utils import get_ip_interface
 from distributed.cli.utils import (check_python_3, install_signal_handlers,
                                    uri_from_host_port)
 from distributed.preloading import preload_modules, validate_preload_argv
@@ -118,10 +118,16 @@
 
     services = {}
     if _bokeh:
-        with ignoring(ImportError):
+        try:
             from distributed.bokeh.scheduler import BokehScheduler
             services[('bokeh', bokeh_port)] = (BokehScheduler,
                                                {'prefix': bokeh_prefix})
+        except ImportError as error:
+            if str(error).startswith('No module named'):
+                logger.info('Web dashboard not loaded.  Unable to import bokeh')
+            else:
+                logger.info('Unable to import bokeh: %s' % str(error))
+
     scheduler = Scheduler(loop=loop, services=services,
                           scheduler_file=scheduler_file,
                           security=sec)
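
The hunk above replaces a silent ignoring(ImportError) with an explicit
message, distinguishing "bokeh is not installed" from "bokeh is present but
fails to import". The same pattern for any optional dependency, as a
standalone sketch:

    import logging

    logger = logging.getLogger(__name__)

    try:
        import bokeh  # optional dependency
    except ImportError as error:
        # A missing module message starts with 'No module named'; anything
        # else means the module exists but broke during import.
        if str(error).startswith('No module named'):
            logger.info('Web dashboard not loaded.  Unable to import bokeh')
        else:
            logger.info('Unable to import bokeh: %s', error)
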
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/client.py new/distributed-1.25.2/distributed/client.py
--- old/distributed-1.25.1/distributed/client.py        2018-12-15 00:02:08.000000000 +0100
+++ new/distributed-1.25.2/distributed/client.py        2019-01-03 18:30:24.000000000 +0100
@@ -250,7 +250,10 @@
         """
         cls = Future
         if cls._cb_executor is None or cls._cb_executor_pid != os.getpid():
-            cls._cb_executor = ThreadPoolExecutor(1)
+            try:
+                cls._cb_executor = ThreadPoolExecutor(1, thread_name_prefix="Dask-Callback-Thread")
+            except TypeError:
+                cls._cb_executor = ThreadPoolExecutor(1)
             cls._cb_executor_pid = os.getpid()
 
         def execute_callback(fut):
@@ -486,8 +489,8 @@
         Gives the client a name that will be included in logs generated on
         the scheduler for matters relating to this client
     direct_to_workers: bool (optional)
-        Can this client connect directly to workers or should it proxy through
-        the scheduler?
+        Whether or not to connect directly to the workers, or to ask
+        the scheduler to serve as intermediary.
     heartbeat_interval: int
         Time in milliseconds between heartbeats to scheduler
 
@@ -1606,6 +1609,10 @@
         errors: string
             Either 'raise' or 'skip' if we should raise if a future has erred
             or skip its inclusion in the output collection
+        direct: boolean
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
         maxsize: int
             If the input is a queue then this produces an output queue with a
             maximum size.
@@ -1792,9 +1799,9 @@
             Whether to send each data element to all workers.
             By default we round-robin based on number of cores.
         direct: bool (defaults to automatically check)
-            Send data directly to workers, bypassing the central scheduler
-            This avoids burdening the scheduler but assumes that the client is
-            able to talk directly with the workers.
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
         maxsize: int (optional)
             Maximum size of queue if using queues, 0 implies infinite
         hash: bool (optional)
@@ -2045,9 +2052,11 @@
 
     @gen.coroutine
     def _run_on_scheduler(self, function, *args, **kwargs):
+        wait = kwargs.pop('wait', True)
         response = yield self.scheduler.run_function(function=dumps(function),
                                                      args=dumps(args),
-                                                     kwargs=dumps(kwargs))
+                                                     kwargs=dumps(kwargs),
+                                                     wait=wait)
         if response['status'] == 'error':
             six.reraise(*clean_exception(**response))
         else:
@@ -2069,6 +2078,15 @@
         >>> client.run_on_scheduler(get_number_of_tasks)  # doctest: +SKIP
         100
 
+        Run asynchronous functions in the background:
+
+        >>> async def print_state(dask_scheduler):  # doctest: +SKIP
+        ...    while True:
+        ...        print(dask_scheduler.status)
+        ...        await gen.sleep(1)
+
+        >>> c.run(print_state, wait=False)  # doctest: +SKIP
+
         See Also
         --------
         Client.run: Run a function on all workers
@@ -2081,9 +2099,11 @@
     def _run(self, function, *args, **kwargs):
         nanny = kwargs.pop('nanny', False)
         workers = kwargs.pop('workers', None)
+        wait = kwargs.pop('wait', True)
         responses = yield self.scheduler.broadcast(msg=dict(op='run',
                                                             
function=dumps(function),
                                                             args=dumps(args),
+                                                            wait=wait,
                                                             
kwargs=dumps(kwargs)),
                                                    workers=workers, 
nanny=nanny)
         results = {}
@@ -2092,7 +2112,8 @@
                 results[key] = resp['result']
             elif resp['status'] == 'error':
                 six.reraise(*clean_exception(**resp))
-        raise gen.Return(results)
+        if wait:
+            raise gen.Return(results)
 
     def run(self, function, *args, **kwargs):
         """
@@ -2114,6 +2135,9 @@
         **kwargs: keyword arguments for remote function
         workers: list
             Workers on which to run the function. Defaults to all known workers.
+        wait: boolean (optional)
+            If the function is asynchronous whether or not to wait until that
+            function finishes.
 
         Examples
         --------
@@ -2136,30 +2160,18 @@
         >>> c.run(get_hostname)  # doctest: +SKIP
         {'192.168.0.100:9000': 'running',
          '192.168.0.101:9000': 'running}
+
+        Run asynchronous functions in the background:
+
+        >>> async def print_state(dask_worker):  # doctest: +SKIP
+        ...    while True:
+        ...        print(dask_worker.status)
+        ...        await gen.sleep(1)
+
+        >>> c.run(print_state, wait=False)  # doctest: +SKIP
         """
         return self.sync(self._run, function, *args, **kwargs)
 
-    @gen.coroutine
-    def _run_coroutine(self, function, *args, **kwargs):
-        workers = kwargs.pop('workers', None)
-        wait = kwargs.pop('wait', True)
-        responses = yield self.scheduler.broadcast(msg=dict(op='run_coroutine',
-                                                            function=dumps(function),
-                                                            args=dumps(args),
-                                                            kwargs=dumps(kwargs),
-                                                            wait=wait),
-                                                   workers=workers)
-        if not wait:
-            raise gen.Return(None)
-        else:
-            results = {}
-            for key, resp in responses.items():
-                if resp['status'] == 'OK':
-                    results[key] = resp['result']
-                elif resp['status'] == 'error':
-                    six.reraise(*clean_exception(**resp))
-            raise gen.Return(results)
-
     def run_coroutine(self, function, *args, **kwargs):
         """
         Spawn a coroutine on all workers.
@@ -2181,7 +2193,10 @@
             Workers on which to run the function. Defaults to all known workers.
 
         """
-        return self.sync(self._run_coroutine, function, *args, **kwargs)
+        warnings.warn("This method has been deprecated. "
+                      "Instead use Client.run which detects async functions "
+                      "automatically")
+        return self.run(function, *args, **kwargs)
 
     def _graph_to_futures(self, dsk, keys, restrictions=None,
                           loose_restrictions=None, priority=None,
@@ -2284,7 +2299,9 @@
         sync: bool (optional)
             Returns Futures if False or concrete values if True (default).
         direct: bool
-            Gather results directly from workers
+            Whether or not to connect directly to the workers, or to ask
+            the scheduler to serve as intermediary.  This can also be set when
+            creating the Client.
 
         Examples
         --------
@@ -3023,10 +3040,10 @@
 
         Examples
         --------
-        >>> client = Client()
-        >>> client.write_scheduler_file('scheduler.json')
+        >>> client = Client()  # doctest: +SKIP
+        >>> client.write_scheduler_file('scheduler.json')  # doctest: +SKIP
         # connect to previous client's scheduler
-        >>> client2 = Client(scheduler_file='scheduler.json')
+        >>> client2 = Client(scheduler_file='scheduler.json')  # doctest: +SKIP
         """
         if self.scheduler_file:
             raise ValueError('Scheduler file already set')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/comm/utils.py new/distributed-1.25.2/distributed/comm/utils.py
--- old/distributed-1.25.1/distributed/comm/utils.py    2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/comm/utils.py    2019-01-03 18:30:24.000000000 +0100
@@ -19,7 +19,10 @@
 
 FRAME_OFFLOAD_THRESHOLD = 10 * 1024 ** 2   # 10 MB
 
-_offload_executor = ThreadPoolExecutor(max_workers=1)
+try:
+    _offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix='Dask-Offload')
+except TypeError:
+    _offload_executor = ThreadPoolExecutor(max_workers=1)
 finalize(_offload_executor, _offload_executor.shutdown)
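
The try/except TypeError above is a compatibility shim: thread_name_prefix
was only added to ThreadPoolExecutor in Python 3.6, so older interpreters
(including the Python 2 futures backport) raise TypeError when it is
passed. The same shim in isolation:

    from concurrent.futures import ThreadPoolExecutor

    try:
        # Python >= 3.6 accepts thread_name_prefix
        pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix='Demo-Offload')
    except TypeError:
        # Older interpreters: fall back to unnamed threads
        pool = ThreadPoolExecutor(max_workers=1)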
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/local.py new/distributed-1.25.2/distributed/deploy/local.py
--- old/distributed-1.25.1/distributed/deploy/local.py  2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/local.py  2019-01-03 18:30:24.000000000 +0100
@@ -4,7 +4,7 @@
 from datetime import timedelta
 import logging
 import math
-from time import sleep
+import warnings
 import weakref
 import toolz
 
@@ -13,7 +13,7 @@
 from .cluster import Cluster
 from ..core import CommClosedError
 from ..utils import (sync, ignoring, All, silence_logging, LoopRunner,
-        log_errors, thread_state)
+        log_errors, thread_state, parse_timedelta)
 from ..nanny import Nanny
 from ..scheduler import Scheduler
 from ..worker import Worker, _ncores
@@ -151,9 +151,12 @@
     def __await__(self):
         return self._started.__await__()
 
+    @property
+    def asynchronous(self):
+        return self._asynchronous or getattr(thread_state, 'asynchronous', False)
+
     def sync(self, func, *args, **kwargs):
-        asynchronous = kwargs.pop('asynchronous', None)
-        if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False):
+        if kwargs.pop('asynchronous', None) or self.asynchronous:
             callback_timeout = kwargs.pop('callback_timeout', None)
             future = func(*args, **kwargs)
             if callback_timeout is not None:
@@ -196,6 +199,10 @@
 
     @gen.coroutine
     def _start_worker(self, death_timeout=60, **kwargs):
+        if self.status and self.status.startswith('clos'):
+            warnings.warn("Tried to start a worker while status=='%s'" % self.status)
+            return
+
         if self.processes:
             W = Nanny
             kwargs['quiet'] = True
@@ -257,15 +264,23 @@
         self.sync(self._stop_worker, w)
 
     @gen.coroutine
-    def _close(self):
+    def _close(self, timeout='2s'):
         # Can be 'closing' as we're called by close() below
         if self.status == 'closed':
             return
+        self.status = 'closing'
+
+        self.scheduler.clear_task_state()
+
+        with ignoring(gen.TimeoutError):
+            yield gen.with_timeout(
+                timedelta(seconds=parse_timedelta(timeout)),
+                All([self._stop_worker(w) for w in self.workers]),
+            )
+        del self.workers[:]
 
         try:
             with ignoring(gen.TimeoutError, CommClosedError, OSError):
-                yield All([w._close() for w in self.workers])
-            with ignoring(gen.TimeoutError, CommClosedError, OSError):
                 yield self.scheduler.close(fast=True)
             del self.workers[:]
         finally:
@@ -277,25 +292,20 @@
             return
 
         try:
-            self.scheduler.clear_task_state()
+            result = self.sync(self._close, callback_timeout=timeout)
+        except RuntimeError:  # IOLoop is closed
+            result = None
+
+        if hasattr(self, '_old_logging_level'):
+            if self.asynchronous:
+                result.add_done_callback(lambda _: silence_logging(self._old_logging_level))
+            else:
+                silence_logging(self._old_logging_level)
 
-            for w in self.workers:
-                self.loop.add_callback(self._stop_worker, w)
-            for i in range(10):
-                if not self.workers:
-                    break
-                else:
-                    sleep(0.01)
-            del self.workers[:]
-            try:
-                self._loop_runner.run_sync(self._close, callback_timeout=timeout)
-            except RuntimeError:  # IOLoop is closed
-                pass
+        if not self.asynchronous:
             self._loop_runner.stop()
-        finally:
-            self.status = 'closed'
-        with ignoring(AttributeError):
-            silence_logging(self._old_logging_level)
+
+        return result
 
     @gen.coroutine
     def scale_up(self, n, **kwargs):
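
The reworked close path above is what lets the test suite below switch from
yield cluster._close() to yield cluster.close(). A usage sketch, assuming a
Tornado coroutine context as in those tests:

    from tornado import gen
    from distributed import Client, LocalCluster

    @gen.coroutine
    def demo():
        cluster = yield LocalCluster(n_workers=1, asynchronous=True)
        client = yield Client(cluster, asynchronous=True)
        # ... submit work ...
        yield client.close()
        yield cluster.close()  # now returns a future instead of blocking
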
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/tests/test_adaptive.py new/distributed-1.25.2/distributed/deploy/tests/test_adaptive.py
--- old/distributed-1.25.1/distributed/deploy/tests/test_adaptive.py    2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/tests/test_adaptive.py    2019-01-03 18:30:24.000000000 +0100
@@ -91,7 +91,7 @@
                                  asynchronous=True)
     try:
         cluster.scheduler.allowed_failures = 1000
-        alc = Adaptive(cluster.scheduler, cluster, interval=100)
+        alc = cluster.adapt(interval=100)
         c = yield Client(cluster, asynchronous=True)
 
         futures = c.map(slowinc, range(100), delay=0.01)
@@ -120,8 +120,8 @@
         yield c.gather(futures)
 
     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 10, active_rpc_timeout=10)
@@ -199,8 +199,8 @@
             assert time() < start + 2
         assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1}
     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()
 
 
 @gen_test()
@@ -223,8 +223,8 @@
 
         assert frequencies(pluck(1, adapt.log)) == {'up': 1}
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -270,8 +270,8 @@
         yield gen.sleep(0.1)
         assert len(cluster.scheduler.workers) == 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -295,8 +295,8 @@
             yield gen.sleep(0.1)
             assert time() < start + 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks")
@@ -317,8 +317,8 @@
 
         assert len(cluster.scheduler.workers) <= 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 def test_basic_no_loop():
@@ -358,8 +358,8 @@
         assert adaptive.log[1][1:] == ('up', {'n': 20})
 
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()
 
 
 @gen_test(timeout=None)
@@ -391,7 +391,7 @@
         names = {ws.name for ws in cluster.scheduler.workers.values()}
         assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'}
     finally:
-        yield cluster._close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True, ncores=[])
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/deploy/tests/test_local.py new/distributed-1.25.2/distributed/deploy/tests/test_local.py
--- old/distributed-1.25.1/distributed/deploy/tests/test_local.py       2018-12-15 17:51:37.000000000 +0100
+++ new/distributed-1.25.2/distributed/deploy/tests/test_local.py       2019-01-03 18:30:24.000000000 +0100
@@ -50,7 +50,6 @@
             cluster.close()
             sleep(0.5)
         log = log.getvalue()
-        print(log)
         assert not log
 
 
@@ -291,8 +290,8 @@
     assert len(cluster.workers) == 1
     assert addr not in cluster.scheduler.ncores
 
-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()
 
 
 def test_silent_startup():
@@ -511,8 +510,8 @@
         yield gen.sleep(0.01)
         assert time() < start + 3
 
-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()
 
 
 def test_local_tls_restart(loop):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/nanny.py new/distributed-1.25.2/distributed/nanny.py
--- old/distributed-1.25.1/distributed/nanny.py 2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/nanny.py 2019-01-03 18:30:24.000000000 +0100
@@ -43,7 +43,7 @@
             memory_limit='auto', reconnect=True, validate=False, quiet=False,
             resources=None, silence_logs=None, death_timeout=None, preload=(),
             preload_argv=[], security=None, contact_address=None,
-            listen_address=None, worker_class=None, **kwargs):
+            listen_address=None, worker_class=None, env=None, **kwargs):
 
         if scheduler_file:
             cfg = json_load_robust(scheduler_file)
@@ -63,6 +63,7 @@
         self.preload = preload
         self.preload_argv = preload_argv
         self.Worker = Worker if worker_class is None else worker_class
+        self.env = env or {}
 
         self.contact_address = contact_address
         self.memory_terminate_fraction = dask.config.get('distributed.worker.memory.terminate')
@@ -215,7 +216,8 @@
                 worker_start_args=(start_arg,),
                 silence_logs=self.silence_logs,
                 on_exit=self._on_exit,
-                worker=self.Worker
+                worker=self.Worker,
+                env=self.env,
             )
 
         self.auto_restart = True
@@ -322,7 +324,7 @@
 class WorkerProcess(object):
 
     def __init__(self, worker_args, worker_kwargs, worker_start_args,
-                 silence_logs, on_exit, worker):
+                 silence_logs, on_exit, worker, env):
         self.status = 'init'
         self.silence_logs = silence_logs
         self.worker_args = worker_args
@@ -331,6 +333,7 @@
         self.on_exit = on_exit
         self.process = None
         self.Worker = worker
+        self.env = env
 
         # Initialized when worker is ready
         self.worker_dir = None
@@ -360,7 +363,9 @@
                         silence_logs=self.silence_logs,
                         init_result_q=self.init_result_q,
                         child_stop_q=self.child_stop_q,
-                        uid=uid, Worker=self.Worker),
+                        uid=uid,
+                        Worker=self.Worker,
+                        env=self.env),
         )
         self.process.daemon = True
         self.process.set_exit_callback(self._on_exit)
@@ -488,7 +493,8 @@
 
     @classmethod
     def _run(cls, worker_args, worker_kwargs, worker_start_args,
-             silence_logs, init_result_q, child_stop_q, uid, Worker):  # pragma: no cover
+             silence_logs, init_result_q, child_stop_q, uid, env, Worker):  # pragma: no cover
+        os.environ.update(env)
         try:
             from dask.multiprocessing import initialize_worker_process
         except ImportError:   # old Dask version
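
Usage sketch for the env= keyword threaded through the Nanny above: the
mapping is applied with os.environ.update(env) in the worker child process
before the worker starts. The address below is a placeholder and the nanny
still has to be started, as in the test added to test_nanny.py further
down.

    from distributed import Nanny

    # Workers started by this nanny will see FOO=123 in their environment
    # (assumes a scheduler is already listening on this address).
    nanny = Nanny('tcp://127.0.0.1:8786', env={'FOO': '123'})
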
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/scheduler.py new/distributed-1.25.2/distributed/scheduler.py
--- old/distributed-1.25.1/distributed/scheduler.py     2018-12-06 21:02:46.000000000 +0100
+++ new/distributed-1.25.2/distributed/scheduler.py     2019-01-03 18:30:24.000000000 +0100
@@ -3068,7 +3068,7 @@
             self.unknown_durations[prefix].add(ts)
             return default
 
-    def run_function(self, stream, function, args=(), kwargs={}):
+    def run_function(self, stream, function, args=(), kwargs={}, wait=True):
         """ Run a function within this process
 
         See Also
@@ -3077,7 +3077,7 @@
         """
         from .worker import run
         self.log_event('all', {'action': 'run-function', 'function': function})
-        return run(self, stream, function=function, args=args, kwargs=kwargs)
+        return run(self, stream, function=function, args=args, kwargs=kwargs, wait=wait)
 
     def set_metadata(self, stream=None, keys=None, value=None):
         try:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/py3_test_client.py new/distributed-1.25.2/distributed/tests/py3_test_client.py
--- old/distributed-1.25.1/distributed/tests/py3_test_client.py 2018-12-11 01:24:58.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/py3_test_client.py 2019-01-03 18:30:24.000000000 +0100
@@ -158,3 +158,47 @@
             pytest.fail("array should have been destroyed")
 
         await gen.sleep(0.200)
+
+
+@gen_cluster(client=True)
+async def test_run_scheduler_async_def(c, s, a, b):
+    async def f(dask_scheduler):
+        await gen.sleep(0.01)
+        dask_scheduler.foo = 'bar'
+
+    await c.run_on_scheduler(f)
+
+    assert s.foo == 'bar'
+
+    async def f(dask_worker):
+        await gen.sleep(0.01)
+        dask_worker.foo = 'bar'
+
+    await c.run(f)
+    assert a.foo == 'bar'
+    assert b.foo == 'bar'
+
+
+@gen_cluster(client=True)
+async def test_run_scheduler_async_def_wait(c, s, a, b):
+    async def f(dask_scheduler):
+        await gen.sleep(0.01)
+        dask_scheduler.foo = 'bar'
+
+    await c.run_on_scheduler(f, wait=False)
+
+    while not hasattr(s, 'foo'):
+        await gen.sleep(0.01)
+    assert s.foo == 'bar'
+
+    async def f(dask_worker):
+        await gen.sleep(0.01)
+        dask_worker.foo = 'bar'
+
+    await c.run(f, wait=False)
+
+    while not hasattr(a, 'foo') or not hasattr(b, 'foo'):
+        await gen.sleep(0.01)
+
+    assert a.foo == 'bar'
+    assert b.foo == 'bar'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_client.py new/distributed-1.25.2/distributed/tests/test_client.py
--- old/distributed-1.25.1/distributed/tests/test_client.py     2018-12-15 17:51:37.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_client.py     2019-01-03 18:30:24.000000000 +0100
@@ -1871,7 +1871,7 @@
         assert cluster.scheduler.address in text
     finally:
         yield client.close()
-        yield cluster._close()
+        yield cluster.close()
 
 
 @gen_cluster(client=True)
@@ -2616,35 +2616,34 @@
 
 @gen_cluster(client=True)
 def test_run_coroutine(c, s, a, b):
-    results = yield c.run_coroutine(geninc, 1, delay=0.05)
+    results = yield c.run(geninc, 1, delay=0.05)
     assert results == {a.address: 2, b.address: 2}
 
-    results = yield c.run_coroutine(geninc, 1, delay=0.05, workers=[a.address])
+    results = yield c.run(geninc, 1, delay=0.05, workers=[a.address])
     assert results == {a.address: 2}
 
-    results = yield c.run_coroutine(geninc, 1, workers=[])
+    results = yield c.run(geninc, 1, workers=[])
     assert results == {}
 
     with pytest.raises(RuntimeError) as exc_info:
-        yield c.run_coroutine(throws, 1)
+        yield c.run(throws, 1)
     assert "hello" in str(exc_info)
 
     if sys.version_info >= (3, 5):
-        results = yield c.run_coroutine(asyncinc, 2, delay=0.01)
+        results = yield c.run(asyncinc, 2, delay=0.01)
         assert results == {a.address: 3, b.address: 3}
 
 
 def test_run_coroutine_sync(c, s, a, b):
-    result = c.run_coroutine(geninc, 2, delay=0.01)
+    result = c.run(geninc, 2, delay=0.01)
     assert result == {a['address']: 3,
                       b['address']: 3}
 
-    result = c.run_coroutine(geninc, 2,
-                             workers=[a['address']])
+    result = c.run(geninc, 2, workers=[a['address']])
     assert result == {a['address']: 3}
 
     t1 = time()
-    result = c.run_coroutine(geninc, 2, delay=10, wait=False)
+    result = c.run(geninc, 2, delay=10, wait=False)
     t2 = time()
     assert result is None
     assert t2 - t1 <= 1.0
@@ -4561,7 +4560,6 @@
         assert time() < start + 1
 
 
-@pytest.mark.xfail(reason='Other tests bleed into the logs of this one')
 def test_quiet_client_close(loop):
     with captured_logger(logging.getLogger('distributed')) as logger:
         with Client(loop=loop, processes=False, threads_per_worker=4) as c:
@@ -4573,8 +4571,12 @@
         lines = out.strip().split('\n')
         assert len(lines) <= 2
         for line in lines:
-            assert not line or 'Reconnecting' in line or set(line) == {'-'}
-        # assert not out
+            assert (
+                not line or
+                'Reconnecting' in line or
+                'garbage' in line or
+                set(line) == {'-'}
+            ), line
 
 
 @gen_cluster()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_nanny.py new/distributed-1.25.2/distributed/tests/test_nanny.py
--- old/distributed-1.25.1/distributed/tests/test_nanny.py      2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_nanny.py      2019-01-03 18:30:24.000000000 +0100
@@ -333,3 +333,13 @@
     log = log.getvalue()
     assert 'error' not in log.lower(), log
     assert 'restart' not in log.lower(), log
+
+
+@gen_cluster(ncores=[], client=True)
+def test_environment_variable(c, s):
+    a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"})
+    b = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "456"})
+    yield [a._start(), b._start()]
+    results = yield c.run(lambda: os.environ['FOO'])
+    assert results == {a.worker_address: "123", b.worker_address: "456"}
+    yield [a._close(), b._close()]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_resources.py new/distributed-1.25.2/distributed/tests/test_resources.py
--- old/distributed-1.25.1/distributed/tests/test_resources.py  2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_resources.py  2019-01-03 18:30:24.000000000 +0100
@@ -262,6 +262,7 @@
         assert 'executing' in str(a.story(key))
 
 
+@pytest.mark.xfail(reason="atop fusion seemed to break this")
 @gen_cluster(client=True, ncores=[('127.0.0.1', 1, {'resources': {'A': 1}}),
                                   ('127.0.0.1', 1, {'resources': {'B': 1}})])
 def test_full_collections(c, s, a, b):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/tests/test_worker.py new/distributed-1.25.2/distributed/tests/test_worker.py
--- old/distributed-1.25.1/distributed/tests/test_worker.py     2018-11-28 00:45:41.000000000 +0100
+++ new/distributed-1.25.2/distributed/tests/test_worker.py     2019-01-03 18:30:24.000000000 +0100
@@ -399,7 +399,7 @@
         yield gen.sleep(0.001)
         raise gen.Return(dask_worker.id)
 
-    response = yield c._run_coroutine(f)
+    response = yield c.run(f)
     assert response == {a.address: a.id, b.address: b.id}
 
 
@@ -866,7 +866,7 @@
         result = yield future
         raise gen.Return(result)
 
-    results = yield c.run_coroutine(f)
+    results = yield c.run(f)
     assert results == {a.address: 11,
                        b.address: 11}
 
@@ -879,7 +879,7 @@
         result = yield future
         raise gen.Return(result)
 
-    results = client.run_coroutine(f)
+    results = client.run(f)
     assert results == {a['address']: 11,
                        b['address']: 11}
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/threadpoolexecutor.py new/distributed-1.25.2/distributed/threadpoolexecutor.py
--- old/distributed-1.25.1/distributed/threadpoolexecutor.py    2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/threadpoolexecutor.py    2019-01-03 18:30:24.000000000 +0100
@@ -75,11 +75,12 @@
         super(ThreadPoolExecutor, self).__init__(*args, **kwargs)
         self._rejoin_list = []
         self._rejoin_lock = threading.Lock()
+        self._thread_name_prefix = kwargs.get('thread_name_prefix', 'DaskThreadPoolExecutor')
 
     def _adjust_thread_count(self):
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 name="ThreadPoolExecutor-%d-%d" % (os.getpid(), next(self._counter)),
+                                 name=self._thread_name_prefix + "-%d-%d" % (os.getpid(), next(self._counter)),
                                  args=(self, self._work_queue))
             t.daemon = True
             self._threads.add(t)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed/worker.py new/distributed-1.25.2/distributed/worker.py
--- old/distributed-1.25.1/distributed/worker.py        2018-12-15 00:02:02.000000000 +0100
+++ new/distributed-1.25.2/distributed/worker.py        2019-01-03 18:30:24.000000000 +0100
@@ -424,8 +424,8 @@
         self.status = None
         self._closed = Event()
         self.reconnect = reconnect
-        self.executor = executor or ThreadPoolExecutor(self.ncores)
-        self.actor_executor = ThreadPoolExecutor(1)
+        self.executor = executor or ThreadPoolExecutor(self.ncores, thread_name_prefix="Dask-Worker-Threads'")
+        self.actor_executor = ThreadPoolExecutor(1, thread_name_prefix="Dask-Actor-Threads")
         self.name = name
         self.scheduler_delay = 0
         self.stream_comms = dict()
@@ -1594,8 +1594,10 @@
                 if self.validate:
                     self.validate_state()
 
+                # dep states may have changed before gather_dep runs
+                # if a dep is no longer in-flight then don't fetch it
                 deps = tuple(dep for dep in deps
-                                 if self.dep_state.get(dep) in ('waiting', 'flight'))
+                                 if self.dep_state.get(dep) == 'flight')
 
                 self.log.append(('request-dep', dep, worker, deps))
                 logger.debug("Request %d keys", len(deps))
@@ -1960,13 +1962,14 @@
         # logger.info("Finish job %d, %s", i, key)
         raise gen.Return(result)
 
-    def run(self, comm, function, args=(), kwargs=None):
+    def run(self, comm, function, args=(), wait=True, kwargs=None):
         kwargs = kwargs or {}
-        return run(self, comm, function=function, args=args, kwargs=kwargs)
+        return run(self, comm, function=function, args=args, kwargs=kwargs,
+                   wait=wait)
 
     def run_coroutine(self, comm, function, args=(), kwargs=None, wait=True):
         return run(self, comm, function=function, args=args, kwargs=kwargs,
-                   is_coro=True, wait=wait)
+                   wait=wait)
 
     @gen.coroutine
     def actor_execute(self, comm=None, actor=None, function=None, args=(), kwargs={}):
@@ -2915,9 +2918,14 @@
 
 
 @gen.coroutine
-def run(server, comm, function, args=(), kwargs={}, is_coro=False, wait=True):
-    assert wait or is_coro, "Combination not supported"
+def run(server, comm, function, args=(), kwargs={}, is_coro=None, wait=True):
     function = pickle.loads(function)
+    if is_coro is None:
+        is_coro = iscoroutinefunction(function)
+    else:
+        warnings.warn("The is_coro= parameter is deprecated. "
+                      "We now automatically detect coroutines/async functions")
+    assert wait or is_coro, "Combination not supported"
     if args:
         args = pickle.loads(args)
     if kwargs:
@@ -2928,9 +2936,15 @@
         kwargs['dask_scheduler'] = server
     logger.info("Run out-of-band function %r", funcname(function))
     try:
-        result = function(*args, **kwargs)
-        if is_coro:
-            result = (yield result) if wait else None
+        if not is_coro:
+            result = function(*args, **kwargs)
+        else:
+            if wait:
+                result = yield function(*args, **kwargs)
+            else:
+                server.loop.add_callback(function, *args, **kwargs)
+                result = None
+
     except Exception as e:
         logger.warning(" Run Failed\n"
                        "Function: %s\n"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/distributed.egg-info/PKG-INFO new/distributed-1.25.2/distributed.egg-info/PKG-INFO
--- old/distributed-1.25.1/distributed.egg-info/PKG-INFO        2018-12-15 17:59:09.000000000 +0100
+++ new/distributed-1.25.2/distributed.egg-info/PKG-INFO        2019-01-04 23:14:53.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 1.25.1
+Version: 1.25.2
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.readthedocs.io/en/latest/
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-1.25.1/docs/source/changelog.rst new/distributed-1.25.2/docs/source/changelog.rst
--- old/distributed-1.25.1/docs/source/changelog.rst    2018-12-15 17:57:26.000000000 +0100
+++ new/distributed-1.25.2/docs/source/changelog.rst    2019-01-04 23:13:19.000000000 +0100
@@ -1,6 +1,21 @@
 Changelog
 =========
 
+1.25.2 - 2019-01-04
+-------------------
+
+-  Clean up LocalCluster logging better in async mode (:pr:`2448`) `Matthew Rocklin`_
+-  Add short error message if bokeh cannot be imported (:pr:`2444`) `Dirk Petersen`_
+-  Add optional environment variables to Nanny (:pr:`2431`) `Matthew Rocklin`_
+-  Make the direct keyword docstring entries uniform (:pr:`2441`) `Matthew Rocklin`_
+-  Make LocalCluster.close async friendly (:pr:`2437`) `Matthew Rocklin`_
+-  gather_dep: don't request dependencies we already found out we don't want (:pr:`2428`) `tjb900`_
+-  Add parameters to Client.run docstring (:pr:`2429`) `Matthew Rocklin`_
+-  Support coroutines and async-def functions in run/run_scheduler (:pr:`2427`) `Matthew Rocklin`_
+-  Name threads in ThreadPoolExecutors (:pr:`2408`) `Matthew Rocklin`_
+
+
+
 1.25.1 - 2018-12-15
 -------------------
 
@@ -880,3 +895,5 @@
 .. _`Diane Trout`: https://github.com/detrout
 .. _`tjb900`: https://github.com/tjb900
 .. _`Stephan Hoyer`: https://github.com/shoyer
+.. _`tjb900`: https://github.com/tjb900
+.. _`Dirk Petersen`: https://github.com/dirkpetersen

