This internal API is better suited for fork-friendliness (but
locking + dedupe still need to be re-added).

Normal "json" is the default, though the stream-friendly
"concatjson" and "jsonl" (AKA "ndjson" AKA "ldjson") formats also
seem to be working (though tests aren't working yet).

For normal "json", the biggest downside is the necessity of a
trailing "null" element at the end of the array because of
parallel processes, since (AFAIK) regular JSON doesn't allow
trailing commas, unlike JavaScript.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LeiOverview.pm | 188 +++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm    |  66 +++---------
 lib/PublicInbox/LeiXSearch.pm  |  25 +++--
 4 files changed, 217 insertions(+), 63 deletions(-)
 create mode 100644 lib/PublicInbox/LeiOverview.pm

diff --git a/MANIFEST b/MANIFEST
index caddd8df..810aec42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -166,6 +166,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
new file mode 100644
index 00000000..8a1f4f82
--- /dev/null
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -0,0 +1,188 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# per-mitem/smsg iterators for search results
+# "ovv" => "Overview viewer"
+package PublicInbox::LeiOverview;
+use strict;
+use v5.10.1;
+use POSIX qw(strftime);
+use File::Spec;
+use PublicInbox::MID qw($MID_EXTRACT);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Config;
+use PublicInbox::Search qw(get_pct);
+
+# cf. https://en.wikipedia.org/wiki/JSON_streaming
+my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
+
+sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } # epoch seconds => UTC "YYYY-MM-DDTHH:MM:SSZ"
+
+sub new { # constructor: derives output format + destination from $lei->{opt}
+       my ($class, $lei) = @_;
+       my $opt = $lei->{opt};
+       my $out = $opt->{output} // '-';
+       $out = '/dev/stdout' if $out eq '-'; # "-" means stdout
+
+       my $fmt = $opt->{'format'};
+       $fmt = lc($fmt) if defined $fmt;
+       if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+               my $ofmt = lc $1;
+               $fmt //= $ofmt;
+               return $lei->fail(<<"") if $fmt ne $ofmt;
+--format=$fmt and --output=$ofmt conflict
+
+       }
+       $fmt //= 'json' if $out eq '/dev/stdout'; # stdout defaults to JSON
+       $fmt //= 'maildir'; # TODO
+
+       if (index($out, '://') < 0) { # not a URL, so assume path
+                $out = File::Spec->canonpath($out);
+       } # else URL (NOTE(review): a bare "scheme://..." loses its "scheme:" to the s/// above -- confirm)
+
+       my $self = bless { fmt => $fmt, out => $out }, $class;
+       my $json;
+       if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
+               $json = $self->{json} = ref(PublicInbox::Config->json); # class name; ovv_each_smsg_cb calls ->new on it
+       }
+       my ($isatty, $seekable); # $seekable is not used yet
+       if ($out eq '/dev/stdout') {
+               $isatty = -t $lei->{1};
+               $lei->start_pager if $isatty;
+               $opt->{pretty} //= $isatty; # pretty-print by default on a tty
+       } elsif ($json) {
+               return $lei->fail('JSON formats only output to stdout');
+       }
+       $self;
+}
+
+# Parent-only, invoked once: emits the opening bracket of the "json"
+# array; no other format needs a prefix so far.  TODO HTML/Atom/...
+sub ovv_begin {
+       my ($self, $lei) = @_;
+       my $out = $lei->{1};
+       print $out '[' if $self->{fmt} eq 'json';
+}
+
+# Parent-only, invoked once (via PublicInbox::EOFpipe): emits whatever
+# terminates the output for the chosen format.
+sub ovv_end {
+       my ($self, $lei) = @_;
+       my ($fmt, $out) = ($self->{fmt}, $lei->{1});
+       # each "json" record ends in a comma, as suppressing the last one
+       # is a PITA with parallel writers; JSON forbids trailing commas,
+       # so a dummy "null" element goes before the closing bracket
+       return print $out "null]\n" if $fmt eq 'json';
+       print $out "\n" if $fmt eq 'concatjson';
+}
+
+sub ovv_atfork_child { # placeholder: currently does nothing
+       my ($self) = @_;
+       # TODO: reopen dedupe here
+}
+
+# prepares an smsg for JSON: edits $smsg in place, returns an unblessed copy
+sub _unbless_smsg {
+       my ($smsg, $mitem) = @_;
+
+       delete @$smsg{qw(lines bytes num tid)}; # never emitted
+       $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+       $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+       $smsg->{relevance} = get_pct($mitem) if $mitem; # $mitem is optional
+
+       if (my $r = delete $smsg->{references}) {
+               $smsg->{references} = [ # one "<...>" string per match
+                               map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+       }
+       if (my $m = delete($smsg->{mid})) {
+               $smsg->{'m'} = "<$m>";
+       }
+       for my $f (qw(from to cc)) { # stored under 1-letter keys: f, t, c
+               my $v = delete $smsg->{$f} or next;
+               $smsg->{substr($f, 0, 1)} = pairs($v);
+       }
+       $smsg->{'s'} = delete $smsg->{subject}; # "s" is set even when undef
+       # NOTE(review): stale? from/to/cc already go through pairs() above
+       scalar { %$smsg }; # unbless (shallow copy into a plain hash ref)
+}
+
+sub ovv_atexit_child { # write out (and forget) any output still buffered
+       my ($self, $lei) = @_;
+       my $pending = delete($lei->{ovv_buf}) || return;
+       print { $lei->{1} } ${$pending};
+}
+
+# JSON module ->pretty output wastes too much vertical white space,
+# this (IMHO) provides better use of screen real-estate while not
+# being excessively compact:
+sub _json_pretty { # returns the text of one indented "$k": $v member
+       my ($json, $k, $v) = @_;
+       if (ref $v eq 'ARRAY') {
+               if (@$v) {
+                       my $sep = ",\n" . (' ' x (length($k) + 7)); # 7: the fixed chars around $k up to the "["
+                       if (ref($v->[0])) { # f/t/c
+                               $v = '[' . join($sep, map {
+                                       my $pair = $json->encode($_);
+                                       $pair =~ s/(null|"),"/$1, "/g; # add a space after the comma
+                                       $pair;
+                               } @$v) . ']';
+                       } else { # references
+                               $v = '[' . join($sep, map {
+                                       substr($json->encode([$_]), 1, -1); # encode via 1-element array, strip brackets
+                               } @$v) . ']';
+                       }
+               } else {
+                       $v = '[]';
+               }
+       }
+       qq{  "$k": }.$v; # a non-ARRAY $v is appended as-is
+}
+
+# returns a ($smsg, $mitem) callback which buffers one record per call in
+# $lei->{ovv_buf}, writing to $lei->{1} whenever it grows past 65536
+sub ovv_each_smsg_cb {
+       my ($self, $lei) = @_;
+       $lei->{ovv_buf} = \(my $buf = '');
+       my $json = $self->{json} ? $self->{json}->new : undef; # unset unless JSON
+       if ($json) {
+               $json->utf8->canonical;
+               $json->ascii(1) if $lei->{opt}->{ascii};
+       }
+       if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+               my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+               sub { # DIY prettiness :P
+                       my ($smsg, $mitem) = @_;
+                       $smsg = _unbless_smsg($smsg, $mitem);
+                       $buf .= "{\n";
+                       $buf .= join(",\n", map {
+                               my $v = $smsg->{$_};
+                               if (ref($v)) {
+                                       _json_pretty($json, $_, $v);
+                               } else { # encode scalar via 1-element array
+                                       $v = $json->encode([$v]);
+                                       qq{  "$_": }.substr($v, 1, -1);
+                               }
+                       } sort keys %$smsg);
+                       $buf .= $EOR;
+                       if (length($buf) > 65536) {
+                               print { $lei->{1} } $buf;
+                               $buf = '';
+                       }
+               }
+       } elsif ($json) {
+               my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+               sub { # _unbless_smsg already drops tid + num
+                       $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
+                       if (length($buf) > 65536) {
+                               print { $lei->{1} } $buf;
+                               $buf = '';
+                       }
+               }
+       } elsif ($self->{fmt} eq 'oid') {
+               sub {
+                       my ($smsg, $mitem) = @_;
+               }
+       } # else { ...
+}
+
+1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 2f4b99e5..7ca01454 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -5,43 +5,8 @@
 package PublicInbox::LeiQuery;
 use strict;
 use v5.10.1;
-use PublicInbox::MID qw($MID_EXTRACT);
-use POSIX qw(strftime);
-use PublicInbox::Address qw(pairs);
 use PublicInbox::DS qw(dwaitpid);
 
-sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-
-# prepares an smsg for JSON
-sub _smsg_unbless ($) {
-       my ($smsg) = @_;
-
-       delete @$smsg{qw(lines bytes)};
-       $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
-       $smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
-
-       if (my $r = delete $smsg->{references}) {
-               $smsg->{references} = [
-                               map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
-       }
-       if (my $m = delete($smsg->{mid})) {
-               $smsg->{'m'} = "<$m>";
-       }
-       # XXX breaking to/cc, into structured arrays or tables which
-       # distinguish "$phrase <$address>" causes pretty printing JSON
-       # to take up too much vertical space.  I can't get either
-       # Cpanel::JSON::XS or JSON::XS or jq(1) only indent when
-       # wrapping is necessary, rather than blindly indenting and
-       # adding vertical space everywhere.
-       for my $f (qw(from to cc)) {
-               my $v = delete $smsg->{$f} or next;
-               $smsg->{substr($f, 0, 1)} = $v;
-       }
-       $smsg->{'s'} = delete $smsg->{subject};
-       # can we be bothered to parse From/To/Cc into arrays?
-       scalar { %$smsg }; # unbless
-}
-
 sub _vivify_external { # _externals_each callback
        my ($src, $dir) = @_;
        if (-f "$dir/ei.lock") {
@@ -68,6 +33,7 @@ sub lei_q {
        # src: LeiXSearch || LeiSearch || Inbox
        my @srcs;
        require PublicInbox::LeiXSearch;
+       require PublicInbox::LeiOverview;
        my $lxs = PublicInbox::LeiXSearch->new;
 
        # --external is enabled by default, but allow --no-external
@@ -83,23 +49,9 @@ sub lei_q {
                        // $lxs->wq_workers($j);
        }
        unshift(@srcs, $sto->search) if $opt->{'local'};
-       my $out = $opt->{output} // '-';
-       $out = 'json:/dev/stdout' if $out eq '-';
-       my $isatty = -t $self->{1};
        # no forking workers after this
-       $self->start_pager if $isatty;
-       my $json = substr($out, 0, 5) eq 'json:' ?
-               ref(PublicInbox::Config->json)->new : undef;
-       if ($json) {
-               if ($opt->{pretty} //= $isatty) {
-                       $json->pretty(1)->space_before(0);
-                       $json->indent_length($opt->{indent} // 2);
-               }
-               $json->utf8; # avoid Wide character in print warnings
-               $json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
-               $json->canonical;
-       }
-
+       require PublicInbox::LeiOverview;
+       $self->{ovv} = PublicInbox::LeiOverview->new($self);
        my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
        $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
        $mset_opt{qstr} = join(' ', map {;
@@ -124,7 +76,17 @@ sub lei_q {
        $mset_opt{relevance} //= -2 if $opt->{thread};
        # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
        $self->{mset_opt} = \%mset_opt;
-       $lxs->do_query($self, \@srcs);
+       $self->{ovv}->ovv_begin($self);
+       pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+       require PublicInbox::EOFpipe;
+       my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
+       $lxs->do_query($self, $qry_done, \@srcs);
+       $eof->event_step unless $self->{sock};
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+       my ($self) = @_;
+       $self->{ovv}->ovv_end($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 94f7c2bc..c030b2b2 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
 use Sys::Syslog qw(syslog);
 
 sub new {
@@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread
        }
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
-               my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+               my %n2item = map { ($ids->[$i++], $_) } $mset->items;
                while ($over->expand_thread($ctx)) {
                        for my $n (@{$ctx->{xids}}) {
                                my $smsg = $over->get_art($n) or next;
                                # next if $dd->is_smsg_dup($smsg); TODO
-                               if (my $p = delete $n2p{$smsg->{num}}) {
-                                       $smsg->{relevance} = $p;
-                               }
-                               print { $self->{1} } Dumper($smsg);
+                               my $mitem = delete $n2item{$smsg->{num}};
+                               $each_smsg->($smsg, $mitem);
                                # $self->out($buf .= $ORS);
                                # $emit_cb->($smsg);
                        }
                        @{$ctx->{xids}} = ();
                }
        } while (_mset_more($mset, $mo));
+       $lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
@@ -130,23 +129,24 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $mset;
        local %SIG = (%SIG, $lei->atfork_child_wq($self));
        $self->attach_external($_) for @$srcs;
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
                for my $it ($mset->items) {
                        my $smsg = smsg_for($self, $it) or next;
                        # next if $dd->is_smsg_dup($smsg);
-                       $smsg->{relevance} = get_pct($it);
-                       use Data::Dumper;
-                       print { $self->{1} } Dumper($smsg);
+                       $each_smsg->($smsg, $it);
                        # $self->out($buf .= $ORS) if defined $buf;
                        #$emit_cb->($smsg);
                }
        } while (_mset_more($mset, $mo));
+       $lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub do_query {
-       my ($self, $lei_orig, $srcs) = @_;
+       my ($self, $lei_orig, $qry_done, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+       $io[0] = $qry_done; # don't need stdin
        $io[1]->autoflush(1);
        $io[2]->autoflush(1);
        if ($lei->{opt}->{thread}) {
@@ -160,6 +160,9 @@ sub do_query {
        for my $rmt (@{$self->{remotes} // []}) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
+
+       # sent off to children, they will drop remaining references to it
+       close $qry_done;
 }
 
 sub ipc_atfork_child {
@@ -170,7 +173,7 @@ sub ipc_atfork_child {
 
 sub ipc_atfork_prepare {
        my ($self) = @_;
-       $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
--
unsubscribe: one-click, see List-Unsubscribe header
archive: https://public-inbox.org/meta/

Reply via email to