commit b63f169c5781b7bb886c0bd922945aeba5dd6da0
Author: Oswald Buddenhagen <o...@users.sf.net>
Date:   Fri Nov 4 21:23:39 2016 +0100

    wrap message trashing into simple transactions
    
    trashing many messages at once inevitably overtaxes m$ exchange, and the
    connection breaks. without any progress tracking, it would restart from
    scratch each time, which would lead to a) it never finishing and b) many
    copies of the messages in the trash.
    
    full transactions as we do for "proper" syncing would be over the top,
    as it's not *that* bad if some messages get duplicated in the trash. so
    we record only the messages for which trashing completed, thus allowing
    some overlap between the attempts.

 TODO       |    3 ---
 src/sync.c |   33 +++++++++++++++++++++++++++------
 2 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/TODO b/TODO
index bfa068b..f991c43 100644
--- a/TODO
+++ b/TODO
@@ -1,8 +1,5 @@
 f{,data}sync() usage could be optimized by batching the calls.
 
-add some marker about message being already [remotely] trashed.
-real transactions would be certainly not particularly useful ...
-
 make SSL (connect) timeouts produce a bit more than "Unidentified socket error".
 
 uidvalidity lock timeout handling would be a good idea.
diff --git a/src/sync.c b/src/sync.c
index e5c532a..e78b8a9 100644
--- a/src/sync.c
+++ b/src/sync.c
@@ -175,6 +175,7 @@ typedef struct {
        driver_t *drv[2];
        const char *orig_name[2];
        message_t *new_msgs[2];
+       int_array_alloc_t trashed_msgs[2];
        int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
        int new_pending[2], flags_pending[2], trash_pending[2];
        int maxuid[2]; /* highest UID that was already propagated */
@@ -799,7 +800,7 @@ load_state( sync_vars_t *svars )
                                }
                                if ((c = buf[0]) == '#' ?
                                      (t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
-                                     c == '(' || c == ')' || c == '{' || c == '}' || c == '!' ?
+                                     c == '(' || c == ')' || c == '{' || c == '}' || c == '[' || c == ']' || c == '!' ?
                                        (sscanf( buf + 2, "%d", &t1 ) != 1) :
                                        c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ?
                                          (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
@@ -816,6 +817,10 @@ load_state( sync_vars_t *svars )
                                        svars->newuid[M] = t1;
                                else if (c == '}')
                                        svars->newuid[S] = t1;
+                               else if (c == '[')
+                                       *int_array_append( &svars->trashed_msgs[M] ) = t1;
+                               else if (c == ']')
+                                       *int_array_append( &svars->trashed_msgs[S] ) = t1;
                                else if (c == '!')
                                        svars->smaxxuid = t1;
                                else if (c == '|') {
@@ -1920,6 +1925,11 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t )
        }
 }
 
+typedef struct {
+       void *aux;
+       message_t *msg;
+} trash_vars_t;
+
 static void msg_trashed( int sts, void *aux );
 static void msg_rtrashed( int sts, int uid, copy_vars_t *vars );
 
@@ -1927,6 +1937,7 @@ static void
 msgs_flags_set( sync_vars_t *svars, int t )
 {
        message_t *tmsg;
+       trash_vars_t *tv;
        copy_vars_t *cv;
 
        if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t])
@@ -1938,14 +1949,18 @@ msgs_flags_set( sync_vars_t *svars, int t )
            (svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) {
                debug( "trashing in %s\n", str_ms[t] );
                for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next)
-                       if ((tmsg->flags & F_DELETED) && (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
+                       if ((tmsg->flags & F_DELETED) && !find_int_array( svars->trashed_msgs[t].array, tmsg->uid ) &&
+                           (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
                                if (svars->ctx[t]->conf->trash) {
                                        if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) {
                                                debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid );
                                                trash_total[t]++;
                                                stats();
                                                svars->trash_pending[t]++;
-                                               svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, AUX );
+                                               tv = nfmalloc( sizeof(*tv) );
+                                               tv->aux = AUX;
+                                               tv->msg = tmsg;
+                                               svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, tv );
                                                if (check_cancel( svars ))
                                                        goto out;
                                        } else
@@ -1982,13 +1997,17 @@ msgs_flags_set( sync_vars_t *svars, int t )
 static void
 msg_trashed( int sts, void *aux )
 {
+       trash_vars_t *vars = (trash_vars_t *)aux;
        DECL_SVARS;
 
        if (sts == DRV_MSG_BAD)
                sts = DRV_BOX_BAD;
-       if (check_ret( sts, aux ))
+       if (check_ret( sts, vars->aux ))
                return;
-       INIT_SVARS(aux);
+       INIT_SVARS(vars->aux);
+       debug( "  -> trashed %s %d\n", str_ms[t], vars->msg->uid );
+       Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
+       free( vars );
        trash_done[t]++;
        stats();
        svars->trash_pending[t]--;
@@ -2008,8 +2027,10 @@ msg_rtrashed( int sts, int uid ATTR_UNUSED, copy_vars_t *vars )
                free( vars );
                return;
        }
-       free( vars );
        t ^= 1;
+       debug( "  -> remote trashed %s %d\n", str_ms[t], vars->msg->uid );
+       Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
+       free( vars );
        trash_done[t]++;
        stats();
        svars->trash_pending[t]--;

------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today. http://sdm.link/xeonphi
_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to