LGTM On Feb 28 13:44, Petr Pudlak wrote: > Since the daemon can decide to close a client connection after a > timeout, the client needs to be able to automatically reconnect. > > This patch introduces this functionality into the RPC client: > If an attempt to send data fails on 'Broken pipe', it's retried one more > time. > > Signed-off-by: Petr Pudlak <[email protected]> > --- > lib/rpc/client.py | 8 ++++---- > lib/rpc/transport.py | 32 ++++++++++++++++++++++++++++++++ > 2 files changed, 36 insertions(+), 4 deletions(-) > > diff --git a/lib/rpc/client.py b/lib/rpc/client.py > index 9e5f363..7cc16d4 100644 > --- a/lib/rpc/client.py > +++ b/lib/rpc/client.py > @@ -204,12 +204,12 @@ class AbstractClient(object): > > def _SendMethodCall(self, data): > # Send request and wait for response > - try: > + def send(try_no): > + if try_no: > + logging.debug("RPC peer disconnected, retrying") > self._InitTransport() > return self.transport.Call(data) > - except Exception: > - self._CloseTransport() > - raise > + return t.Transport.RetryOnBrokenPipe(send, lambda _: > self._CloseTransport()) > > def Close(self): > """Close the underlying connection. > diff --git a/lib/rpc/transport.py b/lib/rpc/transport.py > index b079c09..4b6ebdb 100644 > --- a/lib/rpc/transport.py > +++ b/lib/rpc/transport.py > @@ -27,6 +27,7 @@ A transport can send to and receive messages from some > endpoint. > > import collections > import errno > +import logging > import socket > import time > > @@ -177,6 +178,37 @@ class Transport: > self.Send(msg) > return self.Recv() > > + @staticmethod > + def RetryOnBrokenPipe(fn, on_error): > + """Calls a given function, retrying if it fails on the 'Broken pipe' IO > + exception. > + > + This allows re-establishing a broken connection and retrying an IO > operation. > + > + The function receives an integer argument stating the current retry > + number, 0 being the first call, 1 being the retry. 
> + > + If any exception occurs, on_error is invoked first with the exception > given > + as an argument. Then, if the exception is 'Broken pipe', the function > call > + is retried once more. > + > + """ > + retries = 2 > + for try_no in range(0, retries): > + try: > + return fn(try_no) > + except socket.error, ex: > + on_error(ex) > + # we retry on "Broken pipe", unless it's the last try > + if try_no == retries - 1: > + raise > + elif not (isinstance(ex.args, tuple) and (ex[0] == errno.EPIPE)): > + raise > + except Exception, ex: > + on_error(ex) > + raise > + assert False # we should never get here > + > def Close(self): > """Close the socket""" > if self.socket is not None: > -- > 1.9.0.279.gdc9e3eb >
-- Jose Antonio Lopes Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores Steuernummer: 48/725/00206 Umsatzsteueridentifikationsnummer: DE813741370
