Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2019-11-26 17:02:21
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.26869 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Tue Nov 26 17:02:21 2019 rev:20 rq:750861 version:2.8.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    2019-11-17 19:23:27.694858205 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.26869/python-distributed.changes 2019-11-26 17:02:55.692036843 +0100
@@ -1,0 +2,18 @@
+Sun Nov 24 17:36:02 UTC 2019 - Arun Persaud <a...@gmx.de>
+
+- update to version 2.8.1:
+  * Fix hanging worker when the scheduler leaves (GH#3250) Tom
+    Augspurger
+  * Fix NumPy writeable serialization bug (GH#3253) James Bourbeau
+  * Skip numba.cuda tests if CUDA is not available (GH#3255) Peter
+    Andreas Entschev
+  * Add new dashboard plot for memory use by key (GH#3243) Matthew
+    Rocklin
+  * Fix array.shape() -> array.shape (GH#3247) Jed Brown
+  * Fixed typos in pubsub.py (GH#3244) He Jia
+  * Fixed cupy array going out of scope (GH#3240) Mads
+    R. B. Kristensen
+  * Remove gen.coroutine usage in scheduler (GH#3242) Jim Crist-Harif
+  * Use inspect.isawaitable where relevant (GH#3241) Jim Crist-Harif
+
+-------------------------------------------------------------------

Old:
----
  distributed-2.8.0.tar.gz

New:
----
  distributed-2.8.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.o1ESYF/_old  2019-11-26 17:02:57.904036079 +0100
+++ /var/tmp/diff_new_pack.o1ESYF/_new  2019-11-26 17:02:57.932036068 +0100
@@ -1,7 +1,7 @@
 #
 # spec file for package python-distributed
 #
-# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2019 SUSE LLC
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        2.8.0
+Version:        2.8.1
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2.8.0.tar.gz -> distributed-2.8.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/PKG-INFO new/distributed-2.8.1/PKG-INFO
--- old/distributed-2.8.0/PKG-INFO      2019-11-14 23:59:02.000000000 +0100
+++ new/distributed-2.8.1/PKG-INFO      2019-11-23 05:48:31.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.8.0
+Version: 2.8.1
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/_version.py new/distributed-2.8.1/distributed/_version.py
--- old/distributed-2.8.0/distributed/_version.py       2019-11-14 23:59:02.000000000 +0100
+++ new/distributed-2.8.1/distributed/_version.py       2019-11-23 05:48:31.000000000 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2019-11-14T14:58:28-0800",
+ "date": "2019-11-22T22:46:55-0600",
  "dirty": false,
  "error": null,
- "full-revisionid": "4d0d58aade4460fab6e7e85a3548353671036d2c",
- "version": "2.8.0"
+ "full-revisionid": "507659d79434845e50d48c247ff42d5efd336686",
+ "version": "2.8.1"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/core.py new/distributed-2.8.1/distributed/core.py
--- old/distributed-2.8.0/distributed/core.py   2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/core.py   2019-11-19 17:18:47.000000000 +0100
@@ -2,6 +2,7 @@
 from collections import defaultdict, deque
 from concurrent.futures import CancelledError
 from functools import partial
+from inspect import isawaitable
 import logging
 import threading
 import traceback
@@ -397,7 +398,7 @@
                     logger.debug("Calling into handler %s", handler.__name__)
                     try:
                         result = handler(comm, **msg)
-                        if hasattr(result, "__await__"):
+                        if isawaitable(result):
                             result = asyncio.ensure_future(result)
                             self._ongoing_coroutines.add(result)
                             result = await result
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/components/scheduler.py new/distributed-2.8.1/distributed/dashboard/components/scheduler.py
--- old/distributed-2.8.0/distributed/dashboard/components/scheduler.py 2019-11-14 23:49:45.000000000 +0100
+++ new/distributed-2.8.1/distributed/dashboard/components/scheduler.py 2019-11-20 20:03:05.000000000 +0100
@@ -1,3 +1,4 @@
+from collections import defaultdict
 import logging
 import math
 from numbers import Number
@@ -36,7 +37,7 @@
 from bokeh.transform import factor_cmap, linear_cmap
 from bokeh.io import curdoc
 import dask
-from dask.utils import format_bytes
+from dask.utils import format_bytes, key_split
 from toolz import pipe
 from tornado import escape
 
@@ -428,6 +429,82 @@
             update(self.source, result)
 
 
+class MemoryByKey(DashboardComponent):
+    """ Bar chart showing memory use by key prefix"""
+
+    def __init__(self, scheduler, **kwargs):
+        with log_errors():
+            self.last = 0
+            self.scheduler = scheduler
+            self.source = ColumnDataSource(
+                {
+                    "name": ["a", "b"],
+                    "nbytes": [100, 1000],
+                    "count": [1, 2],
+                    "color": ["blue", "blue"],
+                }
+            )
+
+            fig = figure(
+                title="Memory Use",
+                tools="",
+                id="bk-memory-by-key-plot",
+                name="memory_by_key",
+                x_range=["a", "b"],
+                **kwargs,
+            )
+            rect = fig.vbar(
+                source=self.source, x="name", top="nbytes", width=0.9, color="color"
+            )
+            fig.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+            fig.xaxis.major_label_orientation = -math.pi / 12
+            rect.nonselection_glyph = None
+
+            fig.xaxis.minor_tick_line_alpha = 0
+            fig.ygrid.visible = False
+
+            fig.toolbar.logo = None
+            fig.toolbar_location = None
+
+            hover = HoverTool()
+            hover.tooltips = "@name: @nbytes_text"
+            hover.tooltips = """
+            <div>
+                <p><b>Name:</b> @name</p>
+                <p><b>Bytes:</b> @nbytes_text </p>
+                <p><b>Count:</b> @count objects </p>
+            </div>
+            """
+            hover.point_policy = "follow_mouse"
+            fig.add_tools(hover)
+
+            self.fig = fig
+
+    @without_property_validation
+    def update(self):
+        with log_errors():
+            counts = defaultdict(int)
+            nbytes = defaultdict(int)
+            for ws in self.scheduler.workers.values():
+                for ts in ws.has_what:
+                    ks = key_split(ts.key)
+                    counts[ks] += 1
+                    nbytes[ks] += ts.nbytes
+
+            names = list(sorted(counts))
+            self.fig.x_range.factors = names
+            result = {
+                "name": names,
+                "count": [counts[name] for name in names],
+                "nbytes": [nbytes[name] for name in names],
+                "nbytes_text": [format_bytes(nbytes[name]) for name in names],
+                "color": [color_of(name) for name in names],
+            }
+            self.fig.title.text = "Total Use: " + format_bytes(sum(nbytes.values()))
+
+            update(self.source, result)
+
+
 class CurrentLoad(DashboardComponent):
     """ How many tasks are on each worker """
 
@@ -1865,6 +1942,15 @@
         doc.theme = BOKEH_THEME
 
 
+def individual_memory_by_key_doc(scheduler, extra, doc):
+    with log_errors():
+        component = MemoryByKey(scheduler, sizing_mode="stretch_both")
+        component.update()
+        add_periodic_callback(doc, component, 500)
+        doc.add_root(component.fig)
+        doc.theme = BOKEH_THEME
+
+
 def profile_doc(scheduler, extra, doc):
     with log_errors():
         doc.title = "Dask: Profile"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/scheduler.py new/distributed-2.8.1/distributed/dashboard/scheduler.py
--- old/distributed-2.8.0/distributed/dashboard/scheduler.py    2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/dashboard/scheduler.py    2019-11-20 20:03:05.000000000 +0100
@@ -36,6 +36,7 @@
     individual_workers_doc,
     individual_bandwidth_types_doc,
     individual_bandwidth_workers_doc,
+    individual_memory_by_key_doc,
 )
 from .core import BokehServer
 from .worker import counters_doc
@@ -408,6 +409,7 @@
     "/individual-workers": individual_workers_doc,
     "/individual-bandwidth-types": individual_bandwidth_types_doc,
     "/individual-bandwidth-workers": individual_bandwidth_workers_doc,
+    "/individual-memory-by-key": individual_memory_by_key_doc,
 }
 
 try:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/dashboard/tests/test_scheduler_bokeh.py new/distributed-2.8.1/distributed/dashboard/tests/test_scheduler_bokeh.py
--- old/distributed-2.8.0/distributed/dashboard/tests/test_scheduler_bokeh.py   2019-11-12 21:02:54.000000000 +0100
+++ new/distributed-2.8.1/distributed/dashboard/tests/test_scheduler_bokeh.py   2019-11-20 20:03:05.000000000 +0100
@@ -11,6 +11,7 @@
 from tornado import gen
 from tornado.httpclient import AsyncHTTPClient, HTTPRequest
 
+import dask
 from dask.core import flatten
 from distributed.utils import tokey, format_dashboard_link
 from distributed.client import wait
@@ -34,6 +35,7 @@
     WorkerTable,
     TaskGraph,
     ProfileServer,
+    MemoryByKey,
 )
 
 from distributed.dashboard import scheduler
@@ -690,3 +692,20 @@
         body = response.body.decode()
         assert "bokeh" in body.lower()
         assert not re.search("href=./", body)  # no absolute links
+
+
+@gen_cluster(
+    client=True, scheduler_kwargs={"services": {("dashboard", 0): BokehScheduler}}
+)
+async def test_memory_by_key(c, s, a, b):
+    mbk = MemoryByKey(s)
+
+    da = pytest.importorskip("dask.array")
+    x = (da.random.random((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
+    await x
+
+    y = await dask.delayed(inc)(1).persist()
+
+    mbk.update()
+    assert mbk.source.data["name"] == ["add", "inc"]
+    assert mbk.source.data["nbytes"] == [x.nbytes, sys.getsizeof(1)]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/deploy/adaptive.py new/distributed-2.8.1/distributed/deploy/adaptive.py
--- old/distributed-2.8.0/distributed/deploy/adaptive.py        2019-11-12 21:02:54.000000000 +0100
+++ new/distributed-2.8.1/distributed/deploy/adaptive.py        2019-11-19 17:18:47.000000000 +0100
@@ -1,3 +1,4 @@
+from inspect import isawaitable
 import logging
 import math
 
@@ -158,7 +159,7 @@
             # close workers more forcefully
             logger.info("Retiring workers %s", workers)
             f = self.cluster.scale_down(workers)
-            if hasattr(f, "__await__"):
+            if isawaitable(f):
                 await f
 
     async def scale_up(self, n):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/cupy.py new/distributed-2.8.1/distributed/protocol/cupy.py
--- old/distributed-2.8.0/distributed/protocol/cupy.py  2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/protocol/cupy.py  2019-11-19 17:18:47.000000000 +0100
@@ -6,14 +6,34 @@
 
 
 class PatchedCudaArrayInterface(object):
-    # TODO: This class wont be necessary
-    #       once Cupy<7.0 is no longer supported
+    """This class do two things:
+        1) Makes sure that __cuda_array_interface__['strides']
+           behaves as specified in the protocol.
+        2) Makes sure that the cuda context is active
+           when deallocating the base cuda array.
+        Notice, this is only needed when the array to deserialize
+        isn't a native cupy array.
+    """
+
     def __init__(self, ary):
         cai = ary.__cuda_array_interface__
         cai_cupy_vsn = cupy.ndarray(0).__cuda_array_interface__["version"]
         if cai.get("strides") is None and cai_cupy_vsn < 2:
             cai.pop("strides", None)
         self.__cuda_array_interface__ = cai
+        # Save a ref to ary so it won't go out of scope
+        self.base = ary
+
+    def __del__(self):
+        # Making sure that the cuda context is active
+        # when deallocating the base cuda array
+        try:
+            import numba.cuda
+
+            numba.cuda.current_context()
+        except ImportError:
+            pass
+        del self.base
 
 
 @cuda_serialize.register(cupy.ndarray)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/numpy.py new/distributed-2.8.1/distributed/protocol/numpy.py
--- old/distributed-2.8.0/distributed/protocol/numpy.py 2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/protocol/numpy.py 2019-11-23 05:33:46.000000000 +0100
@@ -46,7 +46,7 @@
 
     # Only serialize non-broadcasted data for arrays with zero strided axes
     if 0 in x.strides:
-        broadcast_to = (x.shape, x.flags.writeable)
+        broadcast_to = x.shape
         x = x[tuple(slice(None) if s != 0 else slice(1) for s in x.strides)]
     else:
         broadcast_to = None
@@ -103,14 +103,12 @@
         else:
             dt = np.dtype(dt)
 
-        x = np.ndarray(
-            header["shape"], dtype=dt, buffer=frames[0], strides=header["strides"]
-        )
-
         if header.get("broadcast_to"):
-            shape, writeable = header["broadcast_to"]
-            x = np.broadcast_to(x, shape)
-            x.setflags(write=writeable)
+            shape = header["broadcast_to"]
+        else:
+            shape = header["shape"]
+
+        x = np.ndarray(shape, dtype=dt, buffer=frames[0], strides=header["strides"])
 
         return x
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/tests/test_numba.py new/distributed-2.8.1/distributed/protocol/tests/test_numba.py
--- old/distributed-2.8.0/distributed/protocol/tests/test_numba.py      2019-11-12 21:02:54.000000000 +0100
+++ new/distributed-2.8.1/distributed/protocol/tests/test_numba.py      2019-11-21 19:57:09.000000000 +0100
@@ -7,6 +7,9 @@
 
 @pytest.mark.parametrize("dtype", ["u1", "u4", "u8", "f4"])
 def test_serialize_cupy(dtype):
+    if not cuda.is_available():
+        pytest.skip("CUDA is not available")
+
     ary = np.arange(100, dtype=dtype)
     x = cuda.to_device(ary)
     header, frames = serialize(x, serializers=("cuda", "dask", "pickle"))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/protocol/tests/test_numpy.py new/distributed-2.8.1/distributed/protocol/tests/test_numpy.py
--- old/distributed-2.8.0/distributed/protocol/tests/test_numpy.py      2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/protocol/tests/test_numpy.py      2019-11-23 05:33:46.000000000 +0100
@@ -288,3 +288,22 @@
     header, frames = serialize(x)
     assert "broadcast_to" not in header
     assert sum(map(nbytes, frames)) == x.nbytes
+
+
+def test_serialize_writeable_array_readonly_base_object():
+    # Regression test for https://github.com/dask/distributed/issues/3252
+
+    x = np.arange(3)
+    # Create array which doesn't own it's own memory
+    y = np.broadcast_to(x, (3, 3))
+
+    # Make y writeable and it's base object (x) read-only
+    y.setflags(write=True)
+    x.setflags(write=False)
+
+    # Serialize / deserialize y
+    z = deserialize(*serialize(y))
+    np.testing.assert_equal(z, y)
+
+    # Ensure z and y have the same flags (including WRITEABLE)
+    assert z.flags == y.flags
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/pubsub.py new/distributed-2.8.1/distributed/pubsub.py
--- old/distributed-2.8.0/distributed/pubsub.py 2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/pubsub.py 2019-11-19 17:18:47.000000000 +0100
@@ -74,7 +74,7 @@
 
     def remove_subscriber(self, comm=None, name=None, worker=None, client=None):
         if worker:
-            logger.debug("Add worker subscriber: %s %s", name, worker)
+            logger.debug("Remove worker subscriber: %s %s", name, worker)
             self.subscribers[name].remove(worker)
             for pub in self.publishers[name]:
                 self.scheduler.worker_send(
@@ -82,7 +82,7 @@
                     {"op": "pubsub-remove-subscriber", "address": worker, "name": name},
                 )
         elif client:
-            logger.debug("Add client subscriber: %s %s", name, client)
+            logger.debug("Remove client subscriber: %s %s", name, client)
             self.client_subscribers[name].remove(client)
             if not self.client_subscribers[name]:
                 del self.client_subscribers[name]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/scheduler.py new/distributed-2.8.1/distributed/scheduler.py
--- old/distributed-2.8.0/distributed/scheduler.py      2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/scheduler.py      2019-11-19 17:18:47.000000000 +0100
@@ -3,6 +3,7 @@
 from collections.abc import Mapping, Set
 from datetime import timedelta
 from functools import partial
+from inspect import isawaitable
 import itertools
 import json
 import logging
@@ -24,7 +25,6 @@
     from toolz import frequencies, merge, pluck, merge_sorted, first
 from toolz import valmap, second, compose, groupby
 from tornado import gen
-from tornado.gen import Return
 from tornado.ioloop import IOLoop
 
 import dask
@@ -2007,8 +2007,8 @@
             return
         if ts is None or not ts.who_wants:  # no key yet, lets try again in a moment
             if retries:
-                self.loop.add_future(
-                    gen.sleep(0.2), lambda _: self.cancel_key(key, client, retries - 1)
+                self.loop.call_later(
+                    0.2, lambda: self.cancel_key(key, client, retries - 1)
                 )
             return
         if force or ts.who_wants == {cs}:  # no one else wants this key
@@ -2700,8 +2700,7 @@
         )
         return d[worker]
 
-    @gen.coroutine
-    def rebalance(self, comm=None, keys=None, workers=None):
+    async def rebalance(self, comm=None, keys=None, workers=None):
         """ Rebalance keys so that each worker stores roughly equal bytes
 
         **Policy**
@@ -2777,9 +2776,9 @@
                 to_recipients[recipient.address][ts.key].append(sender.address)
                 to_senders[sender.address].append(ts.key)
 
-            result = yield {
-                r: self.rpc(addr=r).gather(who_has=v) for r, v in to_recipients.items()
-            }
+            result = await asyncio.gather(
+                *(self.rpc(addr=r).gather(who_has=v) for r, v in to_recipients.items())
+            )
             for r, v in to_recipients.items():
                 self.log_event(r, {"action": "rebalance", "who_has": v})
 
@@ -2794,13 +2793,11 @@
                 },
             )
 
-            if not all(r["status"] == "OK" for r in result.values()):
-                raise Return(
-                    {
-                        "status": "missing-data",
-                        "keys": sum([r["keys"] for r in result if "keys" in r], []),
-                    }
-                )
+            if not all(r["status"] == "OK" for r in result):
+                return {
+                    "status": "missing-data",
+                    "keys": sum([r["keys"] for r in result if "keys" in r], []),
+                }
 
             for sender, recipient, ts in msgs:
                 assert ts.state == "memory"
@@ -2811,20 +2808,21 @@
                     ("rebalance", ts.key, time(), sender.address, recipient.address)
                 )
 
-            result = yield {
-                r: self.rpc(addr=r).delete_data(keys=v, report=False)
-                for r, v in to_senders.items()
-            }
+            await asyncio.gather(
+                *(
+                    self.rpc(addr=r).delete_data(keys=v, report=False)
+                    for r, v in to_senders.items()
+                )
+            )
 
             for sender, recipient, ts in msgs:
                 ts.who_has.remove(sender)
                 sender.has_what.remove(ts)
                 sender.nbytes -= ts.get_nbytes()
 
-            raise Return({"status": "OK"})
+            return {"status": "OK"}
 
-    @gen.coroutine
-    def replicate(
+    async def replicate(
         self,
         comm=None,
         keys=None,
@@ -2867,7 +2865,7 @@
         tasks = {self.tasks[k] for k in keys}
         missing_data = [ts.key for ts in tasks if not ts.who_has]
         if missing_data:
-            raise Return({"status": "missing-data", "keys": missing_data})
+            return {"status": "missing-data", "keys": missing_data}
 
         # Delete extraneous data
         if delete:
@@ -2878,12 +2876,14 @@
                     for ws in random.sample(del_candidates, len(del_candidates) - n):
                         del_worker_tasks[ws].add(ts)
 
-            yield [
-                self.rpc(addr=ws.address).delete_data(
-                    keys=[ts.key for ts in tasks], report=False
+            await asyncio.gather(
+                *(
+                    self.rpc(addr=ws.address).delete_data(
+                        keys=[ts.key for ts in tasks], report=False
+                    )
+                    for ws, tasks in del_worker_tasks.items()
                 )
-                for ws, tasks in del_worker_tasks.items()
-            ]
+            )
 
             for ws, tasks in del_worker_tasks.items():
                 ws.has_what -= tasks
@@ -2911,11 +2911,13 @@
                 for ws in random.sample(workers - ts.who_has, count):
                     gathers[ws.address][ts.key] = [wws.address for wws in ts.who_has]
 
-            results = yield {
-                w: self.rpc(addr=w).gather(who_has=who_has)
-                for w, who_has in gathers.items()
-            }
-            for w, v in results.items():
+            results = await asyncio.gather(
+                *(
+                    self.rpc(addr=w).gather(who_has=who_has)
+                    for w, who_has in gathers.items()
+                )
+            )
+            for w, v in zip(gathers, results):
                 if v["status"] == "OK":
                     self.add_keys(worker=w, keys=list(gathers[w]))
                 else:
@@ -3283,7 +3285,7 @@
             if teardown:
                 teardown = pickle.loads(teardown)
             state = setup(self) if setup else None
-            if hasattr(state, "__await__"):
+            if isawaitable(state):
                 state = await state
             try:
                 while self.status == "running":
@@ -3348,8 +3350,7 @@
         else:
             return {w: ws.nthreads for w, ws in self.workers.items()}
 
-    @gen.coroutine
-    def get_call_stack(self, comm=None, keys=None):
+    async def get_call_stack(self, comm=None, keys=None):
         if keys is not None:
             stack = list(keys)
             processing = set()
@@ -3369,14 +3370,13 @@
             workers = {w: None for w in self.workers}
 
         if not workers:
-            raise gen.Return({})
+            return {}
 
-        else:
-            response = yield {
-                w: self.rpc(w).call_stack(keys=v) for w, v in workers.items()
-            }
-            response = {k: v for k, v in response.items() if v}
-            raise gen.Return(response)
+        results = await asyncio.gather(
+            *(self.rpc(w).call_stack(keys=v) for w, v in workers.items())
+        )
+        response = {w: r for w, r in zip(workers, results) if r}
+        return response
 
     def get_nbytes(self, comm=None, keys=None, summary=True):
         with log_errors():
@@ -4613,8 +4613,7 @@
         else:
             return (start_time, ws.nbytes)
 
-    @gen.coroutine
-    def get_profile(
+    async def get_profile(
         self,
         comm=None,
         workers=None,
@@ -4627,15 +4626,17 @@
             workers = self.workers
         else:
             workers = set(self.workers) & set(workers)
-        result = yield {
-            w: self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers
-        }
+        results = await asyncio.gather(
+            *(self.rpc(w).profile(start=start, stop=stop, key=key) for w in workers)
+        )
+
         if merge_workers:
-            result = profile.merge(*result.values())
-        raise gen.Return(result)
+            response = profile.merge(*results)
+        else:
+            response = dict(zip(workers, results))
+        return response
 
-    @gen.coroutine
-    def get_profile_metadata(
+    async def get_profile_metadata(
         self,
         comm=None,
         workers=None,
@@ -4653,22 +4654,22 @@
             workers = self.workers
         else:
             workers = set(self.workers) & set(workers)
-        result = yield {
-            w: self.rpc(w).profile_metadata(start=start, stop=stop) for w in workers
-        }
+        results = await asyncio.gather(
+            *(self.rpc(w).profile_metadata(start=start, stop=stop) for w in workers)
+        )
 
-        counts = [v["counts"] for v in result.values()]
+        counts = [v["counts"] for v in results]
         counts = itertools.groupby(merge_sorted(*counts), lambda t: t[0] // dt * dt)
         counts = [(time, sum(pluck(1, group))) for time, group in counts]
 
         keys = set()
-        for v in result.values():
+        for v in results:
             for t, d in v["keys"]:
                 for k in d:
                     keys.add(k)
         keys = {k: [] for k in keys}
 
-        groups1 = [v["keys"] for v in result.values()]
+        groups1 = [v["keys"] for v in results]
         groups2 = list(merge_sorted(*groups1, key=first))
 
         last = 0
@@ -4681,7 +4682,7 @@
             for k, v in d.items():
                 keys[k][-1][1] += v
 
-        raise gen.Return({"counts": counts, "keys": keys})
+        return {"counts": counts, "keys": keys}
 
     async def get_worker_logs(self, comm=None, n=None, workers=None, nanny=False):
         results = await self.broadcast(
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/tests/test_nanny.py new/distributed-2.8.1/distributed/tests/test_nanny.py
--- old/distributed-2.8.0/distributed/tests/test_nanny.py       2019-11-12 21:02:54.000000000 +0100
+++ new/distributed-2.8.1/distributed/tests/test_nanny.py       2019-11-23 05:33:46.000000000 +0100
@@ -1,3 +1,4 @@
+import asyncio
 import gc
 import logging
 import os
@@ -130,6 +131,20 @@
 
 
 @pytest.mark.slow
+@gen_cluster(config={"distributed.comm.timeouts.connect": "1s"})
+async def test_no_hang_when_scheduler_closes(s, a, b):
+    # https://github.com/dask/distributed/issues/2880
+    with captured_logger("tornado.application", logging.ERROR) as logger:
+        await s.close()
+        await asyncio.sleep(1.2)
+        assert a.status == "closed"
+        assert b.status == "closed"
+
+    out = logger.getvalue()
+    assert "Timed out trying to connect" not in out
+
+
+@pytest.mark.slow
 @gen_cluster(
     Worker=Nanny, nthreads=[("127.0.0.1", 1)], worker_kwargs={"reconnect": False}
 )
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed/worker.py new/distributed-2.8.1/distributed/worker.py
--- old/distributed-2.8.0/distributed/worker.py 2019-11-14 19:11:20.000000000 +0100
+++ new/distributed-2.8.1/distributed/worker.py 2019-11-23 05:33:46.000000000 +0100
@@ -4,6 +4,7 @@
 from collections.abc import MutableMapping
 from datetime import timedelta
 import heapq
+from inspect import isawaitable
 import logging
 import os
 from pickle import PicklingError
@@ -748,7 +749,7 @@
         for k, metric in self.metrics.items():
             try:
                 result = metric(self)
-                if hasattr(result, "__await__"):
+                if isawaitable(result):
                     result = await result
                 custom[k] = result
             except Exception:  # TODO: log error once
@@ -761,7 +762,7 @@
         for k, f in self.startup_information.items():
             try:
                 v = f(self)
-                if hasattr(v, "__await__"):
+                if isawaitable(v):
                     v = await v
                 result[k] = v
             except Exception:  # TODO: log error once
@@ -881,6 +882,12 @@
                 )
                 self.bandwidth_workers.clear()
                 self.bandwidth_types.clear()
+            except IOError as e:
+                # Scheduler is gone. Respect distributed.comm.timeouts.connect
+                if "Timed out trying to connect" in str(e):
+                    await self.close(report=False)
+                else:
+                    raise e
             except CommClosedError:
                 logger.warning("Heartbeat to scheduler failed")
             finally:
@@ -1057,7 +1064,7 @@
                 if hasattr(plugin, "teardown")
             ]
 
-            await asyncio.gather(*[td for td in teardowns if hasattr(td, "__await__")])
+            await asyncio.gather(*[td for td in teardowns if isawaitable(td)])
 
             for pc in self.periodic_callbacks.values():
                 pc.stop()
@@ -2301,7 +2308,7 @@
                 if hasattr(plugin, "setup"):
                     try:
                         result = plugin.setup(worker=self)
-                        if hasattr(result, "__await__"):
+                        if isawaitable(result):
                             result = await result
                     except Exception as e:
                         msg = error_message(e)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/distributed.egg-info/PKG-INFO new/distributed-2.8.1/distributed.egg-info/PKG-INFO
--- old/distributed-2.8.0/distributed.egg-info/PKG-INFO 2019-11-14 23:59:01.000000000 +0100
+++ new/distributed-2.8.1/distributed.egg-info/PKG-INFO 2019-11-23 05:48:30.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.8.0
+Version: 2.8.1
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/docs/source/changelog.rst new/distributed-2.8.1/docs/source/changelog.rst
--- old/distributed-2.8.0/docs/source/changelog.rst     2019-11-14 23:51:50.000000000 +0100
+++ new/distributed-2.8.1/docs/source/changelog.rst     2019-11-23 05:44:44.000000000 +0100
@@ -1,6 +1,20 @@
 Changelog
 =========
 
+2.8.1 - 2019-11-22
+------------------
+
+- Fix hanging worker when the scheduler leaves (:pr:`3250`) `Tom Augspurger`_
+- Fix NumPy writeable serialization bug (:pr:`3253`) `James Bourbeau`_
+- Skip ``numba.cuda`` tests if CUDA is not available (:pr:`3255`) `Peter Andreas Entschev`_
+- Add new dashboard plot for memory use by key (:pr:`3243`) `Matthew Rocklin`_
+- Fix ``array.shape()`` -> ``array.shape`` (:pr:`3247`) `Jed Brown`_
+- Fixed typos in ``pubsub.py`` (:pr:`3244`) `He Jia`_
+- Fixed cupy array going out of scope (:pr:`3240`) `Mads R. B. Kristensen`_
+- Remove ``gen.coroutine`` usage in scheduler (:pr:`3242`) `Jim Crist-Harif`_
+- Use ``inspect.isawaitable`` where relevant (:pr:`3241`) `Jim Crist-Harif`_
+
+
 2.8.0 - 2019-11-14
 ------------------
 
@@ -1391,3 +1405,6 @@
 .. _`IPetrik`: https://github.com/IPetrik
 .. _`Simon Boothroyd`: https://github.com/SimonBoothroyd
 .. _`rockwellw`: https://github.com/rockwellw
+.. _`Jed Brown`: https://github.com/jedbrown
+.. _`He Jia`: https://github.com/HerculesJack
+.. _`Jim Crist-Harif`: https://github.com/jcrist
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.8.0/docs/source/efficiency.rst new/distributed-2.8.1/docs/source/efficiency.rst
--- old/distributed-2.8.0/docs/source/efficiency.rst    2019-05-29 19:29:52.000000000 +0200
+++ new/distributed-2.8.1/docs/source/efficiency.rst    2019-11-19 17:18:47.000000000 +0100
@@ -31,7 +31,7 @@
 
 .. code-block:: python
 
-   >>> x.result().shape()  # Slow from lots of data transfer
+   >>> x.result().shape  # Slow from lots of data transfer
    (1000, 1000)
 
 **Fast**


Reply via email to