Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2020-12-01 14:22:42
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.5913 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Tue Dec  1 14:22:42 2020 rev:38 rq:851839 version:2.30.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes    2020-10-25 18:06:35.759343920 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.5913/python-distributed.changes  2020-12-01 14:22:58.793605863 +0100
@@ -1,0 +2,10 @@
+Sat Nov 21 22:35:43 UTC 2020 - Arun Persaud <a...@gmx.de>
+
+- update to version 2.30.1:
+  * Pin pytest-asyncio version (GH#4212) James Bourbeau
+  * Replace AsyncProcess exit handler by weakref.finalize (GH#4184)
+    Peter Andreas Entschev
+  * Remove hard coded connect handshake timeouts (GH#4176) Florian
+    Jetter
+
+-------------------------------------------------------------------

Old:
----
  distributed-2.30.0.tar.gz

New:
----
  distributed-2.30.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.OSExks/_old  2020-12-01 14:22:59.577606711 +0100
+++ /var/tmp/diff_new_pack.OSExks/_new  2020-12-01 14:22:59.581606716 +0100
@@ -21,7 +21,7 @@
 # Test requires network connection
 %bcond_with     test
 Name:           python-distributed
-Version:        2.30.0
+Version:        2.30.1
 Release:        0
 Summary:        Library for distributed computing with Python
 License:        BSD-3-Clause

++++++ distributed-2.30.0.tar.gz -> distributed-2.30.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/PKG-INFO new/distributed-2.30.1/PKG-INFO
--- old/distributed-2.30.0/PKG-INFO     2020-10-07 00:36:08.340993600 +0200
+++ new/distributed-2.30.1/PKG-INFO     2020-11-04 04:39:51.990585000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.30.0
+Version: 2.30.1
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/_version.py new/distributed-2.30.1/distributed/_version.py
--- old/distributed-2.30.0/distributed/_version.py      2020-10-07 00:36:08.349047400 +0200
+++ new/distributed-2.30.1/distributed/_version.py      2020-11-04 04:39:51.991822700 +0100
@@ -8,11 +8,11 @@
 
 version_json = '''
 {
- "date": "2020-10-06T17:35:34-0500",
+ "date": "2020-11-03T21:38:02-0600",
  "dirty": false,
  "error": null,
- "full-revisionid": "a1dc5f437b39c1b35a9b05cbc048e3a793b89715",
- "version": "2.30.0"
+ "full-revisionid": "4c238e6b104e259c99efeff417daf1bfd6722be8",
+ "version": "2.30.1"
 }
 '''  # END VERSION_JSON
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/core.py new/distributed-2.30.1/distributed/comm/core.py
--- old/distributed-2.30.0/distributed/comm/core.py     2020-10-03 01:12:36.000000000 +0200
+++ new/distributed-2.30.1/distributed/comm/core.py     2020-11-04 04:32:37.000000000 +0100
@@ -213,9 +213,15 @@
 
     async def on_connection(self, comm: Comm, handshake_overrides=None):
         local_info = {**comm.handshake_info(), **(handshake_overrides or {})}
+
+        timeout = dask.config.get("distributed.comm.timeouts.connect")
+        timeout = parse_timedelta(timeout, default="seconds")
         try:
-            write = await asyncio.wait_for(comm.write(local_info), 1)
-            handshake = await asyncio.wait_for(comm.read(), 1)
+            # Timeout is to ensure that we'll terminate connections eventually.
+            # Connector side will employ smaller timeouts and we should only
+            # reach this if the comm is dead anyhow.
+            write = await asyncio.wait_for(comm.write(local_info), timeout=timeout)
+            handshake = await asyncio.wait_for(comm.read(), timeout=timeout)
             # This would be better, but connections leak if worker is closed quickly
             # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
         except Exception as e:
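
Note that the hunk above replaces the hard coded one-second handshake timeouts with the value of "distributed.comm.timeouts.connect", normalized to seconds. A small sketch of that normalization step, assuming dask.utils.parse_timedelta, which behaves like the helper used above:

    from dask.utils import parse_timedelta

    parse_timedelta("100ms", default="seconds")  # 0.1 seconds
    parse_timedelta("10s", default="seconds")    # 10 seconds
    parse_timedelta(3, default="seconds")        # bare numbers pass through as seconds
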
@@ -262,79 +268,71 @@
     comm = None
 
     start = time()
-    deadline = start + timeout
-    error = None
-
-    def _raise(error):
-        error = error or "connect() didn't finish in time"
-        msg = "Timed out trying to connect to %r after %s s: %s" % (
-            addr,
-            timeout,
-            error,
-        )
-        raise IOError(msg)
-
-    backoff = 0.01
-    if timeout and timeout / 20 < backoff:
-        backoff = timeout / 20
 
-    retry_timeout_backoff = random.randrange(140, 160) / 100
-
-    # This starts a thread
-    while True:
+    def time_left():
+        deadline = start + timeout
+        return max(0, deadline - time())
+
+    backoff_base = 0.01
+    attempt = 0
+
+    # Prefer multiple small attempts over one long attempt. This should protect
+    # primarily from DNS race conditions
+    # gh3104, gh4176, gh4167
+    intermediate_cap = timeout / 5
+    active_exception = None
+    while time_left() > 0:
         try:
-            while deadline - time() > 0:
-
-                async def _():
-                    comm = await connector.connect(
-                        loc, deserialize=deserialize, **connection_args
-                    )
-                    local_info = {
-                        **comm.handshake_info(),
-                        **(handshake_overrides or {}),
-                    }
-                    try:
-                        handshake = await asyncio.wait_for(comm.read(), 1)
-                        write = await asyncio.wait_for(comm.write(local_info), 1)
-                        # This would be better, but connections leak if worker is closed quickly
-                        # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
-                    except Exception as e:
-                        with suppress(Exception):
-                            await comm.close()
-                        raise CommClosedError() from e
-
-                    comm.remote_info = handshake
-                    comm.remote_info["address"] = comm._peer_addr
-                    comm.local_info = local_info
-                    comm.local_info["address"] = comm._local_addr
-
-                    comm.handshake_options = comm.handshake_configuration(
-                        comm.local_info, comm.remote_info
-                    )
-                    return comm
-
-                with suppress(TimeoutError):
-                    comm = await asyncio.wait_for(
-                        _(), timeout=min(deadline - time(), retry_timeout_backoff)
-                    )
-                    break
-            if not comm:
-                _raise(error)
+            comm = await asyncio.wait_for(
+                connector.connect(loc, deserialize=deserialize, **connection_args),
+                timeout=min(intermediate_cap, time_left()),
+            )
+            break
         except FatalCommClosedError:
             raise
-        except EnvironmentError as e:
-            error = str(e)
-            if time() < deadline:
-                logger.debug("Could not connect, waiting before retrying")
-                await asyncio.sleep(backoff)
-                backoff *= random.randrange(140, 160) / 100
-                retry_timeout_backoff *= random.randrange(140, 160) / 100
-                backoff = min(backoff, 1)  # wait at most one second
-            else:
-                _raise(error)
-        else:
-            break
-
+        # CommClosed, EnvironmentError inherit from OSError
+        except (TimeoutError, OSError) as exc:
+            active_exception = exc
+
+            # The intermediate capping is mostly relevant for the initial
+            # connect. Afterwards we should be more forgiving
+            intermediate_cap = intermediate_cap * 1.5
+            # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
+
+            upper_cap = min(time_left(), backoff_base * (2 ** attempt))
+            backoff = random.uniform(0, upper_cap)
+            attempt += 1
+            logger.debug("Could not connect, waiting for %s before retrying", backoff)
+            await asyncio.sleep(backoff)
+    else:
+        raise IOError(
+            f"Timed out trying to connect to {addr} after {timeout} s"
+        ) from active_exception
+
+    local_info = {
+        **comm.handshake_info(),
+        **(handshake_overrides or {}),
+    }
+    try:
+        # This would be better, but connections leak if worker is closed quickly
+        # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
+        handshake = await asyncio.wait_for(comm.read(), time_left())
+        await asyncio.wait_for(comm.write(local_info), time_left())
+    except Exception as exc:
+        with suppress(Exception):
+            await comm.close()
+        raise IOError(
+            f"Timed out during handshake while connecting to {addr} after {timeout} s"
+        ) from exc
+
+    comm.remote_info = handshake
+    comm.remote_info["address"] = comm._peer_addr
+    comm.local_info = local_info
+    comm.local_info["address"] = comm._local_addr
+
+    comm.handshake_options = comm.handshake_configuration(
+        comm.local_info, comm.remote_info
+    )
     return comm
 
 
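The retry loop above implements the "full jitter" strategy from the AWS article cited in the comment: each failed attempt sleeps for a uniformly random duration between zero and an exponentially growing cap. A minimal, self-contained sketch of just that policy (illustrative only, not distributed's actual code; the real loop additionally clamps the cap to the time left before the deadline):

    import random

    def full_jitter_delays(backoff_base=0.01, attempts=6):
        """Yield one sleep duration per failed connect attempt."""
        for attempt in range(attempts):
            cap = backoff_base * (2 ** attempt)
            # Uniform in [0, cap): concurrent clients spread out
            # instead of retrying in lockstep.
            yield random.uniform(0, cap)

    print(["%.4f" % d for d in full_jitter_delays()])
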
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/tcp.py new/distributed-2.30.1/distributed/comm/tcp.py
--- old/distributed-2.30.0/distributed/comm/tcp.py      2020-09-16 04:37:04.000000000 +0200
+++ new/distributed-2.30.1/distributed/comm/tcp.py      2020-11-04 04:32:37.000000000 +0100
@@ -1,6 +1,7 @@
 import errno
 import logging
 import socket
+from ssl import SSLError
 import struct
 import sys
 from tornado import gen
@@ -349,7 +350,6 @@
             stream = await self.client.connect(
                 ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
             )
-
             # Under certain circumstances tornado will have a closed connection with an error and not raise
             # a StreamClosedError.
             #
@@ -360,6 +360,8 @@
         except StreamClosedError as e:
             # The socket connect() call failed
             convert_stream_closed_error(self, e)
+        except SSLError as err:
+            raise FatalCommClosedError() from err
 
         local_address = self.prefix + get_stream_address(stream)
         comm = self.comm_class(
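
The new SSLError handler is what lets the retry loop in comm/core.py terminate early on TLS failures: FatalCommClosedError is re-raised immediately, so a certificate that will never verify fails fast instead of being retried until the connect timeout expires. A self-contained sketch of that fatal-versus-transient split (hypothetical names, not distributed's API):

    import asyncio
    import time

    class FatalError(Exception):
        """Retrying cannot help, e.g. a certificate that never verifies."""

    async def connect_with_retry(attempt_connect, timeout=10.0):
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                return await attempt_connect()
            except FatalError:
                raise                     # fail fast, no retry
            except OSError:
                await asyncio.sleep(0.1)  # transient: back off and retry
        raise IOError("timed out")
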
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/tests/test_comms.py new/distributed-2.30.1/distributed/comm/tests/test_comms.py
--- old/distributed-2.30.0/distributed/comm/tests/test_comms.py 2020-09-18 22:52:00.000000000 +0200
+++ new/distributed-2.30.1/distributed/comm/tests/test_comms.py 2020-11-04 04:32:37.000000000 +0100
@@ -1,45 +1,42 @@
 import asyncio
-import types
-from functools import partial
 import os
 import sys
 import threading
+import types
 import warnings
+from functools import partial
 
+import distributed
 import pkg_resources
 import pytest
-
-from tornado import ioloop
-from tornado.concurrent import Future
-
-import distributed
-from distributed.metrics import time
-from distributed.utils import get_ip, get_ipv6
-from distributed.utils_test import (
-    requires_ipv6,
-    has_ipv6,
-    get_cert,
-    get_server_ssl_context,
-    get_client_ssl_context,
-)
-from distributed.utils_test import loop  # noqa: F401
-
-from distributed.protocol import to_serialize, Serialized, serialize, deserialize
-
-from distributed.comm.registry import backends, get_backend
 from distributed.comm import (
-    tcp,
-    inproc,
+    CommClosedError,
     connect,
+    get_address_host,
+    get_local_address_for,
+    inproc,
     listen,
-    CommClosedError,
     parse_address,
     parse_host_port,
-    unparse_host_port,
     resolve_address,
-    get_address_host,
-    get_local_address_for,
+    tcp,
+    unparse_host_port,
+)
+from distributed.comm.registry import backends, get_backend
+from distributed.comm.tcp import TCP, TCPBackend, TCPConnector
+from distributed.metrics import time
+from distributed.protocol import Serialized, deserialize, serialize, to_serialize
+from distributed.utils import get_ip, get_ipv6
+from distributed.utils_test import loop  # noqa: F401
+from distributed.utils_test import (
+    get_cert,
+    get_client_ssl_context,
+    get_server_ssl_context,
+    has_ipv6,
+    requires_ipv6,
 )
+from tornado import ioloop
+from tornado.concurrent import Future
 
 EXTERNAL_IP4 = get_ip()
 if has_ipv6():
@@ -218,7 +215,7 @@
         await comm.write(msg)
         await comm.close()
 
-    listener = await tcp.TCPListener("localhost", handle_comm)
+    listener = await tcp.TCPListener("127.0.0.1", handle_comm)
     host, port = listener.get_host_port()
     assert host in ("localhost", "127.0.0.1", "::1")
     assert port > 0
@@ -264,7 +261,7 @@
     server_ctx = get_server_ssl_context()
     client_ctx = get_client_ssl_context()
 
-    listener = await tcp.TLSListener("localhost", handle_comm, ssl_context=server_ctx)
+    listener = await tcp.TLSListener("127.0.0.1", handle_comm, ssl_context=server_ctx)
     host, port = listener.get_host_port()
     assert host in ("localhost", "127.0.0.1", "::1")
     assert port > 0
@@ -665,7 +662,8 @@
 
     with pytest.raises(EnvironmentError) as excinfo:
         await connect(listener.contact_address, timeout=2, ssl_context=cli_ctx)
-    assert "certificate verify failed" in str(excinfo.value)
+
+    assert "certificate verify failed" in str(excinfo.value.__cause__)
 
 
 #
@@ -797,6 +795,88 @@
 #
 
 
+async def echo(comm):
+    message = await comm.read()
+    await comm.write(message)
+
+
+@pytest.mark.asyncio
+async def test_retry_connect(monkeypatch):
+    async def echo(comm):
+        message = await comm.read()
+        await comm.write(message)
+
+    class UnreliableConnector(TCPConnector):
+        def __init__(self):
+
+            self.num_failures = 2
+            self.failures = 0
+            super().__init__()
+
+        async def connect(self, address, deserialize=True, **connection_args):
+            if self.failures > self.num_failures:
+                return await super().connect(address, deserialize, **connection_args)
+            else:
+                self.failures += 1
+                raise IOError()
+
+    class UnreliableBackend(TCPBackend):
+        _connector_class = UnreliableConnector
+
+    monkeypatch.setitem(backends, "tcp", UnreliableBackend())
+
+    listener = await listen("tcp://127.0.0.1:1234", echo)
+    try:
+        comm = await connect(listener.contact_address)
+        await comm.write(b"test")
+        msg = await comm.read()
+        assert msg == b"test"
+    finally:
+        listener.stop()
+
+
+@pytest.mark.asyncio
+async def test_handshake_slow_comm(monkeypatch):
+    class SlowComm(TCP):
+        def __init__(self, *args, delay_in_comm=0.5, **kwargs):
+            super().__init__(*args, **kwargs)
+            self.delay_in_comm = delay_in_comm
+
+        async def read(self, *args, **kwargs):
+            await asyncio.sleep(self.delay_in_comm)
+            return await super().read(*args, **kwargs)
+
+        async def write(self, *args, **kwargs):
+            await asyncio.sleep(self.delay_in_comm)
+            res = await super(type(self), self).write(*args, **kwargs)
+            return res
+
+    class SlowConnector(TCPConnector):
+        comm_class = SlowComm
+
+    class SlowBackend(TCPBackend):
+        _connector_class = SlowConnector
+
+    monkeypatch.setitem(backends, "tcp", SlowBackend())
+
+    listener = await listen("tcp://127.0.0.1:1234", echo)
+    try:
+        comm = await connect(listener.contact_address)
+        await comm.write(b"test")
+        msg = await comm.read()
+        assert msg == b"test"
+
+        import dask
+
+        with dask.config.set({"distributed.comm.timeouts.connect": "100ms"}):
+            with pytest.raises(
+                IOError, match="Timed out during handshake while connecting to"
+            ):
+                await connect(listener.contact_address)
+    finally:
+        listener.stop()
+
+
 async def check_connect_timeout(addr):
     t1 = time()
     with pytest.raises(IOError):
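
Both new tests lean on the connect and handshake timeouts now being driven by a single configuration key. For reference, that key can be overridden at runtime through dask's configuration system, exactly as test_handshake_slow_comm does above (a sketch using only the documented dask.config.set context manager):

    import dask

    # Applies to every connection attempt started inside the block:
    with dask.config.set({"distributed.comm.timeouts.connect": "30s"}):
        ...  # create clients, workers, or raw comms here
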
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/process.py new/distributed-2.30.1/distributed/process.py
--- old/distributed-2.30.0/distributed/process.py       2020-09-18 22:52:00.000000000 +0200
+++ new/distributed-2.30.1/distributed/process.py       2020-11-04 04:32:37.000000000 +0100
@@ -1,4 +1,3 @@
-import atexit
 import logging
 import os
 from queue import Queue as PyQueue
@@ -81,8 +80,10 @@
                 dask.config.global_config,
             ),
         )
-        _dangling.add(self._process)
         self._name = self._process.name
+        self._proc_finalizer = weakref.finalize(
+            self, _asyncprocess_finalizer, self._process
+        )
         self._watch_q = PyQueue()
         self._exit_future = Future()
         self._exit_callback = None
@@ -118,8 +119,8 @@
             # We don't join the thread here as a finalizer can be called
             # asynchronously from anywhere
 
-        self._finalizer = weakref.finalize(self, stop_thread, q=self._watch_q)
-        self._finalizer.atexit = False
+        self._thread_finalizer = weakref.finalize(self, stop_thread, q=self._watch_q)
+        self._thread_finalizer.atexit = False
 
     def _on_exit(self, exitcode):
         # Called from the event loop when the child process exited
@@ -292,7 +293,7 @@
         immediately and does not ensure the child process has exited.
         """
         if not self._closed:
-            self._finalizer()
+            self._thread_finalizer()
             self._process = None
             self._closed = True
 
@@ -334,15 +335,10 @@
         self._process.daemon = value
 
 
-_dangling = weakref.WeakSet()
-
-
-@atexit.register
-def _cleanup_dangling():
-    for proc in list(_dangling):
-        if proc.is_alive():
-            try:
-                logger.info("reaping stray process %s" % (proc,))
-                proc.terminate()
-            except OSError:
-                pass
+def _asyncprocess_finalizer(proc):
+    if proc.is_alive():
+        try:
+            logger.info("reaping stray process %s" % (proc,))
+            proc.terminate()
+        except OSError:
+            pass
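
The change above swaps a module-level atexit registry (plus the _dangling WeakSet) for a per-instance weakref.finalize, so stray child processes are reaped as soon as their AsyncProcess wrapper is garbage collected rather than only at interpreter exit. A minimal sketch of the standard-library pattern (names are illustrative):

    import weakref

    def _release(handle):
        print("releasing", handle)

    class Owner:
        def __init__(self, handle):
            self.handle = handle
            # Runs when `self` is collected *or* at interpreter exit,
            # whichever comes first, and at most once.
            self._finalizer = weakref.finalize(self, _release, handle)

    o = Owner("resource-1")
    del o  # in CPython the finalizer typically fires right here
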
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/utils_test.py new/distributed-2.30.1/distributed/utils_test.py
--- old/distributed-2.30.0/distributed/utils_test.py    2020-09-18 22:52:01.000000000 +0200
+++ new/distributed-2.30.1/distributed/utils_test.py    2020-11-04 04:32:37.000000000 +0100
@@ -44,7 +44,6 @@
 from .core import connect, rpc, CommClosedError, Status
 from .deploy import SpecCluster
 from .metrics import time
-from .process import _cleanup_dangling
 from .proctitle import enable_proctitle_on_children
 from .security import Security
 from .utils import (
@@ -1451,7 +1450,6 @@
         else:
             assert not mp_context.active_children()
 
-    _cleanup_dangling()
     for proc in mp_context.active_children():
         proc.terminate()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed.egg-info/PKG-INFO new/distributed-2.30.1/distributed.egg-info/PKG-INFO
--- old/distributed-2.30.0/distributed.egg-info/PKG-INFO        2020-10-07 00:36:07.000000000 +0200
+++ new/distributed-2.30.1/distributed.egg-info/PKG-INFO        2020-11-04 04:39:51.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: distributed
-Version: 2.30.0
+Version: 2.30.1
 Summary: Distributed scheduler for Dask
 Home-page: https://distributed.dask.org
 Maintainer: Matthew Rocklin
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/docs/source/changelog.rst new/distributed-2.30.1/docs/source/changelog.rst
--- old/distributed-2.30.0/docs/source/changelog.rst    2020-10-07 00:34:03.000000000 +0200
+++ new/distributed-2.30.1/docs/source/changelog.rst    2020-11-04 04:33:50.000000000 +0100
@@ -1,6 +1,14 @@
 Changelog
 =========
 
+2.30.1 - 2020-11-03
+-------------------
+
+- Pin ``pytest-asyncio`` version (:pr:`4212`) `James Bourbeau`_
+- Replace ``AsyncProcess`` exit handler by ``weakref.finalize`` (:pr:`4184`) `Peter Andreas Entschev`_
+- Remove hard coded connect handshake timeouts (:pr:`4176`) `Florian Jetter`_
+
+
 2.30.0 - 2020-10-06
 -------------------
 
_______________________________________________
openSUSE Commits mailing list -- commit@lists.opensuse.org
To unsubscribe, email commit-le...@lists.opensuse.org
List Netiquette: https://en.opensuse.org/openSUSE:Mailing_list_netiquette
List Archives: https://lists.opensuse.org/archives/list/commit@lists.opensuse.org
