On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> When importing from vddk, nbdcopy may be blocked for a few minutes(!)
> trying to get extents.  While nbdcopy is blocked, the imageio server
> closes the idle connections.  When we finally get a request from
> nbdcopy, we fail to detect that the connection was closed.
>
> Detecting a closed connection is hard and racy.  In the good case, we
> get a BrokenPipe error.  In the bad case, imageio closed the socket
> right after we sent a request, and we get an invalid status line.
> When using the imageio proxy, we may get an HTTP error (e.g. 500) if
> the proxy connection to the imageio server on the host was closed.
>
> Even worse, when we find that the connection was closed, it is not
> safe to reopen the connection, since qemu-nbd does not yet ensure
> that data written to the previous connection will be flushed when we
> flush the new connection.
>
> Fix the issue by keeping the connections alive.  A pool keeper thread
> sends a flush request on idle connections every ~30 seconds.  This
> also improves data integrity and efficiency, using idle time to flush
> written data.
>
> Fixes https://bugzilla.redhat.com/2032324

Ideally imageio would not just time out after such a short time when a
client has connections open.  (Do we actually hold the TCP-level
connection open during this time?)

Is there a no-op ping-like request that we can send?
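For instance (untested sketch; ping_connection is a name I made up),
reusing the OPTIONS request the plugin already sends for feature
detection might do, assuming imageio counts any request as activity on
the connection:

def ping_connection(http, url):
    # Hypothetical no-op keep-alive: reuse the OPTIONS request the
    # plugin already sends for feature detection.  Assumes imageio
    # resets its idle timer on any request (needs checking), and that
    # http is an open connection and url is the plugin's parsed URL.
    http.request("OPTIONS", url.path)
    r = http.getresponse()
    r.read()  # drain the body so the connection can be reused
    if r.status != 200:
        raise RuntimeError("ping failed: %d %s" % (r.status, r.reason))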
If the TCP-level connection is open, can we enable TCP keepalives?
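Something along these lines is what I have in mind (untested sketch;
enable_keepalive is a made-up name; it assumes http.client exposes the
underlying socket as .sock once connected, and the TCP_KEEP* options
are Linux-specific):

import socket

def enable_keepalive(http, idle=30, interval=10, count=3):
    # Hypothetical helper: ask the kernel to probe the peer while the
    # connection sits idle.  Assumes http is a connected
    # http.client.HTTP(S)Connection whose raw socket is reachable as
    # http.sock.
    sock = http.sock
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)

That only stops the TCP connection itself from being dropped, of
course; whether it helps here depends on whether imageio's idle
timeout looks at socket-level activity or only at HTTP requests.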
Rich.

>  output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
>  1 file changed, 71 insertions(+)
>
> diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
> index 8d088c4e..172da358 100644
> --- a/output/rhv-upload-plugin.py
> +++ b/output/rhv-upload-plugin.py
> @@ -13,50 +13,60 @@
>  # GNU General Public License for more details.
>  #
>  # You should have received a copy of the GNU General Public License along
>  # with this program; if not, write to the Free Software Foundation, Inc.,
>  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
>
>  import json
>  import queue
>  import socket
>  import ssl
> +import threading
>  import time
>
>  from contextlib import contextmanager
>  from http.client import HTTPSConnection, HTTPConnection
>  from urllib.parse import urlparse
>
>  import nbdkit
>
>  # Using version 2 supporting the buffer protocol for better performance.
>  API_VERSION = 2
>
>  # Maximum number of connection to imageio server. Based on testing with imageio
>  # client, this give best performance.
>  MAX_CONNECTIONS = 4
>
> +# Maximum idle time allowed for imageio connections.
> +IDLE_TIMEOUT = 30
> +
>  # Required parameters.
>  size = None
>  url = None
>
>  # Optional parameters.
>  cafile = None
>  insecure = False
>  is_ovirt_host = False
>
>  # List of options read from imageio server.
>  options = None
>
>  # Pool of HTTP connections.
>  pool = None
>
> +# Set when plugin is cleaning up.
> +done = threading.Event()
> +
> +# Set when periodic flush request fails.
> +pool_error = None
> +
>
>  # Parse parameters.
>  def config(key, value):
>      global cafile, url, is_ovirt_host, insecure, size
>
>      if key == "cafile":
>          cafile = value
>      elif key == "insecure":
>          insecure = value.lower() in ['true', '1']
>      elif key == "is_ovirt_host":
> @@ -84,25 +94,31 @@ def after_fork():
>      options = get_options(http, url)
>      http.close()
>
>      nbdkit.debug("imageio features: flush=%(can_flush)r "
>                   "zero=%(can_zero)r unix_socket=%(unix_socket)r "
>                   "max_readers=%(max_readers)r max_writers=%(max_writers)r"
>                   % options)
>
>      pool = create_http_pool(url, options)
>
> +    t = threading.Thread(target=pool_keeper, name="poolkeeper")
> +    t.daemon = True
> +    t.start()
> +
>
>  # This function is not actually defined before nbdkit 1.28, but it
>  # doesn't particularly matter if we don't close the pool because
>  # clients should call flush().
>  def cleanup():
> +    nbdkit.debug("cleaning up")
> +    done.set()
>      close_http_pool(pool)
>
>
>  def thread_model():
>      """
>      Using parallel model to speed up transfer with multiple connections to
>      imageio server.
>      """
>      return nbdkit.THREAD_MODEL_PARALLEL
>
> @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
>          r = http.getresponse()
>          if r.status != 200:
>              request_failed(r,
>                             "could not write zeroes offset %d size %d" %
>                             (offset, count))
>
>          r.read()
>
>
>  def flush(h, flags):
> +    if pool_error:
> +        raise pool_error
> +
>      # Wait until all inflight requests are completed, and send a flush
>      # request for all imageio connections.
>      locked = []
>
>      # Lock the pool by taking all connections out.
>      while len(locked) < pool.maxsize:
>          locked.append(pool.get())
>
>      try:
>          for item in locked:
> @@ -348,26 +367,78 @@ def create_http_pool(url, options):
>
>      pool = queue.Queue(count)
>
>      for i in range(count):
>          http = create_http(url, unix_socket=unix_socket)
>          pool.put(PoolItem(http))
>
>      return pool
>
>
> +def pool_keeper():
> +    """
> +    Thread flushing idle connections, keeping them alive.
> +
> +    If a connection does not send any request for 60 seconds, imageio
> +    server closes the connection. Recovering from closed connection is
> +    hard and unsafe, so this thread ensure that connections never
> +    becomes idle by sending a flush request if the connection is idle
> +    for too much time.
> +
> +    In normal conditions, all connections are busy most of the time, so
> +    the keeper will find no idle connections. If there short delays in
> +    nbdcopy, the keeper will find some idle connections, but will
> +    quickly return them back to the pool. In the pathological case when
> +    nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
> +    a flush request on all connections every ~30 seconds, until nbdcopy
> +    starts communicating again.
> +    """
> +    global pool_error
> +
> +    nbdkit.debug("pool keeper started")
> +
> +    while not done.wait(IDLE_TIMEOUT / 2):
> +        idle = []
> +
> +        while True:
> +            try:
> +                idle.append(pool.get_nowait())
> +            except queue.Empty:
> +                break
> +
> +        if idle:
> +            now = time.monotonic()
> +            for item in idle:
> +                if item.last_used and now - item.last_used > IDLE_TIMEOUT:
> +                    nbdkit.debug("Flushing idle connection")
> +                    try:
> +                        send_flush(item.http)
> +                        item.last_used = now
> +                    except Exception as e:
> +                        # We will report this error on the next request.
> +                        pool_error = e
> +                        item.last_used = None
> +
> +                pool.put(item)
> +
> +    nbdkit.debug("pool keeper stopped")
> +
> +
>  @contextmanager
>  def http_context(pool):
>      """
>      Context manager yielding an imageio http connection from the pool. Blocks
>      until a connection is available.
>      """
> +    if pool_error:
> +        raise pool_error
> +
>      item = pool.get()
>      try:
>          yield item.http
>      finally:
>          item.last_used = time.monotonic()
>          pool.put(item)
>
>
>  def close_http_pool(pool):
>      """
> --
> 2.33.1

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine.  Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/

_______________________________________________
Libguestfs mailing list
Libguestfs@redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs