This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Tarantool -- an efficient key/value data store".
The branch perl-iproto-async has been updated
via dcd96041dd75581e666390a55c769d29a556f579 (commit)
via faaa889632f4be5e7a079afa824af63c1c39a14c (commit)
from c2c9e82967db6acc998fe5b2314b14ef7f1e5213 (commit)
Summary of changes:
mod/silverbox/client/perl/lib/MR/IProto.pm | 191 +++++++++++---------
mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm | 15 ++-
.../client/perl/lib/MR/IProto/Cluster/Server.pm | 9 +-
.../client/perl/lib/MR/IProto/Connection/Async.pm | 106 +++++++----
.../client/perl/lib/MR/IProto/Connection/Sync.pm | 34 ++--
5 files changed, 210 insertions(+), 145 deletions(-)
commit dcd96041dd75581e666390a55c769d29a556f579
Author: Aleksey Mashanov <[email protected]>
Date: Tue Dec 7 16:06:58 2010 +0300
CPU usage optimization by a quarter
diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index bb79a6e..c6db967 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -253,7 +253,7 @@ sub send {
else {
die "Method must be called in scalar context if you want to use sync"
unless defined wantarray;
my ($data, $error, $errno);
- $self->_send($message, sub {
+ $self->_send_now($message, sub {
($data, $error) = @_;
$errno = $!;
return;
@@ -297,7 +297,7 @@ sub send_bulk {
$cv->begin();
$self->_send($message, sub {
my ($data, $error) = @_;
- push @result, blessed($data) ? $data
+ push @result, ref $data ne 'HASH' ? $data
: { data => $data, error => $error };
$cv->end();
return;
@@ -478,55 +478,53 @@ L</retry_delay> is used if retry is allowed.
=cut
sub _send {
- my ($self, $message, $callback, $sync) = @_;
- die "Callback must be specified" unless $callback;
- die "Method must be called in void context" if defined wantarray;
- return $self->_send_now($message, $callback, $sync) if $sync;
- push @{$self->_queue}, [ $message, $callback ];
- $self->_try_to_send();
+ my ($self, $message, $callback) = @_;
+ if( $self->_in_progress < $self->max_parallel ) {
+ $self->_in_progress( $self->_in_progress + 1 );
+ eval { $self->_send_now($message, $callback); 1 }
+ or $self->_report_error($message, $callback, $@);
+ }
+ else {
+ push @{$self->_queue}, [ $message, $callback ];
+ }
return;
}
-sub _try_to_send {
+sub _finish_and_start {
my ($self) = @_;
- while( $self->_in_progress < $self->max_parallel && (my $task = shift @{
$self->_queue }) ) {
+ if( my $task = shift @{$self->_queue} ) {
eval { $self->_send_now(@$task); 1 }
or $self->_report_error(@$task, $@);
}
+ else {
+ $self->_in_progress( $self->_in_progress - 1 );
+ }
return;
}
sub _send_now {
my ($self, $message, $callback, $sync) = @_;
- $self->_in_progress( $self->_in_progress + 1 );
- my %args;
+ my $args;
# MR::IProto::Message OO-API
- if( blessed($message) ) {
+ if( ref $message ne 'HASH' ) {
my $response_class = $self->_reply_class->{$message->msg};
die sprintf "Cannot find response class for message code %d\n",
$message->msg unless $response_class;
- %args = (
+ $args = {
request => $message,
msg => $message->msg,
key => $message->key,
body => $message->data,
response_class => $response_class,
no_reply => $response_class->isa('MR::IProto::NoResponse'),
- retry => $message->retry ? $self->max_request_retries : 1,
- );
+ };
}
# Old-style compatible API
else {
die "unpack or no_reply must be specified" unless $message->{unpack}
|| $message->{no_reply};
- %args = (
- msg => $message->{msg},
- key => $message->{key},
- body => exists $message->{payload} ? $message->{payload}
- : ref $message->{data} ? pack delete $message->{pack} || 'L*',
@{$message->{data}}
- : $message->{data},
- no_reply => $message->{no_reply},
- unpack => $message->{unpack},
- retry => $message->{retry} ? $self->max_request_retries : 1,
- );
+ $args = $message;
+ $args->{body} = exists $args->{payload} ? delete $args->{payload}
+ : ref $message->{data} ? pack delete $message->{pack} || 'L*', @{
delete $message->{data} }
+ : delete $message->{data};
}
my $try = 1;
@@ -534,19 +532,19 @@ sub _send_now {
my $handler;
$handler = sub {
$self->_server_callback(
- [\$handler, \%args, $callback, $sync, \$try],
+ [\$handler, $args, $callback, $sync, \$try],
[@_],
);
return;
};
- $self->_send_try($sync, \%args, $handler, $try);
+ $self->_send_try($sync, $args, $handler, $try);
return;
}
sub _send_try {
- my ($self, $sync, $args, $handler, $try, $by_resp) = @_;
+ my ($self, $sync, $args, $handler, $try) = @_;
my $xsync = $sync ? 'sync' : 'async';
- $self->_debug(2, sprintf "send msg=%d try %d of %d total", $args->{msg},
$try, $by_resp ? $self->max_request_retries : $args->{retry} );
+ $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg},
$try, $self->max_request_retries ) if $self->debug >= 2;
my $server = $self->cluster->server( $args->{key} );
$server->$xsync->send($args->{msg}, $args->{body}, $handler,
$args->{no_reply});
return;
@@ -579,20 +577,21 @@ sub _server_callback {
my ($resp_msg, $data, $error) = @$resp_args;
eval {
if ($error) {
- $self->_debug(2, "send: failed");
- if( $$try++ < $args->{retry} ) {
+ $self->_debug("send: failed") if $self->debug >= 2;
+ my $retry = defined $args->{request} ? $args->{request}->retry() :
$args->{retry};
+ if( $retry && $$try++ < $self->max_request_retries ) {
$self->_send_retry($sync, $args, $$handler, $$try);
}
else {
undef $$handler;
- $self->_report_error($args->{request}, $callback, $error);
+ $self->_report_error($args->{request}, $callback, $error,
$sync);
}
}
else {
my $ok = eval {
die "Request and reply message code is different: $resp_msg !=
$args->{msg}\n"
unless $args->{no_reply} || $resp_msg == $args->{msg};
- if( $args->{request} ) {
+ if( defined $args->{request} ) {
$data = $args->{response_class}->new( data => $data,
request => $args->{request} );
}
else {
@@ -601,31 +600,30 @@ sub _server_callback {
1;
};
if($ok) {
- if( $args->{request} && $data->retry && $$try++ <
$self->max_request_retries ) {
- $self->_send_retry($sync, $args, $$handler, $$try, 1);
+ if( defined $args->{request} && $data->retry && $$try++ <
$self->max_request_retries ) {
+ $self->_send_retry($sync, $args, $$handler, $$try);
}
else {
undef $$handler;
- $self->_in_progress( $self->_in_progress - 1 );
- $self->_try_to_send();
+ $self->_finish_and_start() unless $sync;
$callback->($data);
}
}
else {
undef $$handler;
- $self->_report_error($args->{request}, $callback, $@);
+ $self->_report_error($args->{request}, $callback, $@, $sync);
}
}
1;
} or do {
undef $$handler;
- $self->_debug(0, "unhandled fatal error: $@");
+ $self->_debug("unhandled fatal error: $@");
};
return;
}
sub _report_error {
- my ($self, $request, $callback, $error) = @_;
+ my ($self, $request, $callback, $error, $sync) = @_;
my $errobj = $request
? MR::IProto::Error->new(
request => $request,
@@ -633,16 +631,13 @@ sub _report_error {
errno => 0+$!,
)
: undef;
- $self->_in_progress( $self->_in_progress - 1 );
- $self->_try_to_send();
+ $self->_finish_and_start() unless $sync;
$callback->($errobj, $error);
return;
}
sub _debug {
- my ($self, $level, $msg) = @_;
- return if $self->debug < $level;
- $self->debug_cb->($msg);
+ $_[0]->debug_cb->($_[1]);
return;
}
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
index 6f015a1..d5f46b8 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster.pm
@@ -107,6 +107,12 @@ has servers => (
=cut
+has _one => (
+ is => 'ro',
+ isa => 'Maybe[MR::IProto::Cluster::Server]',
+ lazy_build => 1,
+);
+
has _ketama => (
is => 'ro',
isa => 'ArrayRef[ArrayRef]',
@@ -137,8 +143,8 @@ Get server from balancing using C<$key>.
sub server {
my ($self, $key) = @_;
- my $n = @{$self->servers};
- return $self->servers->[0] if $n == 1;
+ my $one = $self->_one;
+ return $one if defined $one;
my $method = $self->balance == RR ? '_balance_rr'
: $self->balance == KETAMA ? '_balance_ketama'
: '_balance_hash';
@@ -174,6 +180,11 @@ sub timeout {
=cut
+sub _build__one {
+ my ($self) = @_;
+ return @{$self->servers} == 1 ? $self->servers->[0] : undef;
+}
+
sub _build__ketama {
my $self = shift;
my @ketama;
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index 0f7dfec..0061a3f 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -227,7 +227,6 @@ This method is called when message is started to send.
=cut
sub _send_started {
- my ($self, $sync, $msg, $data) = @_;
return;
}
@@ -238,20 +237,17 @@ This method is called when message is received.
=cut
sub _recv_finished {
- my ($self, $sync, $msg, $data, $error) = @_;
return;
}
sub _debug {
- my ($self, $level, $msg) = @_;
- return if $self->debug < $level;
+ my ($self, $msg) = @_;
$self->debug_cb->( sprintf "%s:%d: %s", $self->host, $self->port, $msg );
return;
}
sub _debug_dump {
- my ($self, $level, $msg, $datum) = @_;
- return if $self->debug < $level;
+ my ($self, $msg, $datum) = @_;
unless($self->dump_no_ints) {
$msg .= join(' ', unpack('L*', $datum));
$msg .= ' > ';
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
index 9f12c9c..49c6232 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Async.pm
@@ -52,6 +52,12 @@ has _no_reply => (
lazy_build => 1,
);
+has _on_drain => (
+ is => 'ro',
+ isa => 'CodeRef',
+ lazy_build => 1,
+);
+
=head1 PUBLIC METHODS
=over
@@ -65,8 +71,13 @@ For list of arguments see L</_send>.
sub send {
my $self = shift;
- push @{$self->_queue}, [ @_ ];
- $self->_try_to_send();
+ if( $self->_in_progress < $self->max_parallel ) {
+ $self->_in_progress( $self->_in_progress + 1 );
+ $self->_send(@_);
+ }
+ else {
+ push @{$self->_queue}, [@_];
+ }
return;
}
@@ -98,32 +109,40 @@ sub _send {
my ($self, $msg, $payload, $callback, $no_reply) = @_;
my $sync = $self->_choose_sync();
my $header = $self->_pack_header($msg, length $payload, $sync);
+ my $server = $self->server;
$self->_callbacks->{$sync} = $callback;
- $self->_send_started($sync, $msg, $payload);
- $self->_debug_dump(5, 'send header: ', $header);
- $self->_debug_dump(5, 'send payload: ', $payload);
- $self->_handle->push_write( $header . $payload );
+ $server->_send_started($sync, $msg, $payload);
+ if( $server->debug >= 5 ) {
+ $server->_debug_dump('send header: ', $header);
+ $server->_debug_dump('send payload: ', $payload);
+ }
+ my $handle = $self->_handle;
+ $handle->push_write( $header . $payload );
if( $no_reply ) {
push @{$self->_no_reply}, $sync;
+ $handle->on_drain( $self->_on_drain ) unless defined
$handle->{on_drain};
}
else {
- $self->_handle->push_read( chunk => 12, $self->_read_reply );
+ $handle->push_read( chunk => 12, $self->_read_reply );
}
return;
}
sub _build__read_reply {
my ($self) = @_;
+ my $server = $self->server;
weaken($self);
+ weaken($server);
return sub {
my ($handle, $data) = @_;
- $self->_debug_dump(6, 'recv header: ', $data);
+ my $dump_resp = $server->debug >= 6;
+ $server->_debug_dump('recv header: ', $data) if $dump_resp;
my ($msg, $payload_length, $sync) = $self->_unpack_header($data);
$handle->unshift_read( chunk => $payload_length, sub {
my ($handle, $data) = @_;
- $self->_debug_dump(6, 'recv payload: ', $data);
- $self->_recv_finished($sync, $msg, $data);
- $self->_try_to_send();
+ $server->_debug_dump('recv payload: ', $data) if $dump_resp;
+ $server->_recv_finished($sync, $msg, $data);
+ $self->_finish_and_start();
delete($self->_callbacks->{$sync})->($msg, $data);
return;
});
@@ -134,15 +153,29 @@ sub _build__read_reply {
sub _try_to_send {
my ($self) = @_;
while( $self->_in_progress < $self->max_parallel && (my $task = shift @{
$self->_queue }) ) {
+ $self->_in_progress( $self->_in_progress + 1 );
$self->_send(@$task);
}
return;
}
+sub _finish_and_start {
+ my ($self) = @_;
+ if( my $task = shift @{$self->_queue} ) {
+ $self->_send(@$task);
+ }
+ else {
+ $self->_in_progress( $self->_in_progress - 1 );
+ }
+ return;
+}
+
sub _build__handle {
my ($self) = @_;
- $self->_debug(4, "connecting");
+ my $server = $self->server;
+ $server->_debug("connecting") if $server->debug >= 4;
weaken($self);
+ weaken($server);
return AnyEvent::Handle->new(
connect => [ $self->host, $self->port ],
no_delay => $self->tcp_nodelay,
@@ -153,22 +186,23 @@ sub _build__handle {
},
on_connect => sub {
my ($handle) = @_;
- $self->_debug(1, "connected");
+ $server->_debug("connected") if $server->debug >= 1;
return;
},
on_error => sub {
my ($handle, $fatal, $message) = @_;
- $self->_debug(0, ($fatal ? 'fatal ' : '') . 'error: ' . $message);
+ $server->_debug(($fatal ? 'fatal ' : '') . 'error: ' . $message);
my @callbacks;
foreach my $sync ( keys %{$self->_callbacks} ) {
- $self->_recv_finished($sync, undef, undef, $message);
+ $server->_recv_finished($sync, undef, undef, $message);
+ $self->_in_progress( $self->_in_progress - 1 );
push @callbacks, $self->_callbacks->{$sync};
}
- $self->server->active(0);
+ $server->active(0);
$self->_clear_handle();
$self->_clear_callbacks();
$self->_clear_no_reply();
- $self->_debug(1, 'closing socket');
+ $server->_debug('closing socket') if $server->debug >= 1;
$handle->destroy();
$self->_try_to_send();
$_->(undef, undef, $message) foreach @callbacks;
@@ -180,17 +214,28 @@ sub _build__handle {
$handle->_error( Errno::ETIMEDOUT ) if keys %{$self->_callbacks};
return;
},
- on_drain => sub {
- my ($handle) = @_;
+ );
+}
+
+sub _build__on_drain {
+ my ($self) = @_;
+ my $server = $self->server;
+ weaken($self);
+ weaken($server);
+ return sub {
+ my ($handle) = @_;
+ if( $self->_has_no_reply() ) {
foreach my $sync ( @{$self->_no_reply} ) {
- $self->_recv_finished($sync, undef, undef);
+ $server->_recv_finished($sync, undef, undef);
+ $self->_in_progress( $self->_in_progress - 1 );
delete($self->_callbacks->{$sync})->(undef, undef);
}
$self->_clear_no_reply();
$self->_try_to_send();
- return;
- },
- );
+ $handle->on_drain(undef);
+ }
+ return;
+ };
}
sub _build__queue {
@@ -211,25 +256,14 @@ sub _build__no_reply {
around _choose_sync => sub {
my ($orig, $self) = @_;
my $sync;
+ my $callbacks = $self->_callbacks;
for( 1 .. 50 ) {
$sync = $self->$orig();
- return $sync unless exists $self->_callbacks->{$sync};
+ return $sync unless exists $callbacks->{$sync};
}
die "Can't choose sync value after 50 iterations";
};
-before _send_started => sub {
- my ($self) = @_;
- $self->_in_progress( $self->_in_progress + 1 );
- return;
-};
-
-after _recv_finished => sub {
- my ($self) = @_;
- $self->_in_progress( $self->_in_progress - 1 );
- return;
-};
-
=back
=head1 SEE ALSO
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
index 5402284..b00ef00 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Connection/Sync.pm
@@ -36,60 +36,65 @@ See L<MR::IProto::Connection/send> for more information.
sub send {
my ($self, $msg, $payload, $callback, $no_reply) = @_;
my ($sync, $resp_msg, $resp_payload);
+ my $server = $self->server;
my $ok = eval {
$sync = $self->_choose_sync();
+ $server->_send_started($sync, $msg, $payload);
+ my $socket = $self->_socket;
my $header = $self->_pack_header($msg, length $payload, $sync);
- $self->_send_started($sync, $msg, $payload);
- $self->_debug_dump(5, 'send header: ', $header);
- $self->_debug_dump(5, 'send payload: ', $payload);
+ if( $server->debug >= 5 ) {
+ $server->_debug_dump('send header: ', $header);
+ $server->_debug_dump('send payload: ', $payload);
+ }
my $write = $header . $payload;
while( length $write ) {
- my $written = syswrite($self->_socket, $write);
+ my $written = syswrite($socket, $write);
die $! unless defined $written;
substr $write, 0, $written, '';
}
unless( $no_reply ) {
+ my $dump_resp = $server->debug >= 6;
my $resp_header;
my $to_read = 12;
while( $to_read ) {
- my $read = sysread($self->_socket, my $buf, $to_read);
+ my $read = sysread($socket, my $buf, $to_read);
die $! unless defined $read;
die "EOF during read of header" if $read == 0;
$resp_header .= $buf;
$to_read -= $read;
}
- $self->_debug_dump(6, 'recv header: ', $resp_header);
+ $server->_debug_dump('recv header: ', $resp_header) if $dump_resp;
($resp_msg, my $resp_length, my $resp_sync) =
$self->_unpack_header($resp_header);
die "Request and reply sync is different: $resp_sync != $sync"
unless $resp_sync == $sync;
$to_read = $resp_length;
while( $to_read ) {
- my $read = sysread($self->_socket, my $buf, $to_read);
+ my $read = sysread($socket, my $buf, $to_read);
die $! unless defined $read;
die "EOF during read of payload" if $read == 0;
$resp_payload .= $buf;
$to_read -= $read;
}
- $self->_debug_dump(6, 'recv payload: ', $resp_payload);
+ $server->_debug_dump('recv payload: ', $resp_payload) if
$dump_resp;
}
1;
};
if($ok) {
- $self->_recv_finished($sync, $resp_msg, $resp_payload);
+ $server->_recv_finished($sync, $resp_msg, $resp_payload);
$callback->($resp_msg, $resp_payload);
}
else {
my $error = $@ =~ /^(.*?) at \S+ line \d+/s ? $1 : $@;
- $self->_debug(0, "error: $error");
+ $server->_debug("error: $error");
$! = Errno::ETIMEDOUT if $! == Errno::EINPROGRESS; # Hack over
IO::Socket behaviour
if($self->_has_socket()) {
close($self->_socket);
$self->_clear_socket();
}
- $self->server->active(0);
- $self->_recv_finished($sync, undef, undef, $error);
+ $server->active(0);
+ $server->_recv_finished($sync, undef, undef, $error);
$callback->(undef, undef, $error);
}
return;
@@ -113,7 +118,8 @@ sub set_timeout {
sub _build__socket {
my ($self) = @_;
- $self->_debug(4, "connecting");
+ my $server = $self->server;
+ $server->_debug("connecting") if $server->debug >= 4;
my $socket = IO::Socket::INET->new(
PeerHost => $self->host,
PeerPort => $self->port,
@@ -123,7 +129,7 @@ sub _build__socket {
$socket->sockopt(SO_KEEPALIVE, 1) if $self->tcp_keepalive;
$socket->setsockopt((getprotobyname('tcp'))[2], TCP_NODELAY, 1) if
$self->tcp_nodelay;
$self->_set_timeout($socket, $self->timeout) if $self->timeout;
- $self->_debug(1, "connected");
+ $server->_debug("connected") if $server->debug >= 4;
return $socket;
}
commit faaa889632f4be5e7a079afa824af63c1c39a14c
Author: Aleksey Mashanov <[email protected]>
Date: Fri Dec 3 13:43:41 2010 +0300
Less usage of closures to prevent large memory usage
diff --git a/mod/silverbox/client/perl/lib/MR/IProto.pm
b/mod/silverbox/client/perl/lib/MR/IProto.pm
index b2228d8..bb79a6e 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto.pm
@@ -406,6 +406,7 @@ sub _build_debug_cb {
my $prefix = $self->prefix;
return sub {
my ($msg) = @_;
+ chomp $msg;
warn sprintf "%s: %s\n", $prefix, $msg;
return;
};
@@ -498,107 +499,128 @@ sub _try_to_send {
sub _send_now {
my ($self, $message, $callback, $sync) = @_;
$self->_in_progress( $self->_in_progress + 1 );
- my ($req_msg, $key, $body, $unpack, $retry, $response_class, $no_reply);
+ my %args;
# MR::IProto::Message OO-API
if( blessed($message) ) {
- $req_msg = $message->msg;
- $key = $message->key;
- $body = $message->data;
- $response_class = $self->_reply_class->{$req_msg};
- die sprintf "Cannot find response class for message code $req_msg\n"
unless $response_class;
- $no_reply = $response_class->isa('MR::IProto::NoResponse');
- $retry = $message->retry ? $self->max_request_retries : 1;
+ my $response_class = $self->_reply_class->{$message->msg};
+ die sprintf "Cannot find response class for message code %d\n",
$message->msg unless $response_class;
+ %args = (
+ request => $message,
+ msg => $message->msg,
+ key => $message->key,
+ body => $message->data,
+ response_class => $response_class,
+ no_reply => $response_class->isa('MR::IProto::NoResponse'),
+ retry => $message->retry ? $self->max_request_retries : 1,
+ );
}
# Old-style compatible API
else {
- $req_msg = $message->{msg};
- $key = $message->{key};
- $body = exists $message->{payload} ? $message->{payload}
- : ref $message->{data} ? pack delete $message->{pack} || 'L*',
@{$message->{data}}
- : $message->{data};
- $no_reply = $message->{no_reply};
- $unpack = $no_reply ? sub { 0 } : $message->{unpack} or die "unpack or
no_reply must be specified";
- $retry = $message->{retry} ? $self->max_request_retries : 1;
+ die "unpack or no_reply must be specified" unless $message->{unpack}
|| $message->{no_reply};
+ %args = (
+ msg => $message->{msg},
+ key => $message->{key},
+ body => exists $message->{payload} ? $message->{payload}
+ : ref $message->{data} ? pack delete $message->{pack} || 'L*',
@{$message->{data}}
+ : $message->{data},
+ no_reply => $message->{no_reply},
+ unpack => $message->{unpack},
+ retry => $message->{retry} ? $self->max_request_retries : 1,
+ );
}
my $try = 1;
weaken($self);
- my ($sub, $server);
- my $xsync = $sync ? 'sync' : 'async';
- my $do_try = sub {
- my ($by_resp) = @_;
- $self->_debug(2, sprintf "send msg=%d try %d of %d total", $req_msg,
$try, $by_resp ? $self->max_request_retries : $retry );
- $server = $self->cluster->server( $key );
- $server->$xsync->send($req_msg, $body, $sub, $no_reply);
+ my $handler;
+ $handler = sub {
+ $self->_server_callback(
+ [\$handler, \%args, $callback, $sync, \$try],
+ [@_],
+ );
+ return;
};
- my $next_try = $sync
- ? sub {
- my ($by_resp) = @_;
- Time::HiRes::sleep($self->retry_delay);
- $do_try->($by_resp);
- return;
- }
- : sub {
- my ($by_resp) = @_;
- my $timer;
- $timer = AnyEvent->timer(
- after => $self->retry_delay,
- cb => sub {
- undef $timer;
- $do_try->($by_resp);
- return;
- },
- );
- };
- $sub = sub {
- my ($resp_msg, $data, $error) = @_;
+ $self->_send_try($sync, \%args, $handler, $try);
+ return;
+}
+
+sub _send_try {
+ my ($self, $sync, $args, $handler, $try, $by_resp) = @_;
+ my $xsync = $sync ? 'sync' : 'async';
+ $self->_debug(2, sprintf "send msg=%d try %d of %d total", $args->{msg},
$try, $by_resp ? $self->max_request_retries : $args->{retry} );
+ my $server = $self->cluster->server( $args->{key} );
+ $server->$xsync->send($args->{msg}, $args->{body}, $handler,
$args->{no_reply});
+ return;
+}
+
+sub _send_retry {
+ my ($self, @in) = @_;
+ my ($sync) = @in;
+ if( $sync ) {
+ Time::HiRes::sleep($self->retry_delay);
+ $self->_send_try(@in);
+ }
+ else {
+ my $timer;
+ $timer = AnyEvent->timer(
+ after => $self->retry_delay,
+ cb => sub {
+ undef $timer;
+ $self->_send_try(@in);
+ return;
+ },
+ );
+ }
+ return;
+}
+
+sub _server_callback {
+ my ($self, $req_args, $resp_args) = @_;
+ my ($handler, $args, $callback, $sync, $try) = @$req_args;
+ my ($resp_msg, $data, $error) = @$resp_args;
+ eval {
if ($error) {
$self->_debug(2, "send: failed");
- if( $try++ < $retry ) {
- $next_try->();
+ if( $$try++ < $args->{retry} ) {
+ $self->_send_retry($sync, $args, $$handler, $$try);
}
else {
- undef $sub;
- undef $do_try;
- undef $next_try;
- $self->_report_error($unpack ? undef : $message, $callback,
$error);
+ undef $$handler;
+ $self->_report_error($args->{request}, $callback, $error);
}
}
else {
my $ok = eval {
- die "Request and reply message code is different: $resp_msg !=
$req_msg\n"
- unless $no_reply || $resp_msg == $req_msg;
- if ($unpack) {
- $data = [ $unpack->($data) ];
+ die "Request and reply message code is different: $resp_msg !=
$args->{msg}\n"
+ unless $args->{no_reply} || $resp_msg == $args->{msg};
+ if( $args->{request} ) {
+ $data = $args->{response_class}->new( data => $data,
request => $args->{request} );
}
else {
- $data = $response_class->new( data => $data, request =>
$message );
+ $data = $args->{no_reply} ? [ 0 ] : [
$args->{unpack}->($data) ];
}
1;
};
if($ok) {
- if( !$unpack && $data->retry && $try++ <
$self->max_request_retries ) {
- $next_try->(1);
+ if( $args->{request} && $data->retry && $$try++ <
$self->max_request_retries ) {
+ $self->_send_retry($sync, $args, $$handler, $$try, 1);
}
else {
- undef $sub;
- undef $do_try;
- undef $next_try;
+ undef $$handler;
$self->_in_progress( $self->_in_progress - 1 );
$self->_try_to_send();
$callback->($data);
}
}
else {
- undef $sub;
- undef $do_try;
- undef $next_try;
- $self->_report_error($unpack ? undef : $message, $callback,
$@);
+ undef $$handler;
+ $self->_report_error($args->{request}, $callback, $@);
}
}
- return;
+ 1;
+ } or do {
+ undef $$handler;
+ $self->_debug(0, "unhandled fatal error: $@");
};
- $do_try->();
return;
}
diff --git a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
index d468d96..0f7dfec 100644
--- a/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
+++ b/mod/silverbox/client/perl/lib/MR/IProto/Cluster/Server.pm
@@ -214,6 +214,7 @@ sub _build_debug_cb {
my ($self) = @_;
return sub {
my ($msg) = @_;
+ chomp $msg;
warn "MR::IProto: $msg\n";
return;
};
--
Tarantool -- an efficient key/value data store
_______________________________________________
Mailing list: https://launchpad.net/~tarantool-developers
Post to : [email protected]
Unsubscribe : https://launchpad.net/~tarantool-developers
More help : https://help.launchpad.net/ListHelp