Hello community, here is the log from the commit of package python-distributed for openSUSE:Factory checked in at 2020-12-01 14:22:42 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-distributed (Old) and /work/SRC/openSUSE:Factory/.python-distributed.new.5913 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-distributed" Tue Dec 1 14:22:42 2020 rev:38 rq:851839 version:2.30.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes 2020-10-25 18:06:35.759343920 +0100 +++ /work/SRC/openSUSE:Factory/.python-distributed.new.5913/python-distributed.changes 2020-12-01 14:22:58.793605863 +0100 @@ -1,0 +2,10 @@ +Sat Nov 21 22:35:43 UTC 2020 - Arun Persaud <a...@gmx.de> + +- update to version 2.30.1: + * Pin pytest-asyncio version (GH#4212) James Bourbeau + * Replace AsyncProcess exit handler by weakref.finalize (GH#4184) + Peter Andreas Entschev + * Remove hard coded connect handshake timeouts (GH#4176) Florian + Jetter + +------------------------------------------------------------------- Old: ---- distributed-2.30.0.tar.gz New: ---- distributed-2.30.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-distributed.spec ++++++ --- /var/tmp/diff_new_pack.OSExks/_old 2020-12-01 14:22:59.577606711 +0100 +++ /var/tmp/diff_new_pack.OSExks/_new 2020-12-01 14:22:59.581606716 +0100 @@ -21,7 +21,7 @@ # Test requires network connection %bcond_with test Name: python-distributed -Version: 2.30.0 +Version: 2.30.1 Release: 0 Summary: Library for distributed computing with Python License: BSD-3-Clause ++++++ distributed-2.30.0.tar.gz -> distributed-2.30.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/PKG-INFO new/distributed-2.30.1/PKG-INFO --- old/distributed-2.30.0/PKG-INFO 2020-10-07 00:36:08.340993600 +0200 +++ new/distributed-2.30.1/PKG-INFO 2020-11-04 04:39:51.990585000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.30.0 +Version: 2.30.1 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/_version.py new/distributed-2.30.1/distributed/_version.py --- old/distributed-2.30.0/distributed/_version.py 2020-10-07 00:36:08.349047400 +0200 +++ new/distributed-2.30.1/distributed/_version.py 2020-11-04 04:39:51.991822700 +0100 @@ -8,11 +8,11 @@ version_json = ''' { - "date": "2020-10-06T17:35:34-0500", + "date": "2020-11-03T21:38:02-0600", "dirty": false, "error": null, - "full-revisionid": "a1dc5f437b39c1b35a9b05cbc048e3a793b89715", - "version": "2.30.0" + "full-revisionid": "4c238e6b104e259c99efeff417daf1bfd6722be8", + "version": "2.30.1" } ''' # END VERSION_JSON diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/core.py new/distributed-2.30.1/distributed/comm/core.py --- old/distributed-2.30.0/distributed/comm/core.py 2020-10-03 01:12:36.000000000 +0200 +++ new/distributed-2.30.1/distributed/comm/core.py 2020-11-04 04:32:37.000000000 +0100 @@ -213,9 +213,15 @@ async def on_connection(self, comm: Comm, handshake_overrides=None): local_info = {**comm.handshake_info(), **(handshake_overrides or {})} + + timeout = dask.config.get("distributed.comm.timeouts.connect") + timeout = parse_timedelta(timeout, default="seconds") try: - write = await asyncio.wait_for(comm.write(local_info), 1) - handshake = await asyncio.wait_for(comm.read(), 1) + # Timeout is to ensure that we'll terminate connections eventually. + # Connector side will employ smaller timeouts and we should only + # reach this if the comm is dead anyhow. + write = await asyncio.wait_for(comm.write(local_info), timeout=timeout) + handshake = await asyncio.wait_for(comm.read(), timeout=timeout) # This would be better, but connections leak if worker is closed quickly # write, handshake = await asyncio.gather(comm.write(local_info), comm.read()) except Exception as e: @@ -262,79 +268,71 @@ comm = None start = time() - deadline = start + timeout - error = None - - def _raise(error): - error = error or "connect() didn't finish in time" - msg = "Timed out trying to connect to %r after %s s: %s" % ( - addr, - timeout, - error, - ) - raise IOError(msg) - - backoff = 0.01 - if timeout and timeout / 20 < backoff: - backoff = timeout / 20 - retry_timeout_backoff = random.randrange(140, 160) / 100 - - # This starts a thread - while True: + def time_left(): + deadline = start + timeout + return max(0, deadline - time()) + + backoff_base = 0.01 + attempt = 0 + + # Prefer multiple small attempts than one long attempt. This should protect + # primarily from DNS race conditions + # gh3104, gh4176, gh4167 + intermediate_cap = timeout / 5 + active_exception = None + while time_left() > 0: try: - while deadline - time() > 0: - - async def _(): - comm = await connector.connect( - loc, deserialize=deserialize, **connection_args - ) - local_info = { - **comm.handshake_info(), - **(handshake_overrides or {}), - } - try: - handshake = await asyncio.wait_for(comm.read(), 1) - write = await asyncio.wait_for(comm.write(local_info), 1) - # This would be better, but connections leak if worker is closed quickly - # write, handshake = await asyncio.gather(comm.write(local_info), comm.read()) - except Exception as e: - with suppress(Exception): - await comm.close() - raise CommClosedError() from e - - comm.remote_info = handshake - comm.remote_info["address"] = comm._peer_addr - comm.local_info = local_info - comm.local_info["address"] = comm._local_addr - - comm.handshake_options = comm.handshake_configuration( - comm.local_info, comm.remote_info - ) - return comm - - with suppress(TimeoutError): - comm = await asyncio.wait_for( - _(), timeout=min(deadline - time(), retry_timeout_backoff) - ) - break - if not comm: - _raise(error) + comm = await asyncio.wait_for( + connector.connect(loc, deserialize=deserialize, **connection_args), + timeout=min(intermediate_cap, time_left()), + ) + break except FatalCommClosedError: raise - except EnvironmentError as e: - error = str(e) - if time() < deadline: - logger.debug("Could not connect, waiting before retrying") - await asyncio.sleep(backoff) - backoff *= random.randrange(140, 160) / 100 - retry_timeout_backoff *= random.randrange(140, 160) / 100 - backoff = min(backoff, 1) # wait at most one second - else: - _raise(error) - else: - break - + # CommClosed, EnvironmentError inherit from OSError + except (TimeoutError, OSError) as exc: + active_exception = exc + + # The intermediate capping is mostly relevant for the initial + # connect. Afterwards we should be more forgiving + intermediate_cap = intermediate_cap * 1.5 + # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + + upper_cap = min(time_left(), backoff_base * (2 ** attempt)) + backoff = random.uniform(0, upper_cap) + attempt += 1 + logger.debug("Could not connect, waiting for %s before retrying", backoff) + await asyncio.sleep(backoff) + else: + raise IOError( + f"Timed out trying to connect to {addr} after {timeout} s" + ) from active_exception + + local_info = { + **comm.handshake_info(), + **(handshake_overrides or {}), + } + try: + # This would be better, but connections leak if worker is closed quickly + # write, handshake = await asyncio.gather(comm.write(local_info), comm.read()) + handshake = await asyncio.wait_for(comm.read(), time_left()) + await asyncio.wait_for(comm.write(local_info), time_left()) + except Exception as exc: + with suppress(Exception): + await comm.close() + raise IOError( + f"Timed out during handshake while connecting to {addr} after {timeout} s" + ) from exc + + comm.remote_info = handshake + comm.remote_info["address"] = comm._peer_addr + comm.local_info = local_info + comm.local_info["address"] = comm._local_addr + + comm.handshake_options = comm.handshake_configuration( + comm.local_info, comm.remote_info + ) return comm diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/tcp.py new/distributed-2.30.1/distributed/comm/tcp.py --- old/distributed-2.30.0/distributed/comm/tcp.py 2020-09-16 04:37:04.000000000 +0200 +++ new/distributed-2.30.1/distributed/comm/tcp.py 2020-11-04 04:32:37.000000000 +0100 @@ -1,6 +1,7 @@ import errno import logging import socket +from ssl import SSLError import struct import sys from tornado import gen @@ -349,7 +350,6 @@ stream = await self.client.connect( ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs ) - # Under certain circumstances tornado will have a closed connnection with an error and not raise # a StreamClosedError. # @@ -360,6 +360,8 @@ except StreamClosedError as e: # The socket connect() call failed convert_stream_closed_error(self, e) + except SSLError as err: + raise FatalCommClosedError() from err local_address = self.prefix + get_stream_address(stream) comm = self.comm_class( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/comm/tests/test_comms.py new/distributed-2.30.1/distributed/comm/tests/test_comms.py --- old/distributed-2.30.0/distributed/comm/tests/test_comms.py 2020-09-18 22:52:00.000000000 +0200 +++ new/distributed-2.30.1/distributed/comm/tests/test_comms.py 2020-11-04 04:32:37.000000000 +0100 @@ -1,45 +1,42 @@ import asyncio -import types -from functools import partial import os import sys import threading +import types import warnings +from functools import partial +import distributed import pkg_resources import pytest - -from tornado import ioloop -from tornado.concurrent import Future - -import distributed -from distributed.metrics import time -from distributed.utils import get_ip, get_ipv6 -from distributed.utils_test import ( - requires_ipv6, - has_ipv6, - get_cert, - get_server_ssl_context, - get_client_ssl_context, -) -from distributed.utils_test import loop # noqa: F401 - -from distributed.protocol import to_serialize, Serialized, serialize, deserialize - -from distributed.comm.registry import backends, get_backend from distributed.comm import ( - tcp, - inproc, + CommClosedError, connect, + get_address_host, + get_local_address_for, + inproc, listen, - CommClosedError, parse_address, parse_host_port, - unparse_host_port, resolve_address, - get_address_host, - get_local_address_for, + tcp, + unparse_host_port, +) +from distributed.comm.registry import backends, get_backend +from distributed.comm.tcp import TCP, TCPBackend, TCPConnector +from distributed.metrics import time +from distributed.protocol import Serialized, deserialize, serialize, to_serialize +from distributed.utils import get_ip, get_ipv6 +from distributed.utils_test import loop # noqa: F401 +from distributed.utils_test import ( + get_cert, + get_client_ssl_context, + get_server_ssl_context, + has_ipv6, + requires_ipv6, ) +from tornado import ioloop +from tornado.concurrent import Future EXTERNAL_IP4 = get_ip() if has_ipv6(): @@ -218,7 +215,7 @@ await comm.write(msg) await comm.close() - listener = await tcp.TCPListener("localhost", handle_comm) + listener = await tcp.TCPListener("127.0.0.1", handle_comm) host, port = listener.get_host_port() assert host in ("localhost", "127.0.0.1", "::1") assert port > 0 @@ -264,7 +261,7 @@ server_ctx = get_server_ssl_context() client_ctx = get_client_ssl_context() - listener = await tcp.TLSListener("localhost", handle_comm, ssl_context=server_ctx) + listener = await tcp.TLSListener("127.0.0.1", handle_comm, ssl_context=server_ctx) host, port = listener.get_host_port() assert host in ("localhost", "127.0.0.1", "::1") assert port > 0 @@ -665,7 +662,8 @@ with pytest.raises(EnvironmentError) as excinfo: await connect(listener.contact_address, timeout=2, ssl_context=cli_ctx) - assert "certificate verify failed" in str(excinfo.value) + + assert "certificate verify failed" in str(excinfo.value.__cause__) # @@ -797,6 +795,88 @@ # +async def echo(comm): + message = await comm.read() + await comm.write(message) + + +@pytest.mark.asyncio +async def test_retry_connect(monkeypatch): + async def echo(comm): + message = await comm.read() + await comm.write(message) + + class UnreliableConnector(TCPConnector): + def __init__(self): + + self.num_failures = 2 + self.failures = 0 + super().__init__() + + async def connect(self, address, deserialize=True, **connection_args): + if self.failures > self.num_failures: + return await super().connect(address, deserialize, **connection_args) + else: + self.failures += 1 + raise IOError() + + class UnreliableBackend(TCPBackend): + _connector_class = UnreliableConnector + + monkeypatch.setitem(backends, "tcp", UnreliableBackend()) + + listener = await listen("tcp://127.0.0.1:1234", echo) + try: + comm = await connect(listener.contact_address) + await comm.write(b"test") + msg = await comm.read() + assert msg == b"test" + finally: + listener.stop() + + +@pytest.mark.asyncio +async def test_handshake_slow_comm(monkeypatch): + class SlowComm(TCP): + def __init__(self, *args, delay_in_comm=0.5, **kwargs): + super().__init__(*args, **kwargs) + self.delay_in_comm = delay_in_comm + + async def read(self, *args, **kwargs): + await asyncio.sleep(self.delay_in_comm) + return await super().read(*args, **kwargs) + + async def write(self, *args, **kwargs): + await asyncio.sleep(self.delay_in_comm) + res = await super(type(self), self).write(*args, **kwargs) + return res + + class SlowConnector(TCPConnector): + comm_class = SlowComm + + class SlowBackend(TCPBackend): + _connector_class = SlowConnector + + monkeypatch.setitem(backends, "tcp", SlowBackend()) + + listener = await listen("tcp://127.0.0.1:1234", echo) + try: + comm = await connect(listener.contact_address) + await comm.write(b"test") + msg = await comm.read() + assert msg == b"test" + + import dask + + with dask.config.set({"distributed.comm.timeouts.connect": "100ms"}): + with pytest.raises( + IOError, match="Timed out during handshake while connecting to" + ): + await connect(listener.contact_address) + finally: + listener.stop() + + async def check_connect_timeout(addr): t1 = time() with pytest.raises(IOError): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/process.py new/distributed-2.30.1/distributed/process.py --- old/distributed-2.30.0/distributed/process.py 2020-09-18 22:52:00.000000000 +0200 +++ new/distributed-2.30.1/distributed/process.py 2020-11-04 04:32:37.000000000 +0100 @@ -1,4 +1,3 @@ -import atexit import logging import os from queue import Queue as PyQueue @@ -81,8 +80,10 @@ dask.config.global_config, ), ) - _dangling.add(self._process) self._name = self._process.name + self._proc_finalizer = weakref.finalize( + self, _asyncprocess_finalizer, self._process + ) self._watch_q = PyQueue() self._exit_future = Future() self._exit_callback = None @@ -118,8 +119,8 @@ # We don't join the thread here as a finalizer can be called # asynchronously from anywhere - self._finalizer = weakref.finalize(self, stop_thread, q=self._watch_q) - self._finalizer.atexit = False + self._thread_finalizer = weakref.finalize(self, stop_thread, q=self._watch_q) + self._thread_finalizer.atexit = False def _on_exit(self, exitcode): # Called from the event loop when the child process exited @@ -292,7 +293,7 @@ immediately and does not ensure the child process has exited. """ if not self._closed: - self._finalizer() + self._thread_finalizer() self._process = None self._closed = True @@ -334,15 +335,10 @@ self._process.daemon = value -_dangling = weakref.WeakSet() - - -@atexit.register -def _cleanup_dangling(): - for proc in list(_dangling): - if proc.is_alive(): - try: - logger.info("reaping stray process %s" % (proc,)) - proc.terminate() - except OSError: - pass +def _asyncprocess_finalizer(proc): + if proc.is_alive(): + try: + logger.info("reaping stray process %s" % (proc,)) + proc.terminate() + except OSError: + pass diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed/utils_test.py new/distributed-2.30.1/distributed/utils_test.py --- old/distributed-2.30.0/distributed/utils_test.py 2020-09-18 22:52:01.000000000 +0200 +++ new/distributed-2.30.1/distributed/utils_test.py 2020-11-04 04:32:37.000000000 +0100 @@ -44,7 +44,6 @@ from .core import connect, rpc, CommClosedError, Status from .deploy import SpecCluster from .metrics import time -from .process import _cleanup_dangling from .proctitle import enable_proctitle_on_children from .security import Security from .utils import ( @@ -1451,7 +1450,6 @@ else: assert not mp_context.active_children() - _cleanup_dangling() for proc in mp_context.active_children(): proc.terminate() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/distributed.egg-info/PKG-INFO new/distributed-2.30.1/distributed.egg-info/PKG-INFO --- old/distributed-2.30.0/distributed.egg-info/PKG-INFO 2020-10-07 00:36:07.000000000 +0200 +++ new/distributed-2.30.1/distributed.egg-info/PKG-INFO 2020-11-04 04:39:51.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: distributed -Version: 2.30.0 +Version: 2.30.1 Summary: Distributed scheduler for Dask Home-page: https://distributed.dask.org Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/distributed-2.30.0/docs/source/changelog.rst new/distributed-2.30.1/docs/source/changelog.rst --- old/distributed-2.30.0/docs/source/changelog.rst 2020-10-07 00:34:03.000000000 +0200 +++ new/distributed-2.30.1/docs/source/changelog.rst 2020-11-04 04:33:50.000000000 +0100 @@ -1,6 +1,14 @@ Changelog ========= +2.30.1 - 2020-11-03 +------------------- + +- Pin ``pytest-asyncio`` version (:pr:`4212`) `James Bourbeau`_ +- Replace ``AsyncProcess`` exit handler by ``weakref.finalize`` (:pr:`4184`) `Peter Andreas Entschev`_ +- Remove hard coded connect handshake timeouts (:pr:`4176`) `Florian Jetter`_ + + 2.30.0 - 2020-10-06 ------------------- _______________________________________________ openSUSE Commits mailing list -- commit@lists.opensuse.org To unsubscribe, email commit-le...@lists.opensuse.org List Netiquette: https://en.opensuse.org/openSUSE:Mailing_list_netiquette List Archives: https://lists.opensuse.org/archives/list/commit@lists.opensuse.org