This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Tarantool -- an efficient key/value data store".

The branch perl-iproto-async has been updated
       via  dcd96041dd75581e666390a55c769d29a556f579 (commit)
       via  faaa889632f4be5e7a079afa824af63c1c39a14c (commit)
      from  c2c9e82967db6acc998fe5b2314b14ef7f1e5213 (commit)

Summary of changes:
 mod/silverbox/client/perl/lib/MR/IProto.pm         |  191 +++++++++++---------
 mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm |   15 ++-
 .../client/perl/lib/MR/IProto/Cluster/Server.pm    |    9 +-
 .../client/perl/lib/MR/IProto/Connection/Async.pm  |  106 +++++++----
 .../client/perl/lib/MR/IProto/Connection/Sync.pm   |   34 ++--
 5 files changed, 210 insertions(+), 145 deletions(-)

commit dcd96041dd75581e666390a55c769d29a556f579
Author: Aleksey Mashanov <[email protected]>
Date:   Tue Dec 7 16:06:58 2010 +0300

    CPU usage optimization by a quarter

diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm 
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index bb79a6e..c6db967 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -253,7 +253,7 @@ sub send {
     else {
         die "Method must be called in scalar context if you want to use sync" 
unless defined wantarray;
         my ($data, $error, $errno);
-        $self->_send($message, sub {
+        $self->_send_now($message, sub {
             ($data, $error) = @_;
             $errno = $!;
             return;
@@ -297,7 +297,7 @@ sub send_bulk {
         $cv->begin();
         $self->_send($message, sub {
             my ($data, $error) = @_;
-            push @result, blessed($data) ? $data
+            push @result, ref $data ne 'HASH' ? $data
                 : { data => $data, error => $error };
             $cv->end();
             return;
@@ -478,55 +478,53 @@ L</retry_delay> is used if retry is allowed.
 =cut
 
 sub _send {
-    my ($self, $message, $callback, $sync) = @_;
-    die "Callback must be specified" unless $callback;
-    die "Method must be called in void context" if defined wantarray;
-    return $self->_send_now($message, $callback, $sync) if $sync;
-    push @{$self->_queue}, [ $message, $callback ];
-    $self->_try_to_send();
+    my ($self, $message, $callback) = @_;
+    if( $self->_in_progress < $self->max_parallel ) {
+        $self->_in_progress( $self->_in_progress + 1 );
+        eval { $self->_send_now($message, $callback); 1 }
+            or $self->_report_error($message, $callback, $@);
+    }
+    else {
+        push @{$self->_queue}, [ $message, $callback ];
+    }
     return;
 }
 
-sub _try_to_send {
+sub _finish_and_start {
     my ($self) = @_;
-    while( $self->_in_progress < $self->max_parallel && (my $task = shift @{ 
$self->_queue }) ) {
+    if( my $task = shift @{$self->_queue} ) {
         eval { $self->_send_now(@$task); 1 }
             or $self->_report_error(@$task, $@);
     }
+    else {
+        $self->_in_progress( $self->_in_progress - 1 );
+    }
     return;
 }
 
 sub _send_now {
     my ($self, $message, $callback, $sync) = @_;
-    $self->_in_progress( $self->_in_progress + 1 );
-    my %args;
+    my $args;
     # MR::IProto::Message OO-API
-    if( blessed($message) ) {
+    if( ref $message ne 'HASH' ) {
         my $response_class = $self->_reply_class->{$message->msg};
         die sprintf "Cannot find response class for message code %d\n", 
$message->msg unless $response_class;
-        %args = (
+        $args = {
             request        => $message,
             msg            => $message->msg,
             key            => $message->key,
             body           => $message->data,
             response_class => $response_class,
             no_reply       => $response_class->isa('MR::IProto::NoResponse'),
-            retry          => $message->retry ? $self->max_request_retries : 1,
-        );
+        };
     }
     # Old-style compatible API
     else {
         die "unpack or no_reply must be specified" unless $message->{unpack} 
|| $message->{no_reply};
-        %args = (
-            msg      => $message->{msg},
-            key      => $message->{key},
-            body     => exists $message->{payload} ? $message->{payload}
-                : ref $message->{data} ? pack delete $message->{pack} || 'L*', 
@{$message->{data}}
-                : $message->{data},
-            no_reply => $message->{no_reply},
-            unpack   => $message->{unpack},
-            retry    => $message->{retry} ? $self->max_request_retries : 1,
-        );
+        $args = $message;
+        $args->{body} = exists $args->{payload} ? delete $args->{payload}
+            : ref $message->{data} ? pack delete $message->{pack} || 'L*', @{ 
delete $message->{data} }
+            : delete $message->{data};
     }
 
     my $try = 1;
@@ -534,19 +532,19 @@ sub _send_now {
     my $handler;
     $handler = sub {
         $self->_server_callback(
-            [\$handler, \%args, $callback, $sync, \$try],
+            [\$handler, $args, $callback, $sync, \$try],
             [@_],
         );
         return;
     };
-    $self->_send_try($sync, \%args, $handler, $try);
+    $self->_send_try($sync, $args, $handler, $try);
     return;
 }
 
 sub _send_try {
-    my ($self, $sync, $args, $handler, $try, $by_resp) = @_;
+    my ($self, $sync, $args, $handler, $try) = @_;
     my $xsync = $sync ? 'sync' : 'async';
-    $self->_debug(2, sprintf "send msg=%d try %d of %d total", $args->{msg}, 
$try, $by_resp ? $self->max_request_retries : $args->{retry} );
+    $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg}, 
$try, $self->max_request_retries ) if $self->debug >= 2;
     my $server = $self->cluster->server( $args->{key} );
     $server->$xsync->send($args->{msg}, $args->{body}, $handler, 
$args->{no_reply});
     return;
@@ -579,20 +577,21 @@ sub _server_callback {
     my ($resp_msg, $data, $error) = @$resp_args;
     eval {
         if ($error) {
-            $self->_debug(2, "send: failed");
-            if( $$try++ < $args->{retry} ) {
+            $self->_debug("send: failed") if $self->debug >= 2;
+            my $retry = defined $args->{request} ? $args->{request}->retry() : 
$args->{retry};
+            if( $retry && $$try++ < $self->max_request_retries ) {
                 $self->_send_retry($sync, $args, $$handler, $$try);
             }
             else {
                 undef $$handler;
-                $self->_report_error($args->{request}, $callback, $error);
+                $self->_report_error($args->{request}, $callback, $error, 
$sync);
             }
         }
         else {
             my $ok = eval {
                 die "Request and reply message code is different: $resp_msg != 
$args->{msg}\n"
                     unless $args->{no_reply} || $resp_msg == $args->{msg};
-                if( $args->{request} ) {
+                if( defined $args->{request} ) {
                     $data = $args->{response_class}->new( data => $data, 
request => $args->{request} );
                 }
                 else {
@@ -601,31 +600,30 @@ sub _server_callback {
                 1;
             };
             if($ok) {
-                if( $args->{request} && $data->retry && $$try++ < 
$self->max_request_retries ) {
-                    $self->_send_retry($sync, $args, $$handler, $$try, 1);
+                if( defined $args->{request} && $data->retry && $$try++ < 
$self->max_request_retries ) {
+                    $self->_send_retry($sync, $args, $$handler, $$try);
                 }
                 else {
                     undef $$handler;
-                    $self->_in_progress( $self->_in_progress - 1 );
-                    $self->_try_to_send();
+                    $self->_finish_and_start() unless $sync;
                     $callback->($data);
                 }
             }
             else {
                 undef $$handler;
-                $self->_report_error($args->{request}, $callback, $@);
+                $self->_report_error($args->{request}, $callback, $@, $sync);
             }
         }
         1;
     } or do {
         undef $$handler;
-        $self->_debug(0, "unhandled fatal error: $@");
+        $self->_debug("unhandled fatal error: $@");
     };
     return;
 }
 
 sub _report_error {
-    my ($self, $request, $callback, $error) = @_;
+    my ($self, $request, $callback, $error, $sync) = @_;
     my $errobj = $request
         ? MR::IProto::Error->new(
             request => $request,
@@ -633,16 +631,13 @@ sub _report_error {
             errno   => 0+$!,
         )
         : undef;
-    $self->_in_progress( $self->_in_progress - 1 );
-    $self->_try_to_send();
+    $self->_finish_and_start() unless $sync;
     $callback->($errobj, $error);
     return;
 }
 
 sub _debug {
-    my ($self, $level, $msg) = @_;
-    return if $self->debug < $level;
-    $self->debug_cb->($msg);
+    $_[0]->debug_cb->($_[1]);
     return;
 }
 
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm 
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
index 6f015a1..d5f46b8 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
@@ -107,6 +107,12 @@ has servers => (
 
 =cut
 
+has _one => (
+    is  => 'ro',
+    isa => 'Maybe[MR::IProto::Cluster::Server]',
+    lazy_build => 1,
+);
+
 has _ketama => (
     is  => 'ro',
     isa => 'ArrayRef[ArrayRef]',
@@ -137,8 +143,8 @@ Get server from balancing using C<$key>.
 
 sub server {
     my ($self, $key) = @_;
-    my $n = @{$self->servers};
-    return $self->servers->[0] if $n == 1;
+    my $one = $self->_one;
+    return $one if defined $one;
     my $method = $self->balance == RR ? '_balance_rr'
         : $self->balance == KETAMA ? '_balance_ketama'
         : '_balance_hash';
@@ -174,6 +180,11 @@ sub timeout {
 
 =cut
 
+sub _build__one {
+    my ($self) = @_;
+    return @{$self->servers} == 1 ? $self->servers->[0] : undef;
+}
+
 sub _build__ketama {
     my $self = shift;
     my @ketama;
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm 
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index 0f7dfec..0061a3f 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -227,7 +227,6 @@ This method is called when message is started to send.
 =cut
 
 sub _send_started {
-    my ($self, $sync, $msg, $data) = @_;
     return;
 }
 
@@ -238,20 +237,17 @@ This method is called when message is received.
 =cut
 
 sub _recv_finished {
-    my ($self, $sync, $msg, $data, $error) = @_;
     return;
 }
 
 sub _debug {
-    my ($self, $level, $msg) = @_;
-    return if $self->debug < $level;
+    my ($self, $msg) = @_;
     $self->debug_cb->( sprintf "%s:%d: %s", $self->host, $self->port, $msg );
     return;
 }
 
 sub _debug_dump {
-    my ($self, $level, $msg, $datum) = @_;
-    return if $self->debug < $level;
+    my ($self, $msg, $datum) = @_;
     unless($self->dump_no_ints) {
         $msg .= join(' ', unpack('L*', $datum));
         $msg .= ' > ';
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm 
b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
index 9f12c9c..49c6232 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
@@ -52,6 +52,12 @@ has _no_reply => (
     lazy_build => 1,
 );
 
+has _on_drain => (
+    is  => 'ro',
+    isa => 'CodeRef',
+    lazy_build => 1,
+);
+
 =head1 PUBLIC METHODS
 
 =over
@@ -65,8 +71,13 @@ For list of arguments see L</_send>.
 
 sub send {
     my $self = shift;
-    push @{$self->_queue}, [ @_ ];
-    $self->_try_to_send();
+    if( $self->_in_progress < $self->max_parallel ) {
+        $self->_in_progress( $self->_in_progress + 1 );
+        $self->_send(@_);
+    }
+    else {
+        push @{$self->_queue}, [@_];
+    }
     return;
 }
 
@@ -98,32 +109,40 @@ sub _send {
     my ($self, $msg, $payload, $callback, $no_reply) = @_;
     my $sync = $self->_choose_sync();
     my $header = $self->_pack_header($msg, length $payload, $sync);
+    my $server = $self->server;
     $self->_callbacks->{$sync} = $callback;
-    $self->_send_started($sync, $msg, $payload);
-    $self->_debug_dump(5, 'send header: ', $header);
-    $self->_debug_dump(5, 'send payload: ', $payload);
-    $self->_handle->push_write( $header . $payload );
+    $server->_send_started($sync, $msg, $payload);
+    if( $server->debug >= 5 ) {
+        $server->_debug_dump('send header: ', $header);
+        $server->_debug_dump('send payload: ', $payload);
+    }
+    my $handle = $self->_handle;
+    $handle->push_write( $header . $payload );
     if( $no_reply ) {
         push @{$self->_no_reply}, $sync;
+        $handle->on_drain( $self->_on_drain ) unless defined 
$handle->{on_drain};
     }
     else {
-        $self->_handle->push_read( chunk => 12, $self->_read_reply );
+        $handle->push_read( chunk => 12, $self->_read_reply );
     }
     return;
 }
 
 sub _build__read_reply {
     my ($self) = @_;
+    my $server = $self->server;
     weaken($self);
+    weaken($server);
     return sub {
         my ($handle, $data) = @_;
-        $self->_debug_dump(6, 'recv header: ', $data);
+        my $dump_resp = $server->debug >= 6;
+        $server->_debug_dump('recv header: ', $data) if $dump_resp;
         my ($msg, $payload_length, $sync) = $self->_unpack_header($data);
         $handle->unshift_read( chunk => $payload_length, sub {
             my ($handle, $data) = @_;
-            $self->_debug_dump(6, 'recv payload: ', $data);
-            $self->_recv_finished($sync, $msg, $data);
-            $self->_try_to_send();
+            $server->_debug_dump('recv payload: ', $data) if $dump_resp;
+            $server->_recv_finished($sync, $msg, $data);
+            $self->_finish_and_start();
             delete($self->_callbacks->{$sync})->($msg, $data);
             return;
         });
@@ -134,15 +153,29 @@ sub _build__read_reply {
 sub _try_to_send {
     my ($self) = @_;
     while( $self->_in_progress < $self->max_parallel && (my $task = shift @{ 
$self->_queue }) ) {
+        $self->_in_progress( $self->_in_progress + 1 );
         $self->_send(@$task);
     }
     return;
 }
 
+sub _finish_and_start {
+    my ($self) = @_;
+    if( my $task = shift @{$self->_queue} ) {
+        $self->_send(@$task);
+    }
+    else {
+        $self->_in_progress( $self->_in_progress - 1 );
+    }
+    return;
+}
+
 sub _build__handle {
     my ($self) = @_;
-    $self->_debug(4, "connecting");
+    my $server = $self->server;
+    $server->_debug("connecting") if $server->debug >= 4;
     weaken($self);
+    weaken($server);
     return AnyEvent::Handle->new(
         connect    => [ $self->host, $self->port ],
         no_delay   => $self->tcp_nodelay,
@@ -153,22 +186,23 @@ sub _build__handle {
         },
         on_connect => sub {
             my ($handle) = @_;
-            $self->_debug(1, "connected");
+            $server->_debug("connected") if $server->debug >= 1;
             return;
         },
         on_error   => sub {
             my ($handle, $fatal, $message) = @_;
-            $self->_debug(0, ($fatal ? 'fatal ' : '') . 'error: ' . $message);
+            $server->_debug(($fatal ? 'fatal ' : '') . 'error: ' . $message);
             my @callbacks;
             foreach my $sync ( keys %{$self->_callbacks} ) {
-                $self->_recv_finished($sync, undef, undef, $message);
+                $server->_recv_finished($sync, undef, undef, $message);
+                $self->_in_progress( $self->_in_progress - 1 );
                 push @callbacks, $self->_callbacks->{$sync};
             }
-            $self->server->active(0);
+            $server->active(0);
             $self->_clear_handle();
             $self->_clear_callbacks();
             $self->_clear_no_reply();
-            $self->_debug(1, 'closing socket');
+            $server->_debug('closing socket') if $server->debug >= 1;
             $handle->destroy();
             $self->_try_to_send();
             $_->(undef, undef, $message) foreach @callbacks;
@@ -180,17 +214,28 @@ sub _build__handle {
             $handle->_error( Errno::ETIMEDOUT ) if keys %{$self->_callbacks};
             return;
         },
-        on_drain => sub {
-            my ($handle) = @_;
+    );
+}
+
+sub _build__on_drain {
+    my ($self) = @_;
+    my $server = $self->server;
+    weaken($self);
+    weaken($server);
+    return sub {
+        my ($handle) = @_;
+        if( $self->_has_no_reply() ) {
             foreach my $sync ( @{$self->_no_reply} ) {
-                $self->_recv_finished($sync, undef, undef);
+                $server->_recv_finished($sync, undef, undef);
+                $self->_in_progress( $self->_in_progress - 1 );
                 delete($self->_callbacks->{$sync})->(undef, undef);
             }
             $self->_clear_no_reply();
             $self->_try_to_send();
-            return;
-        },
-    );
+            $handle->on_drain(undef);
+        }
+        return;
+    };
 }
 
 sub _build__queue {
@@ -211,25 +256,14 @@ sub _build__no_reply {
 around _choose_sync => sub {
     my ($orig, $self) = @_;
     my $sync;
+    my $callbacks = $self->_callbacks;
     for( 1 .. 50 ) {
         $sync = $self->$orig();
-        return $sync unless exists $self->_callbacks->{$sync};
+        return $sync unless exists $callbacks->{$sync};
     }
     die "Can't choose sync value after 50 iterations";
 };
 
-before _send_started => sub {
-    my ($self) = @_;
-    $self->_in_progress( $self->_in_progress + 1 );
-    return;
-};
-
-after _recv_finished => sub {
-    my ($self) = @_;
-    $self->_in_progress( $self->_in_progress - 1 );
-    return;
-};
-
 =back
 
 =head1 SEE ALSO
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm 
b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
index 5402284..b00ef00 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
@@ -36,60 +36,65 @@ See L<MR::IProto::Connection/send> for more information.
 sub send {
     my ($self, $msg, $payload, $callback, $no_reply) = @_;
     my ($sync, $resp_msg, $resp_payload);
+    my $server = $self->server;
     my $ok = eval {
         $sync = $self->_choose_sync();
+        $server->_send_started($sync, $msg, $payload);
+        my $socket = $self->_socket;
         my $header = $self->_pack_header($msg, length $payload, $sync);
-        $self->_send_started($sync, $msg, $payload);
-        $self->_debug_dump(5, 'send header: ', $header);
-        $self->_debug_dump(5, 'send payload: ', $payload);
+        if( $server->debug >= 5 ) {
+            $server->_debug_dump('send header: ', $header);
+            $server->_debug_dump('send payload: ', $payload);
+        }
 
         my $write = $header . $payload;
         while( length $write ) {
-            my $written = syswrite($self->_socket, $write);
+            my $written = syswrite($socket, $write);
             die $! unless defined $written;
             substr $write, 0, $written, '';
         }
 
         unless( $no_reply ) {
+            my $dump_resp = $server->debug >= 6;
             my $resp_header;
             my $to_read = 12;
             while( $to_read ) {
-                my $read = sysread($self->_socket, my $buf, $to_read);
+                my $read = sysread($socket, my $buf, $to_read);
                 die $! unless defined $read;
                 die "EOF during read of header" if $read == 0;
                 $resp_header .= $buf;
                 $to_read -= $read;
             }
-            $self->_debug_dump(6, 'recv header: ', $resp_header);
+            $server->_debug_dump('recv header: ', $resp_header) if $dump_resp;
             ($resp_msg, my $resp_length, my $resp_sync) = 
$self->_unpack_header($resp_header);
             die "Request and reply sync is different: $resp_sync != $sync" 
unless $resp_sync == $sync;
 
             $to_read = $resp_length;
             while( $to_read ) {
-                my $read = sysread($self->_socket, my $buf, $to_read);
+                my $read = sysread($socket, my $buf, $to_read);
                 die $! unless defined $read;
                 die "EOF during read of payload" if $read == 0;
                 $resp_payload .= $buf;
                 $to_read -= $read;
             }
-            $self->_debug_dump(6, 'recv payload: ', $resp_payload);
+            $server->_debug_dump('recv payload: ', $resp_payload) if 
$dump_resp;
         }
         1;
     };
     if($ok) {
-        $self->_recv_finished($sync, $resp_msg, $resp_payload);
+        $server->_recv_finished($sync, $resp_msg, $resp_payload);
         $callback->($resp_msg, $resp_payload);
     }
     else {
         my $error = $@ =~ /^(.*?) at \S+ line \d+/s ? $1 : $@;
-        $self->_debug(0, "error: $error");
+        $server->_debug("error: $error");
         $! = Errno::ETIMEDOUT if $! == Errno::EINPROGRESS; # Hack over 
IO::Socket behaviour
         if($self->_has_socket()) {
             close($self->_socket);
             $self->_clear_socket();
         }
-        $self->server->active(0);
-        $self->_recv_finished($sync, undef, undef, $error);
+        $server->active(0);
+        $server->_recv_finished($sync, undef, undef, $error);
         $callback->(undef, undef, $error);
     }
     return;
@@ -113,7 +118,8 @@ sub set_timeout {
 
 sub _build__socket {
     my ($self) = @_;
-    $self->_debug(4, "connecting");
+    my $server = $self->server;
+    $server->_debug("connecting") if $server->debug >= 4;
     my $socket = IO::Socket::INET->new(
         PeerHost => $self->host,
         PeerPort => $self->port,
@@ -123,7 +129,7 @@ sub _build__socket {
     $socket->sockopt(SO_KEEPALIVE, 1) if $self->tcp_keepalive;
     $socket->setsockopt((getprotobyname('tcp'))[2], TCP_NODELAY, 1) if 
$self->tcp_nodelay;
     $self->_set_timeout($socket, $self->timeout) if $self->timeout;
-    $self->_debug(1, "connected");
+    $server->_debug("connected") if $server->debug >= 4;
     return $socket;
 }
 

commit faaa889632f4be5e7a079afa824af63c1c39a14c
Author: Aleksey Mashanov <[email protected]>
Date:   Fri Dec 3 13:43:41 2010 +0300

    Less usage of closures to prevent large memory usage

diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm 
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index b2228d8..bb79a6e 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -406,6 +406,7 @@ sub _build_debug_cb {
     my $prefix = $self->prefix;
     return sub {
         my ($msg) = @_;
+        chomp $msg;
         warn sprintf "%s: %s\n", $prefix, $msg;
         return;
     };
@@ -498,107 +499,128 @@ sub _try_to_send {
 sub _send_now {
     my ($self, $message, $callback, $sync) = @_;
     $self->_in_progress( $self->_in_progress + 1 );
-    my ($req_msg, $key, $body, $unpack, $retry, $response_class, $no_reply);
+    my %args;
     # MR::IProto::Message OO-API
     if( blessed($message) ) {
-        $req_msg = $message->msg;
-        $key = $message->key;
-        $body = $message->data;
-        $response_class = $self->_reply_class->{$req_msg};
-        die sprintf "Cannot find response class for message code $req_msg\n" 
unless $response_class;
-        $no_reply = $response_class->isa('MR::IProto::NoResponse');
-        $retry = $message->retry ? $self->max_request_retries : 1;
+        my $response_class = $self->_reply_class->{$message->msg};
+        die sprintf "Cannot find response class for message code %d\n", 
$message->msg unless $response_class;
+        %args = (
+            request        => $message,
+            msg            => $message->msg,
+            key            => $message->key,
+            body           => $message->data,
+            response_class => $response_class,
+            no_reply       => $response_class->isa('MR::IProto::NoResponse'),
+            retry          => $message->retry ? $self->max_request_retries : 1,
+        );
     }
     # Old-style compatible API
     else {
-        $req_msg = $message->{msg};
-        $key = $message->{key};
-        $body = exists $message->{payload} ? $message->{payload}
-            : ref $message->{data} ? pack delete $message->{pack} || 'L*', 
@{$message->{data}}
-            : $message->{data};
-        $no_reply = $message->{no_reply};
-        $unpack = $no_reply ? sub { 0 } : $message->{unpack} or die "unpack or 
no_reply must be specified";
-        $retry = $message->{retry} ? $self->max_request_retries : 1;
+        die "unpack or no_reply must be specified" unless $message->{unpack} 
|| $message->{no_reply};
+        %args = (
+            msg      => $message->{msg},
+            key      => $message->{key},
+            body     => exists $message->{payload} ? $message->{payload}
+                : ref $message->{data} ? pack delete $message->{pack} || 'L*', 
@{$message->{data}}
+                : $message->{data},
+            no_reply => $message->{no_reply},
+            unpack   => $message->{unpack},
+            retry    => $message->{retry} ? $self->max_request_retries : 1,
+        );
     }
 
     my $try = 1;
     weaken($self);
-    my ($sub, $server);
-    my $xsync = $sync ? 'sync' : 'async';
-    my $do_try = sub {
-        my ($by_resp) = @_;
-        $self->_debug(2, sprintf "send msg=%d try %d of %d total", $req_msg, 
$try, $by_resp ? $self->max_request_retries : $retry );
-        $server = $self->cluster->server( $key );
-        $server->$xsync->send($req_msg, $body, $sub, $no_reply);
+    my $handler;
+    $handler = sub {
+        $self->_server_callback(
+            [\$handler, \%args, $callback, $sync, \$try],
+            [@_],
+        );
+        return;
     };
-    my $next_try = $sync
-        ? sub {
-            my ($by_resp) = @_;
-            Time::HiRes::sleep($self->retry_delay);
-            $do_try->($by_resp);
-            return;
-        }
-        : sub {
-            my ($by_resp) = @_;
-            my $timer;
-            $timer = AnyEvent->timer(
-                after => $self->retry_delay,
-                cb    => sub {
-                    undef $timer;
-                    $do_try->($by_resp);
-                    return;
-                },
-            );
-        };
-    $sub = sub {
-        my ($resp_msg, $data, $error) = @_;
+    $self->_send_try($sync, \%args, $handler, $try);
+    return;
+}
+
+sub _send_try {
+    my ($self, $sync, $args, $handler, $try, $by_resp) = @_;
+    my $xsync = $sync ? 'sync' : 'async';
+    $self->_debug(2, sprintf "send msg=%d try %d of %d total", $args->{msg}, 
$try, $by_resp ? $self->max_request_retries : $args->{retry} );
+    my $server = $self->cluster->server( $args->{key} );
+    $server->$xsync->send($args->{msg}, $args->{body}, $handler, 
$args->{no_reply});
+    return;
+}
+
+sub _send_retry {
+    my ($self, @in) = @_;
+    my ($sync) = @in;
+    if( $sync ) {
+        Time::HiRes::sleep($self->retry_delay);
+        $self->_send_try(@in);
+    }
+    else {
+        my $timer;
+        $timer = AnyEvent->timer(
+            after => $self->retry_delay,
+            cb    => sub {
+                undef $timer;
+                $self->_send_try(@in);
+                return;
+            },
+        );
+    }
+    return;
+}
+
+sub _server_callback {
+    my ($self, $req_args, $resp_args) = @_;
+    my ($handler, $args, $callback, $sync, $try) = @$req_args;
+    my ($resp_msg, $data, $error) = @$resp_args;
+    eval {
         if ($error) {
             $self->_debug(2, "send: failed");
-            if( $try++ < $retry ) {
-                $next_try->();
+            if( $$try++ < $args->{retry} ) {
+                $self->_send_retry($sync, $args, $$handler, $$try);
             }
             else {
-                undef $sub;
-                undef $do_try;
-                undef $next_try;
-                $self->_report_error($unpack ? undef : $message, $callback, 
$error);
+                undef $$handler;
+                $self->_report_error($args->{request}, $callback, $error);
             }
         }
         else {
             my $ok = eval {
-                die "Request and reply message code is different: $resp_msg != 
$req_msg\n"
-                    unless $no_reply || $resp_msg == $req_msg;
-                if ($unpack) {
-                    $data = [ $unpack->($data) ];
+                die "Request and reply message code is different: $resp_msg != 
$args->{msg}\n"
+                    unless $args->{no_reply} || $resp_msg == $args->{msg};
+                if( $args->{request} ) {
+                    $data = $args->{response_class}->new( data => $data, 
request => $args->{request} );
                 }
                 else {
-                    $data = $response_class->new( data => $data, request => 
$message );
+                    $data = $args->{no_reply} ? [ 0 ] : [ 
$args->{unpack}->($data) ];
                 }
                 1;
             };
             if($ok) {
-                if( !$unpack && $data->retry && $try++ < 
$self->max_request_retries ) {
-                    $next_try->(1);
+                if( $args->{request} && $data->retry && $$try++ < 
$self->max_request_retries ) {
+                    $self->_send_retry($sync, $args, $$handler, $$try, 1);
                 }
                 else {
-                    undef $sub;
-                    undef $do_try;
-                    undef $next_try;
+                    undef $$handler;
                     $self->_in_progress( $self->_in_progress - 1 );
                     $self->_try_to_send();
                     $callback->($data);
                 }
             }
             else {
-                undef $sub;
-                undef $do_try;
-                undef $next_try;
-                $self->_report_error($unpack ? undef : $message, $callback, 
$@);
+                undef $$handler;
+                $self->_report_error($args->{request}, $callback, $@);
             }
         }
-        return;
+        1;
+    } or do {
+        undef $$handler;
+        $self->_debug(0, "unhandled fatal error: $@");
     };
-    $do_try->();
     return;
 }
 
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm 
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index d468d96..0f7dfec 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -214,6 +214,7 @@ sub _build_debug_cb {
     my ($self) = @_;
     return sub {
         my ($msg) = @_;
+        chomp $msg;
         warn "MR::IProto: $msg\n";
         return;
     };

-- 
Tarantool -- an efficient key/value data store

_______________________________________________
Mailing list: https://launchpad.net/~tarantool-developers
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~tarantool-developers
More help   : https://help.launchpad.net/ListHelp

Reply via email to