This internal API is better suited for fork-friendliness (but locking and dedupe still need to be re-added).
Normal "json" is the default, though the stream-friendly "concatjson" and "jsonl" (AKA "ndjson" AKA "ldjson") formats all seem to work (though their tests aren't working yet). For normal "json", the biggest downside is the necessity of a trailing "null" element at the end of the array: output comes from parallel processes, and (AFAIK) regular JSON doesn't allow trailing commas, unlike JavaScript. --- MANIFEST | 1 + lib/PublicInbox/LeiOverview.pm | 188 +++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiQuery.pm | 66 +++--------- lib/PublicInbox/LeiXSearch.pm | 25 +++-- 4 files changed, 217 insertions(+), 63 deletions(-) create mode 100644 lib/PublicInbox/LeiOverview.pm diff --git a/MANIFEST b/MANIFEST index caddd8df..810aec42 100644 --- a/MANIFEST +++ b/MANIFEST @@ -166,6 +166,7 @@ lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExternal.pm +lib/PublicInbox/LeiOverview.pm lib/PublicInbox/LeiQuery.pm lib/PublicInbox/LeiSearch.pm lib/PublicInbox/LeiStore.pm diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm new file mode 100644 index 00000000..8a1f4f82 --- /dev/null +++ b/lib/PublicInbox/LeiOverview.pm @@ -0,0 +1,188 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# per-mitem/smsg iterators for search results +# "ovv" => "Overview viewer" +package PublicInbox::LeiOverview; +use strict; +use v5.10.1; +use POSIX qw(strftime); +use File::Spec; +use PublicInbox::MID qw($MID_EXTRACT); +use PublicInbox::Address qw(pairs); +use PublicInbox::Config; +use PublicInbox::Search qw(get_pct); + +# cf. 
https://en.wikipedia.org/wiki/JSON_streaming +my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing + +sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } + +sub new { + my ($class, $lei) = @_; + my $opt = $lei->{opt}; + my $out = $opt->{output} // '-'; + $out = '/dev/stdout' if $out eq '-'; + + my $fmt = $opt->{'format'}; + $fmt = lc($fmt) if defined $fmt; + if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/ + my $ofmt = lc $1; + $fmt //= $ofmt; + return $lei->fail(<<"") if $fmt ne $ofmt; +--format=$fmt and --output=$ofmt conflict + + } + $fmt //= 'json' if $out eq '/dev/stdout'; + $fmt //= 'maildir'; # TODO + + if (index($out, '://') < 0) { # not a URL, so assume path + $out = File::Spec->canonpath($out); + } # else URL + + my $self = bless { fmt => $fmt, out => $out }, $class; + my $json; + if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) { + $json = $self->{json} = ref(PublicInbox::Config->json); + } + my ($isatty, $seekable); + if ($out eq '/dev/stdout') { + $isatty = -t $lei->{1}; + $lei->start_pager if $isatty; + $opt->{pretty} //= $isatty; + } elsif ($json) { + return $lei->fail('JSON formats only output to stdout'); + } + $self; +} + +# called once by parent +sub ovv_begin { + my ($self, $lei) = @_; + if ($self->{fmt} eq 'json') { + print { $lei->{1} } '['; + } # TODO HTML/Atom/... 
+} + +# called once by parent (via PublicInbox::EOFpipe) +sub ovv_end { + my ($self, $lei) = @_; + if ($self->{fmt} eq 'json') { + # JSON doesn't allow trailing commas, and preventing + # trailing commas is a PITA when parallelizing outputs + print { $lei->{1} } "null]\n"; + } elsif ($self->{fmt} eq 'concatjson') { + print { $lei->{1} } "\n"; + } +} + +sub ovv_atfork_child { + my ($self) = @_; + # reopen dedupe here +} + +# prepares an smsg for JSON +sub _unbless_smsg { + my ($smsg, $mitem) = @_; + + delete @$smsg{qw(lines bytes num tid)}; + $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt + $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate + $smsg->{relevance} = get_pct($mitem) if $mitem; + + if (my $r = delete $smsg->{references}) { + $smsg->{references} = [ + map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ]; + } + if (my $m = delete($smsg->{mid})) { + $smsg->{'m'} = "<$m>"; + } + for my $f (qw(from to cc)) { + my $v = delete $smsg->{$f} or next; + $smsg->{substr($f, 0, 1)} = pairs($v); + } + $smsg->{'s'} = delete $smsg->{subject}; + # can we be bothered to parse From/To/Cc into arrays? + scalar { %$smsg }; # unbless +} + +sub ovv_atexit_child { + my ($self, $lei) = @_; + my $bref = delete $lei->{ovv_buf} or return; + print { $lei->{1} } $$bref; +} + +# JSON module ->pretty output wastes too much vertical white space, +# this (IMHO) provides better use of screen real-estate while not +# being excessively compact: +sub _json_pretty { + my ($json, $k, $v) = @_; + if (ref $v eq 'ARRAY') { + if (@$v) { + my $sep = ",\n" . (' ' x (length($k) + 7)); + if (ref($v->[0])) { # f/t/c + $v = '[' . join($sep, map { + my $pair = $json->encode($_); + $pair =~ s/(null|"),"/$1, "/g; + $pair; + } @$v) . ']'; + } else { # references + $v = '[' . join($sep, map { + substr($json->encode([$_]), 1, -1); + } @$v) . 
']'; + } + } else { + $v = '[]'; + } + } + qq{ "$k": }.$v; +} + +sub ovv_each_smsg_cb { + my ($self, $lei) = @_; + $lei->{ovv_buf} = \(my $buf = ''); + my $json = $self->{json}->new; + if ($json) { + $json->utf8->canonical; + $json->ascii(1) if $lei->{opt}->{ascii}; + } + if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { + my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; + sub { # DIY prettiness :P + my ($smsg, $mitem) = @_; + $smsg = _unbless_smsg($smsg, $mitem); + $buf .= "{\n"; + $buf .= join(",\n", map { + my $v = $smsg->{$_}; + if (ref($v)) { + _json_pretty($json, $_, $v); + } else { + $v = $json->encode([$v]); + qq{ "$_": }.substr($v, 1, -1); + } + } sort keys %$smsg); + $buf .= $EOR; + if (length($buf) > 65536) { + print { $lei->{1} } $buf; + $buf = ''; + } + } + } elsif ($json) { + my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL + sub { + my ($smsg, $mitem) = @_; + delete @$smsg{qw(tid num)}; + $buf .= $json->encode(_unbless_smsg(@_)) . $ORS; + if (length($buf) > 65536) { + print { $lei->{1} } $buf; + $buf = ''; + } + } + } elsif ($self->{fmt} eq 'oid') { + sub { + my ($smsg, $mitem) = @_; + } + } # else { ... 
+} + +1; diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 2f4b99e5..7ca01454 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -5,43 +5,8 @@ package PublicInbox::LeiQuery; use strict; use v5.10.1; -use PublicInbox::MID qw($MID_EXTRACT); -use POSIX qw(strftime); -use PublicInbox::Address qw(pairs); use PublicInbox::DS qw(dwaitpid); -sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } - -# prepares an smsg for JSON -sub _smsg_unbless ($) { - my ($smsg) = @_; - - delete @$smsg{qw(lines bytes)}; - $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt - $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate - - if (my $r = delete $smsg->{references}) { - $smsg->{references} = [ - map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ]; - } - if (my $m = delete($smsg->{mid})) { - $smsg->{'m'} = "<$m>"; - } - # XXX breaking to/cc, into structured arrays or tables which - # distinguish "$phrase <$address>" causes pretty printing JSON - # to take up too much vertical space. I can't get either - # Cpanel::JSON::XS or JSON::XS or jq(1) only indent when - # wrapping is necessary, rather than blindly indenting and - # adding vertical space everywhere. - for my $f (qw(from to cc)) { - my $v = delete $smsg->{$f} or next; - $smsg->{substr($f, 0, 1)} = $v; - } - $smsg->{'s'} = delete $smsg->{subject}; - # can we be bothered to parse From/To/Cc into arrays? 
- scalar { %$smsg }; # unbless -} - sub _vivify_external { # _externals_each callback my ($src, $dir) = @_; if (-f "$dir/ei.lock") { @@ -68,6 +33,7 @@ sub lei_q { # src: LeiXSearch || LeiSearch || Inbox my @srcs; require PublicInbox::LeiXSearch; + require PublicInbox::LeiOverview; my $lxs = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external @@ -83,23 +49,9 @@ sub lei_q { // $lxs->wq_workers($j); } unshift(@srcs, $sto->search) if $opt->{'local'}; - my $out = $opt->{output} // '-'; - $out = 'json:/dev/stdout' if $out eq '-'; - my $isatty = -t $self->{1}; # no forking workers after this - $self->start_pager if $isatty; - my $json = substr($out, 0, 5) eq 'json:' ? - ref(PublicInbox::Config->json)->new : undef; - if ($json) { - if ($opt->{pretty} //= $isatty) { - $json->pretty(1)->space_before(0); - $json->indent_length($opt->{indent} // 2); - } - $json->utf8; # avoid Wide character in print warnings - $json->ascii(1) if $opt->{ascii}; # for "\uXXXX" - $json->canonical; - } - + require PublicInbox::LeiOverview; + $self->{ovv} = PublicInbox::LeiOverview->new($self); my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); $mset_opt{asc} = $opt->{'reverse'} ? 
1 : 0; $mset_opt{qstr} = join(' ', map {; @@ -124,7 +76,17 @@ sub lei_q { $mset_opt{relevance} //= -2 if $opt->{thread}; # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); $self->{mset_opt} = \%mset_opt; - $lxs->do_query($self, \@srcs); + $self->{ovv}->ovv_begin($self); + pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; + require PublicInbox::EOFpipe; + my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self); + $lxs->do_query($self, $qry_done, \@srcs); + $eof->event_step unless $self->{sock}; +} + +sub query_done { # PublicInbox::EOFpipe callback + my ($self) = @_; + $self->{ovv}->ovv_end($self); } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 94f7c2bc..c030b2b2 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); -use PublicInbox::Search qw(get_pct); use Sys::Syslog qw(syslog); sub new { @@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread } my $mo = { %{$lei->{mset_opt}} }; my $mset; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); do { $mset = $srch->mset($mo->{qstr}, $mo); my $ids = $srch->mset_to_artnums($mset, $mo); my $ctx = { ids => $ids }; my $i = 0; - my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items; + my %n2item = map { ($ids->[$i++], $_) } $mset->items; while ($over->expand_thread($ctx)) { for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; # next if $dd->is_smsg_dup($smsg); TODO - if (my $p = delete $n2p{$smsg->{num}}) { - $smsg->{relevance} = $p; - } - print { $self->{1} } Dumper($smsg); + my $mitem = delete $n2item{$smsg->{num}}; + $each_smsg->($smsg, $mitem); # $self->out($buf .= $ORS); # $emit_cb->($smsg); } @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); + $lei->{ovv}->ovv_atexit_child($lei); } sub query_mset { # non-parallel for non-"--thread" users @@ -130,23 +129,24 @@ sub query_mset 
{ # non-parallel for non-"--thread" users my $mset; local %SIG = (%SIG, $lei->atfork_child_wq($self)); $self->attach_external($_) for @$srcs; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); do { $mset = $self->mset($mo->{qstr}, $mo); for my $it ($mset->items) { my $smsg = smsg_for($self, $it) or next; # next if $dd->is_smsg_dup($smsg); - $smsg->{relevance} = get_pct($it); - use Data::Dumper; - print { $self->{1} } Dumper($smsg); + $each_smsg->($smsg, $it); # $self->out($buf .= $ORS) if defined $buf; #$emit_cb->($smsg); } } while (_mset_more($mset, $mo)); + $lei->{ovv}->ovv_atexit_child($lei); } sub do_query { - my ($self, $lei_orig, $srcs) = @_; + my ($self, $lei_orig, $qry_done, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); + $io[0] = $qry_done; # don't need stdin $io[1]->autoflush(1); $io[2]->autoflush(1); if ($lei->{opt}->{thread}) { @@ -160,6 +160,9 @@ sub do_query { for my $rmt (@{$self->{remotes} // []}) { $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } + + # sent off to children, they will drop remaining references to it + close $qry_done; } sub ipc_atfork_child { @@ -170,7 +173,7 @@ sub ipc_atfork_child { sub ipc_atfork_prepare { my ($self) = @_; - $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]); + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } -- unsubscribe: one-click, see List-Unsubscribe header archive: https://public-inbox.org/meta/