This allows us to avoid repeated open() and close() syscalls
and speeds up the new xt/stress-sharedkv.t maintainer test
by roughly 7%.
---
 MANIFEST                    |  1 +
 lib/PublicInbox/Lock.pm     | 17 +++++++++++++
 lib/PublicInbox/SharedKV.pm | 14 +++++------
 xt/stress-sharedkv.t        | 50 +++++++++++++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 7 deletions(-)
 create mode 100644 xt/stress-sharedkv.t
diff --git a/MANIFEST b/MANIFEST index c10775e4..a715214e 100644 --- a/MANIFEST +++ b/MANIFEST @@ -444,3 +444,4 @@ xt/perf-msgview.t xt/perf-nntpd.t xt/perf-threading.t xt/solver.t +xt/stress-sharedkv.t diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index bb213de4..c0c4c15c 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -41,6 +41,23 @@ sub lock_for_scope { PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self); } +sub lock_acquire_fast { + $_[0]->{lockfh} or return lock_acquire($_[0]); + flock($_[0]->{lockfh}, LOCK_EX) or croak "lock (fast) failed: $!"; +} + +sub lock_release_fast { + flock($_[0]->{lockfh} // return, LOCK_UN) or + croak "unlock (fast) $_[0]->{lock_path}: $!"; +} + +# caller must use return value +sub lock_for_scope_fast { + my ($self, @single_pid) = @_; + lock_acquire_fast($self) or return; # lock_path not set + PublicInbox::OnDestroy->new(@single_pid, \&lock_release_fast, $self); +} + sub new_tmp { my ($cls, $ident) = @_; my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1); diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm index 77df0fb4..b0588060 100644 --- a/lib/PublicInbox/SharedKV.pm +++ b/lib/PublicInbox/SharedKV.pm @@ -17,7 +17,7 @@ sub dbh { my ($self, $lock) = @_; $self->{dbh} //= do { my $f = $self->{filename}; - $lock //= $self->lock_for_scope; + $lock //= $self->lock_for_scope_fast; my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', { AutoCommit => 1, RaiseError => 1, @@ -58,13 +58,13 @@ sub new { sub index_values { my ($self) = @_; - my $lock = $self->lock_for_scope; + my $lock = $self->lock_for_scope_fast; $self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)'); } sub set_maybe { my ($self, $key, $val, $lock) = @_; - $lock //= $self->lock_for_scope; + $lock //= $self->lock_for_scope_fast; my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val); INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?) 
@@ -83,7 +83,7 @@ SELECT k,v FROM kv sub delete_by_val { my ($self, $val, $lock) = @_; - $lock //= $self->lock_for_scope; + $lock //= $self->lock_for_scope_fast; $self->{dbh}->prepare_cached(<<'')->execute($val) + 0; DELETE FROM kv WHERE v = ? @@ -91,7 +91,7 @@ DELETE FROM kv WHERE v = ? sub replace_values { my ($self, $oldval, $newval, $lock) = @_; - $lock //= $self->lock_for_scope; + $lock //= $self->lock_for_scope_fast; $self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0; UPDATE kv SET v = ? WHERE v = ? @@ -122,7 +122,7 @@ SELECT v FROM kv WHERE k = ? sub xchg { my ($self, $key, $newval, $lock) = @_; - $lock //= $self->lock_for_scope; + $lock //= $self->lock_for_scope_fast; my $oldval = get($self, $key); if (defined $newval) { set($self, $key, $newval); @@ -146,7 +146,7 @@ SELECT COUNT(k) FROM kv sub dbh_release { my ($self, $lock) = @_; my $dbh = delete $self->{dbh} or return; - $lock //= $self->lock_for_scope; # may be needed for WAL + $lock //= $self->lock_for_scope_fast; # may be needed for WAL %{$dbh->{CachedKids}} = (); # cleanup prepare_cached $dbh->disconnect; } diff --git a/xt/stress-sharedkv.t b/xt/stress-sharedkv.t new file mode 100644 index 00000000..70de9ffc --- /dev/null +++ b/xt/stress-sharedkv.t @@ -0,0 +1,50 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use v5.10.1; +use Test::More; +use Benchmark qw(:all); +use PublicInbox::TestCommon; +require_ok 'PublicInbox::SharedKV'; +my ($tmpdir, $for_destroy) = tmpdir(); +local $ENV{TMPDIR} = $tmpdir; +my $skv = PublicInbox::SharedKV->new; +my $ipc = bless {}, 'StressSharedKV'; +$ipc->wq_workers_start('stress-sharedkv', $ENV{TEST_NPROC}//4); +my $nr = $ENV{TEST_STRESS_NR} // 100_000; +my $ios = []; +my $t = timeit(1, sub { + for my $i (1..$nr) { + $ipc->wq_do('test_set_maybe', $ios, $skv, $i); + $ipc->wq_do('test_set_maybe', $ios, $skv, $i); + } +}); +diag "$nr sets done ".timestr($t); + +for 
my $w ($ipc->wq_workers) { + $ipc->wq_do('test_skv_done', $ios); +} +diag "done requested"; + +$ipc->wq_close; +done_testing; + +package StressSharedKV; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use Digest::SHA qw(sha1); + +sub test_set_maybe { + my ($self, $skv, $i) = @_; + my $wcb = $self->{wcb} //= do { + $skv->dbh; + sub { $skv->set_maybe(sha1($_[0]), '') }; + }; + $wcb->($i + time); +} + +sub test_skv_done { + my ($self) = @_; + delete $self->{wcb}; +} -- unsubscribe: one-click, see List-Unsubscribe header archive: https://public-inbox.org/meta/