This allows us to avoid repeated open() and close() syscalls
and speeds up the new xt/stress-sharedkv.t maintainer test
by roughly 7%.
---
 MANIFEST                    |  1 +
 lib/PublicInbox/Lock.pm     | 17 +++++++++++++
 lib/PublicInbox/SharedKV.pm | 14 +++++------
 xt/stress-sharedkv.t        | 50 +++++++++++++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 7 deletions(-)
 create mode 100644 xt/stress-sharedkv.t

diff --git a/MANIFEST b/MANIFEST
index c10775e4..a715214e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -444,3 +444,4 @@ xt/perf-msgview.t
 xt/perf-nntpd.t
 xt/perf-threading.t
 xt/solver.t
+xt/stress-sharedkv.t
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index bb213de4..c0c4c15c 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -41,6 +41,23 @@ sub lock_for_scope {
        PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
 }
 
+sub lock_acquire_fast {
+       $_[0]->{lockfh} or return lock_acquire($_[0]);
+       flock($_[0]->{lockfh}, LOCK_EX) or croak "lock (fast) failed: $!";
+}
+
+sub lock_release_fast {
+       flock($_[0]->{lockfh} // return, LOCK_UN) or
+                       croak "unlock (fast) $_[0]->{lock_path}: $!";
+}
+
+# caller must use return value
+sub lock_for_scope_fast {
+       my ($self, @single_pid) = @_;
+       lock_acquire_fast($self) or return; # lock_path not set
+       PublicInbox::OnDestroy->new(@single_pid, \&lock_release_fast, $self);
+}
+
 sub new_tmp {
        my ($cls, $ident) = @_;
        my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1);
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 77df0fb4..b0588060 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -17,7 +17,7 @@ sub dbh {
        my ($self, $lock) = @_;
        $self->{dbh} //= do {
                my $f = $self->{filename};
-               $lock //= $self->lock_for_scope;
+               $lock //= $self->lock_for_scope_fast;
                my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
                        AutoCommit => 1,
                        RaiseError => 1,
@@ -58,13 +58,13 @@ sub new {
 
 sub index_values {
        my ($self) = @_;
-       my $lock = $self->lock_for_scope;
+       my $lock = $self->lock_for_scope_fast;
        $self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
 }
 
 sub set_maybe {
        my ($self, $key, $val, $lock) = @_;
-       $lock //= $self->lock_for_scope;
+       $lock //= $self->lock_for_scope_fast;
        my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
 
@@ -83,7 +83,7 @@ SELECT k,v FROM kv
 
 sub delete_by_val {
        my ($self, $val, $lock) = @_;
-       $lock //= $self->lock_for_scope;
+       $lock //= $self->lock_for_scope_fast;
        $self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
 DELETE FROM kv WHERE v = ?
 
@@ -91,7 +91,7 @@ DELETE FROM kv WHERE v = ?
 
 sub replace_values {
        my ($self, $oldval, $newval, $lock) = @_;
-       $lock //= $self->lock_for_scope;
+       $lock //= $self->lock_for_scope_fast;
        $self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
 UPDATE kv SET v = ? WHERE v = ?
 
@@ -122,7 +122,7 @@ SELECT v FROM kv WHERE k = ?
 
 sub xchg {
        my ($self, $key, $newval, $lock) = @_;
-       $lock //= $self->lock_for_scope;
+       $lock //= $self->lock_for_scope_fast;
        my $oldval = get($self, $key);
        if (defined $newval) {
                set($self, $key, $newval);
@@ -146,7 +146,7 @@ SELECT COUNT(k) FROM kv
 sub dbh_release {
        my ($self, $lock) = @_;
        my $dbh = delete $self->{dbh} or return;
-       $lock //= $self->lock_for_scope; # may be needed for WAL
+       $lock //= $self->lock_for_scope_fast; # may be needed for WAL
        %{$dbh->{CachedKids}} = (); # cleanup prepare_cached
        $dbh->disconnect;
 }
diff --git a/xt/stress-sharedkv.t b/xt/stress-sharedkv.t
new file mode 100644
index 00000000..70de9ffc
--- /dev/null
+++ b/xt/stress-sharedkv.t
@@ -0,0 +1,50 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use Benchmark qw(:all);
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::SharedKV';
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+my $skv = PublicInbox::SharedKV->new;
+my $ipc = bless {}, 'StressSharedKV';
+$ipc->wq_workers_start('stress-sharedkv', $ENV{TEST_NPROC}//4);
+my $nr = $ENV{TEST_STRESS_NR} // 100_000;
+my $ios = [];
+my $t = timeit(1, sub {
+       for my $i (1..$nr) {
+               $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+               $ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+       }
+});
+diag "$nr sets done ".timestr($t);
+
+for my $w ($ipc->wq_workers) {
+       $ipc->wq_do('test_skv_done', $ios);
+}
+diag "done requested";
+
+$ipc->wq_close;
+done_testing;
+
+package StressSharedKV;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use Digest::SHA qw(sha1);
+
+sub test_set_maybe {
+       my ($self, $skv, $i) = @_;
+       my $wcb = $self->{wcb} //= do {
+               $skv->dbh;
+               sub { $skv->set_maybe(sha1($_[0]), '') };
+       };
+       $wcb->($i + time);
+}
+
+sub test_skv_done {
+       my ($self) = @_;
+       delete $self->{wcb};
+}
--
unsubscribe: one-click, see List-Unsubscribe header
archive: https://public-inbox.org/meta/

Reply via email to