commit e9efc49b6c1747da7256d0c276b97689f337f871
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Mon Jul 20 20:53:21 2020 +0200

    do away with newmaxuid
    
    now that expiration order is determined by a single loop ordered by
    far-side UIDs, it is no longer necessary to accurately track the highest
    seen UID.
    
    as a side effect, this fixes a problem reported (way too long ago) by
    Yuri D'Elia: we failed to up newmaxuid for messages we produced
    ourselves, so we would keep enumerating the same messages until we also
    propagated externally generated messages from that mailbox - which might
    have been never for the server side of archive/trash mailboxes.

 src/run-tests.pl | 18 ++++++-------
 src/sync.c       | 66 ++++++++++++++++++++++++++++--------------------
 2 files changed, 47 insertions(+), 37 deletions(-)

diff --git a/src/run-tests.pl b/src/run-tests.pl
index 77e0abc..4c257ee 100755
--- a/src/run-tests.pl
+++ b/src/run-tests.pl
@@ -61,7 +61,7 @@ my @X01 = (
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, 
"FT", I, 9, "", J, 10, "" ],
  [ 10,
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", G, 7, "FT", H, 8, 
"T", J, 9, "", I, 10, "" ],
- [ 9, 0, 9,
+ [ 10, 0, 10,
    1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 5, "T", 6, 0, "", 7, 7, 
"FT", 0, 8, "", 10, 9, "", 9, 10, "" ],
 );
 test("full", \@x01, \@X01, @O01);
@@ -73,7 +73,7 @@ my @X02 = (
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", I, 9, "", J, 10, "" ],
  [ 10,
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ],
- [ 9, 0, 9,
+ [ 10, 0, 10,
    1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 10, 9, "", 9, 10, "" ],
 );
 test("full + expunge both", \@x01, \@X02, @O02);
@@ -85,7 +85,7 @@ my @X03 = (
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, 
"FT", I, 9, "", J, 10, "" ],
  [ 10,
    A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ],
- [ 9, 0, 9,
+ [ 10, 0, 10,
    1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 0, "T", 6, 0, "", 7, 0, "T", 
10, 9, "", 9, 10, "" ],
 );
 test("full + expunge near side", \@x01, \@X03, @O03);
@@ -133,7 +133,7 @@ my @X07 = (
    A, 1, "F", B, 2, "", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "F", G, 7, 
"FT", I, 9, "", J, 10, "" ],
  [ 10,
    A, 1, "", B, 2, "F", C, 3, "F", D, 4, "", E, 5, "", G, 7, "", H, 8, "", J, 
9, "", I, 10, "" ],
- [ 9, 0, 9,
+ [ 10, 0, 10,
    1, 1, "", 2, 2, "", 3, 3, "", 4, 4, "", 5, 5, "", 6, 6, "", 7, 7, "", 8, 8, 
"", 10, 9, "", 9, 10, "" ],
 );
 test("new", \@x01, \@X07, @O07);
@@ -168,7 +168,7 @@ my @X11 = (
    A, 1, "", B, 2, "*" ],
  [ 2,
    C, 1, "*", A, 2, "" ],
- [ 2, 0, 1,
+ [ 2, 0, 2,
    0, 1, "^", 1, 2, "", 2, 0, "^" ],
 );
 test("max size", \@x10, \@X11, @O11);
@@ -180,7 +180,7 @@ my @X22 = (
    A, 1, "", B, 2, "*", C, 3, "*" ],
  [ 2,
    C, 1, "*", A, 2, "" ],
- [ 2, 0, 1,
+ [ 3, 0, 2,
    3, 1, "", 1, 2, "", 2, 0, "^" ],
 );
 test("near side max size", \@X11, \@X22, @O22);
@@ -203,7 +203,7 @@ my @X31 = (
    A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ],
  [ 5,
    A, 1, "F", B, 2, "", D, 3, "", E, 4, "S", F, 5, "" ],
- [ 6, 3, 0,
+ [ 6, 3, 5,
    1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
 );
 test("max messages", \@x30, \@X31, @O31);
@@ -215,7 +215,7 @@ my @X32 = (
    A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ],
  [ 4,
    A, 1, "F", D, 2, "", E, 3, "S", F, 4, "" ],
- [ 6, 3, 0,
+ [ 6, 3, 4,
    1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ],
 );
 test("max messages vs. unread", \@x30, \@X32, @O32);
@@ -236,7 +236,7 @@ my @X51 = (
    A, 1, "S", B, 2, "FS", C, 3, "S", D, 4, "", E, 5, "", F, 6, "" ],
  [ 6,
    B, 2, "FS", D, 4, "", E, 5, "", F, 6, "" ],
- [ 6, 3, 0,
+ [ 6, 3, 6,
    2, 2, "FS", 4, 4, "", 5, 5, "", 6, 6, "" ],
 );
 test("max messages + expunge", \@x50, \@X51, @O51);
diff --git a/src/sync.c b/src/sync.c
index 18a381b..166d7fb 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -167,7 +167,6 @@ typedef struct {
        uint ref_count, nsrecs, opts[2];
        uint new_pending[2], flags_pending[2], trash_pending[2];
        uint maxuid[2];     // highest UID that was already propagated
-       uint newmaxuid[2];  // highest UID that is currently being propagated
        uint uidval[2];     // UID validity value
        uint newuidval[2];  // UID validity obtained from driver
        uint finduid[2];    // TUID lookup makes sense only for UIDs >= this
@@ -285,6 +284,8 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs )
                        srec->msg[t] = tmsg;
                        ntmsg = tmsg->next;
                        srec->uid[t] = tmsg->uid;
+                       if (tmsg->uid == svars->maxuid[t] + 1)
+                               svars->maxuid[t] = tmsg->uid;
                        srec->status = 0;
                        srec->tuid[0] = 0;
                }
@@ -613,7 +614,7 @@ clean_strdup( const char *s )
 }
 
 
-#define JOURNAL_VERSION "3"
+#define JOURNAL_VERSION "4"
 
 static int
 prepare_state( sync_vars_t *svars )
@@ -853,8 +854,6 @@ load_state( sync_vars_t *svars )
                svars->maxxfuid = minwuid - 1;
        }
 
-       svars->newmaxuid[F] = svars->maxuid[F];
-       svars->newmaxuid[N] = svars->maxuid[N];
        int line = 0;
        if ((jfp = fopen( svars->jname, "r" ))) {
                if (!lock_state( svars ))
@@ -884,17 +883,17 @@ load_state( sync_vars_t *svars )
                                uint t1, t2, t3;
                                if ((c = buf[0]) == '#' ?
                                      (tn = 0, (sscanf( buf + 2, "%u %u %n", 
&t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
-                                     c == 'S' || c == '!' ?
+                                     c == '!' ?
                                        (sscanf( buf + 2, "%u", &t1 ) != 1) :
-                                       c == 'F' || c == 'T' || c == '+' || c 
== '&' || c == '-' || c == '=' || c == '|' ?
+                                       c == 'N' || c == 'F' || c == 'T' || c 
== '+' || c == '&' || c == '-' || c == '=' || c == '|' ?
                                          (sscanf( buf + 2, "%u %u", &t1, &t2 ) 
!= 2) :
                                          (sscanf( buf + 2, "%u %u %u", &t1, 
&t2, &t3 ) != 3))
                                {
                                        error( "Error: malformed journal entry 
at %s:%d\n", svars->jname, line );
                                        goto jbail;
                                }
-                               if (c == 'S')
-                                       svars->maxuid[t1] = 
svars->newmaxuid[t1];
+                               if (c == 'N')
+                                       svars->maxuid[t1] = t2;
                                else if (c == 'F')
                                        svars->finduid[t1] = t2;
                                else if (c == 'T')
@@ -908,10 +907,6 @@ load_state( sync_vars_t *svars )
                                        srec = nfcalloc( sizeof(*srec) );
                                        srec->uid[F] = t1;
                                        srec->uid[N] = t2;
-                                       if (svars->newmaxuid[F] < t1)
-                                               svars->newmaxuid[F] = t1;
-                                       if (svars->newmaxuid[N] < t2)
-                                               svars->newmaxuid[N] = t2;
                                        debug( "  new entry(%u,%u)\n", t1, t2 );
                                        srec->status = S_PENDING;
                                        *svars->srecadd = srec;
@@ -935,7 +930,8 @@ load_state( sync_vars_t *svars )
                                                break;
                                        case '=':
                                                debug( "aborted\n" );
-                                               svars->maxxfuid = srec->uid[F];
+                                               if (svars->maxxfuid < 
srec->uid[F])
+                                                       svars->maxxfuid = 
srec->uid[F];
                                                srec->status = S_DEAD;
                                                break;
                                        case '#':
@@ -949,12 +945,16 @@ load_state( sync_vars_t *svars )
                                        case '<':
                                                debug( "far side now %u\n", t3 
);
                                                srec->uid[F] = t3;
+                                               if (t3 == svars->maxuid[F] + 1)
+                                                       svars->maxuid[F] = t3;
                                                srec->status &= ~S_PENDING;
                                                srec->tuid[0] = 0;
                                                break;
                                        case '>':
                                                debug( "near side now %u\n", t3 
);
                                                srec->uid[N] = t3;
+                                               if (t3 == svars->maxuid[N] + 1)
+                                                       svars->maxuid[N] = t3;
                                                srec->status &= ~S_PENDING;
                                                srec->tuid[0] = 0;
                                                break;
@@ -1575,22 +1575,26 @@ box_loaded( int sts, message_t *msgs, int total_msgs, 
int recent_msgs, void *aux
        for (t = 0; t < 2; t++) {
                debug( "synchronizing new messages on %s\n", str_fn[1-t] );
                for (tmsg = svars->msgs[1-t]; tmsg; tmsg = tmsg->next) {
-                       // If messages were previously ignored due to being 
excessive, they would now
-                       // appear to be newer than the messages that got 
actually synced, so increment
-                       // newmaxuid immediately to make sure we always look 
only at the newest ones.
-                       // However, committing it to maxuid must be delayed 
until all messages were
-                       // propagated, to ensure that all pending messages are 
still loaded next time
-                       // in case of interruption - in particular skipping big 
messages would otherwise
-                       // up the limit too early.
                        srec = tmsg->srec;
                        if (srec) {
                                if (srec->status & S_SKIPPED) {
                                        // The message was skipped due to being 
too big.
+                                       // We must have already seen the UID, 
but we might have been interrupted.
+                                       if (svars->maxuid[1-t] < tmsg->uid)
+                                               svars->maxuid[1-t] = tmsg->uid;
                                        if (!(svars->chan->ops[t] & OP_RENEW))
                                                continue;
                                } else {
                                        if (!(svars->chan->ops[t] & OP_NEW))
                                                continue;
+                                       // This catches messages:
+                                       // - that are actually new
+                                       // - whose propagation got interrupted
+                                       // - whose propagation was completed, 
but not logged yet
+                                       // - that aren't actually new, but a 
result of syncing, and the instant
+                                       //   maxuid upping was prevented by the 
presence of actually new messages
+                                       if (svars->maxuid[1-t] < tmsg->uid)
+                                               svars->maxuid[1-t] = tmsg->uid;
                                        if (!(srec->status & S_PENDING))
                                                continue;  // Nothing to do - 
the message is paired or expired
                                        // Propagation was scheduled, but we 
got interrupted
@@ -1606,13 +1610,14 @@ box_loaded( int sts, message_t *msgs, int total_msgs, 
int recent_msgs, void *aux
                        } else {
                                if (!(svars->chan->ops[t] & OP_NEW))
                                        continue;
-                               if (tmsg->uid <= svars->newmaxuid[1-t]) {
+                               if (tmsg->uid <= svars->maxuid[1-t]) {
                                        // The message should be already 
paired. It's not, so it was:
                                        // - previously paired, but the entry 
was expired and pruned => ignore
                                        // - attempted, but failed => ignore 
(the wisdom of this is debatable)
                                        // - ignored, as it would have been 
expunged anyway => ignore (even if undeleted)
                                        continue;
                                }
+                               svars->maxuid[1-t] = tmsg->uid;
                                debug( "new message %u\n", tmsg->uid );
 
                                if ((svars->chan->ops[t] & OP_EXPUNGE) && 
(tmsg->flags & F_DELETED)) {
@@ -1628,8 +1633,6 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int 
recent_msgs, void *aux
                                srec->uid[1-t] = tmsg->uid;
                                srec->msg[1-t] = tmsg;
                                tmsg->srec = srec;
-                               if (svars->newmaxuid[1-t] < tmsg->uid)
-                                       svars->newmaxuid[1-t] = tmsg->uid;
                                JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), 
"fresh" );
                        }
                        if ((tmsg->flags & F_FLAGGED) || tmsg->size <= 
svars->chan->stores[t]->max_size) {
@@ -1748,7 +1751,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int 
recent_msgs, void *aux
                                        // If we have so many new messages that 
some of them are instantly expired,
                                        // but some are still propagated 
because they are important, we need to
                                        // ensure explicitly that the bulk 
fetch limit is upped.
-                                       svars->maxxfuid = srec->uid[F];
+                                       if (svars->maxxfuid < srec->uid[F])
+                                               svars->maxxfuid = srec->uid[F];
                                        srec->msg[F]->srec = NULL;
                                        srec->status = S_DEAD;
                                }
@@ -1849,6 +1853,8 @@ msg_copied( int sts, uint uid, copy_vars_t *vars )
                } else {
                        JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], 
srec->uid[N], uid), "%sed message", str_hl[t] );
                        vars->srec->uid[t] = uid;
+                       if (uid == svars->maxuid[t] + 1)
+                               svars->maxuid[t] = uid;
                        vars->srec->status &= ~S_PENDING;
                        vars->srec->tuid[0] = 0;
                }
@@ -1918,10 +1924,6 @@ msgs_copied( sync_vars_t *svars, int t )
        if (svars->new_pending[t])
                goto out;
 
-       if (svars->maxuid[1-t] != svars->newmaxuid[1-t]) {
-               svars->maxuid[1-t] = svars->newmaxuid[1-t];
-               JLOG( "S %d", 1-t, "commit maxuid of %s", str_fn[1-t] );
-       }
        sync_close( svars, 1-t );
        if (check_cancel( svars ))
                goto out;
@@ -2159,6 +2161,14 @@ box_closed_p2( sync_vars_t *svars, int t )
        // the operations are idempotent, and we're about to commit the new 
state
        // right afterwards anyway.
 
+       for (t = 0; t < 2; t++) {
+               // Committing maxuid is delayed until all messages were 
propagated, to
+               // ensure that all pending messages are still loaded next time 
in case
+               // of interruption - in particular skipping big messages would 
otherwise
+               // up the limit too early.
+               JLOG( "N %d %u", (t, svars->maxuid[t]), "up maxuid of %s", 
str_fn[t] );
+       }
+
        if (((svars->state[F] | svars->state[N]) & ST_DID_EXPUNGE) || 
svars->chan->max_messages) {
                debug( "purging obsolete entries\n" );
                for (srec = svars->srecs; srec; srec = srec->next) {


_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to