commit c84f4a8a5d00bba20caa1c6c672d3db2a3e9fb94
Author: Oswald Buddenhagen <o...@kde.org>
Date:   Sat Aug 25 18:26:23 2012 +0200

    fully asynchronous IMAP operation
    
    - asynchronous sockets using an event loop
      - connect & starttls have completion callback parameters
      - callbacks for notification about filled input buffer and emptied
        output buffer
    - unsent imap command queue
      - used when
        - socket output buffer is non-empty
        - number of commands in flight exceeds limit
        - last sent command requires round-trip
        - command has a dependency on completion of previous command
      - trashnc is tri-state so only a single "scout" trash APPEND/COPY is
        sent at first. a possibly resulting CREATE is injected in front of
        the remaining trash commands, so they can succeed (or be cancel()d
        if it fails).
      - queue's presence necessitates imap_cancel implementation

 configure.in   |    2 +-
 src/drv_imap.c |  289 +++++++++++++++++++++---------------
 src/isync.h    |   34 ++++-
 src/mbsync.1   |    4 +-
 src/socket.c   |  392 +++++++++++++++++++++++++++++++++++++-----------
 5 files changed, 506 insertions(+), 215 deletions(-)

diff --git a/configure.in b/configure.in
index f63081a..36b07d0 100644
--- a/configure.in
+++ b/configure.in
@@ -9,7 +9,7 @@ if test "$GCC" = yes; then
     CFLAGS="$CFLAGS -pipe -W -Wall -Wshadow -Wstrict-prototypes"
 fi
 
-AC_CHECK_HEADERS(sys/filio.h sys/poll.h sys/select.h)
+AC_CHECK_HEADERS(sys/poll.h sys/select.h)
 AC_CHECK_FUNCS(vasprintf)
 
 AC_CHECK_LIB(socket, socket, [SOCK_LIBS="-lsocket"])
diff --git a/src/drv_imap.c b/src/drv_imap.c
index 64e0e44..2f9347f 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -81,7 +81,8 @@ typedef struct imap_store {
        const char *prefix;
        int ref_count;
        int uidnext; /* from SELECT responses */
-       unsigned trashnc:1; /* trash folder's existence is not confirmed yet */
+       /* trash folder's existence is not confirmed yet */
+       enum { TrashUnknown, TrashChecking, TrashKnown } trashnc;
        unsigned got_namespace:1;
        list_t *ns_personal, *ns_other, *ns_shared; /* NAMESPACE info */
        message_t **msgapp; /* FETCH results */
@@ -89,12 +90,15 @@ typedef struct imap_store {
        parse_list_state_t parse_list_sts;
        /* command queue */
        int nexttag, num_in_progress, literal_pending;
+       struct imap_cmd *pending, **pending_append;
        struct imap_cmd *in_progress, **in_progress_append;
 
        /* Used during sequential operations like connect */
        enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } 
greeting;
+       int canceling; /* imap_cancel() is in progress */
        union {
                void (*imap_open)( store_t *srv, void *aux );
+               void (*imap_cancel)( void *aux );
        } callbacks;
        void *callback_aux;
 
@@ -115,6 +119,7 @@ struct imap_cmd {
                int data_len;
                int uid; /* to identify fetch responses */
                unsigned
+                       high_prio:1, /* if command is queued, put it at the 
front of the queue. */
                        to_trash:1, /* we are storing to trash, not current. */
                        create:1, /* create the mailbox if we get an error ... 
*/
                        trycreate:1; /* ... but only if this is true or the 
server says so. */
@@ -179,8 +184,6 @@ static const char *cap_list[] = {
 #define RESP_NO       1
 #define RESP_CANCEL   2
 
-static int get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd );
-
 static INLINE void imap_ref( imap_store_t *ctx ) { ++ctx->ref_count; }
 static int imap_deref( imap_store_t *ctx );
 
@@ -221,30 +224,18 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, 
int response )
        free( cmd );
 }
 
-static struct imap_cmd *
-v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
-                   const char *fmt, va_list ap )
+static int
+send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
 {
        int bufl, litplus;
        const char *buffmt;
        char buf[1024];
 
-       assert( ctx );
-       assert( ctx->gen.bad_callback );
-       assert( cmd );
-       assert( cmd->param.done );
-
-       while (ctx->literal_pending)
-               if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
-                       goto bail;
-
        cmd->tag = ++ctx->nexttag;
-       if (fmt)
-               nfvasprintf( &cmd->cmd, fmt, ap );
        if (!cmd->param.data) {
                buffmt = "%d %s\r\n";
                litplus = 0;
-       } else if ((cmd->param.to_trash && ctx->trashnc) || !CAP(LITERALPLUS)) {
+       } else if ((cmd->param.to_trash && ctx->trashnc == TrashUnknown) || 
!CAP(LITERALPLUS)) {
                buffmt = "%d %s{%d}\r\n";
                litplus = 0;
        } else {
@@ -272,27 +263,52 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd 
*cmd,
        } else if (cmd->param.cont || cmd->param.data) {
                ctx->literal_pending = 1;
        }
+       if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
+               ctx->trashnc = TrashChecking;
        cmd->next = 0;
        *ctx->in_progress_append = cmd;
        ctx->in_progress_append = &cmd->next;
        ctx->num_in_progress++;
-       return cmd;
+       return 0;
 
   bail:
        done_imap_cmd( ctx, cmd, RESP_CANCEL );
-       return NULL;
+       return -1;
 }
 
-static struct imap_cmd *
-submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, const char *fmt, ... 
)
+static int
+cmd_submittable( imap_store_t *ctx, struct imap_cmd *cmd )
 {
-       struct imap_cmd *ret;
-       va_list ap;
+       return !ctx->conn.write_buf &&
+              !ctx->literal_pending &&
+              !(cmd->param.to_trash && ctx->trashnc == TrashChecking) &&
+              ctx->num_in_progress < ((imap_store_conf_t 
*)ctx->gen.conf)->server->max_in_progress;
+}
 
-       va_start( ap, fmt );
-       ret = v_submit_imap_cmd( ctx, cmd, fmt, ap );
-       va_end( ap );
-       return ret;
+static int
+flush_imap_cmds( imap_store_t *ctx )
+{
+       struct imap_cmd *cmd;
+
+       while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
+               if (!(ctx->pending = cmd->next))
+                       ctx->pending_append = &ctx->pending;
+               if (send_imap_cmd( ctx, cmd ) < 0)
+                       return -1;
+       }
+       return 0;
+}
+
+static void
+cancel_pending_imap_cmds( imap_store_t *ctx )
+{
+       struct imap_cmd *cmd;
+
+       while ((cmd = ctx->pending)) {
+               if (!(ctx->pending = cmd->next))
+                       ctx->pending_append = &ctx->pending;
+               done_imap_cmd( ctx, cmd, RESP_CANCEL );
+       }
 }
 
 static void
@@ -308,6 +324,29 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
 }
 
 static int
+submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
+{
+       assert( ctx );
+       assert( ctx->gen.bad_callback );
+       assert( cmd );
+       assert( cmd->param.done );
+
+       if ((ctx->pending && !cmd->param.high_prio) || !cmd_submittable( ctx, 
cmd )) {
+               if (ctx->pending && cmd->param.high_prio) {
+                       cmd->next = ctx->pending;
+                       ctx->pending = cmd;
+               } else {
+                       cmd->next = 0;
+                       *ctx->pending_append = cmd;
+                       ctx->pending_append = &cmd->next;
+               }
+               return 0;
+       }
+
+       return send_imap_cmd( ctx, cmd );
+}
+
+static int
 imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
            void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response 
),
            const char *fmt, ... )
@@ -318,12 +357,9 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
                cmdp = new_imap_cmd( sizeof(*cmdp) );
        cmdp->param.done = done;
        va_start( ap, fmt );
-       cmdp = v_submit_imap_cmd( ctx, cmdp, fmt, ap );
+       nfvasprintf( &cmdp->cmd, fmt, ap );
        va_end( ap );
-       if (!cmdp)
-               return RESP_CANCEL;
-
-       return get_cmd_result( ctx, cmdp );
+       return submit_imap_cmd( ctx, cmdp );
 }
 
 static void
@@ -393,25 +429,6 @@ imap_refcounted_done( struct imap_cmd_refcounted_state 
*sts )
        free( sts );
 }
 
-/*
-static void
-drain_imap_replies( imap_store_t *ctx )
-{
-       while (ctx->num_in_progress)
-               get_cmd_result( ctx, 0 );
-}
-*/
-
-static int
-process_imap_replies( imap_store_t *ctx )
-{
-       while (ctx->num_in_progress > ((imap_store_conf_t 
*)ctx->gen.conf)->server->max_in_progress ||
-              socket_pending( &ctx->conn ))
-               if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
-                       return RESP_CANCEL;
-       return RESP_OK;
-}
-
 static int
 is_atom( list_t *list )
 {
@@ -798,20 +815,22 @@ struct imap_cmd_trycreate {
 static void imap_open_store_greeted( imap_store_t * );
 static void get_cmd_result_p2( imap_store_t *, struct imap_cmd *, int );
 
-static int
-get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
+static void
+imap_socket_read( void *aux )
 {
+       imap_store_t *ctx = (imap_store_t *)aux;
        struct imap_cmd *cmdp, **pcmdp;
        char *cmd, *arg, *arg1, *p;
        int resp, resp2, tag, greeted;
 
        greeted = ctx->greeting;
+       if (ctx->parse_list_sts.level) {
+               cmd = 0;
+               goto do_fetch;
+       }
        for (;;) {
-               if (!(cmd = socket_read_line( &ctx->conn ))) {
-                       if (socket_fill( &ctx->conn ) < 0)
-                               return RESP_CANCEL;
-                       continue;
-               }
+               if (!(cmd = socket_read_line( &ctx->conn )))
+                       return;
 
                arg = next_arg( &cmd );
                if (*arg == '*') {
@@ -850,11 +869,8 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                  do_fetch:
                                        if ((resp = parse_imap_list( ctx, &cmd, 
&ctx->parse_list_sts )) == LIST_BAD)
                                                break; /* stream is likely to 
be useless now */
-                                       if (resp == LIST_PARTIAL) {
-                                               if (socket_fill( &ctx->conn ) < 
0)
-                                                       return RESP_CANCEL;
-                                               goto do_fetch;
-                                       }
+                                       if (resp == LIST_PARTIAL)
+                                               return;
                                        if (parse_fetch( ctx, 
ctx->parse_list_sts.head ) < 0)
                                                break; /* this may mean 
anything, so prefer not to spam the log */
                                }
@@ -865,8 +881,10 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                        if (greeted == GreetingPending) {
                                imap_ref( ctx );
                                imap_open_store_greeted( ctx );
-                               return imap_deref( ctx ) ? RESP_CANCEL : 
RESP_OK;
+                               if (imap_deref( ctx ))
+                                       return;
                        }
+                       continue;
                } else if (!ctx->in_progress) {
                        error( "IMAP error: unexpected reply: %s %s\n", arg, 
cmd ? cmd : "" );
                        break; /* this may mean anything, so prefer not to spam 
the log */
@@ -876,24 +894,22 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                        cmdp = ctx->in_progress;
                        if (cmdp->param.data) {
                                if (cmdp->param.to_trash)
-                                       ctx->trashnc = 0; /* Can't get NO 
[TRYCREATE] any more. */
+                                       ctx->trashnc = TrashKnown; /* Can't get 
NO [TRYCREATE] any more. */
                                p = cmdp->param.data;
                                cmdp->param.data = 0;
                                if (socket_write( &ctx->conn, p, 
cmdp->param.data_len, GiveOwn ) < 0)
-                                       return RESP_CANCEL;
+                                       return;
                        } else if (cmdp->param.cont) {
                                if (cmdp->param.cont( ctx, cmdp, cmd ))
-                                       return RESP_CANCEL;
+                                       return;
                        } else {
                                error( "IMAP error: unexpected command 
continuation request\n" );
                                break;
                        }
                        if (socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0)
-                               return RESP_CANCEL;
+                               return;
                        if (!cmdp->param.cont)
                                ctx->literal_pending = 0;
-                       if (!tcmd)
-                               return RESP_OK;
                } else {
                        tag = atoi( arg );
                        for (pcmdp = &ctx->in_progress; (cmdp = *pcmdp); pcmdp 
= &cmdp->next)
@@ -910,7 +926,7 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                        arg = next_arg( &cmd );
                        if (!strcmp( "OK", arg )) {
                                if (cmdp->param.to_trash)
-                                       ctx->trashnc = 0; /* Can't get NO 
[TRYCREATE] any more. */
+                                       ctx->trashnc = TrashKnown; /* Can't get 
NO [TRYCREATE] any more. */
                                resp = RESP_OK;
                        } else {
                                if (!strcmp( "NO", arg )) {
@@ -921,10 +937,11 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                                struct imap_cmd_trycreate *cmd2 
=
                                                        (struct 
imap_cmd_trycreate *)new_imap_cmd( sizeof(*cmd2) );
                                                cmd2->orig_cmd = cmdp;
-                                               cmd2->gen.param.done = 
get_cmd_result_p2;
+                                               cmd2->gen.param.high_prio = 1;
                                                p = strchr( cmdp->cmd, '"' );
-                                               if (!submit_imap_cmd( ctx, 
&cmd2->gen, "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ))
-                                                       return RESP_CANCEL;
+                                               if (imap_exec( ctx, &cmd2->gen, 
get_cmd_result_p2,
+                                                              "CREATE %.*s", 
strchr( p + 1, '"' ) - p + 1, p ) < 0)
+                                                       return;
                                                continue;
                                        }
                                        resp = RESP_NO;
@@ -941,13 +958,17 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                imap_invoke_bad_callback( ctx );
                        done_imap_cmd( ctx, cmdp, resp );
                        if (imap_deref( ctx ))
-                               resp = RESP_CANCEL;
-                       if (resp == RESP_CANCEL || !tcmd || tcmd == cmdp)
-                               return resp;
+                               return;
+                       if (ctx->canceling && !ctx->in_progress) {
+                               ctx->canceling = 0;
+                               ctx->callbacks.imap_cancel( ctx->callback_aux );
+                               return;
+                       }
                }
+               if (flush_imap_cmds( ctx ) < 0)
+                       return;
        }
        imap_invoke_bad_callback( ctx );
-       return RESP_CANCEL;
 }
 
 static void
@@ -960,8 +981,11 @@ get_cmd_result_p2( imap_store_t *ctx, struct imap_cmd 
*cmd, int response )
                done_imap_cmd( ctx, ocmd, response );
        } else {
                ctx->uidnext = 0;
+               if (ocmd->param.to_trash)
+                       ctx->trashnc = TrashKnown;
                ocmd->param.create = 0;
-               submit_imap_cmd( ctx, ocmd, 0 );
+               ocmd->param.high_prio = 1;
+               submit_imap_cmd( ctx, ocmd );
        }
 }
 
@@ -974,6 +998,7 @@ imap_cancel_store( store_t *gctx )
 
        socket_close( &ctx->conn );
        cancel_submitted_imap_cmds( ctx );
+       cancel_pending_imap_cmds( ctx );
        free_generic_messages( ctx->gen.msgs );
        free_string_list( ctx->gen.boxes );
        free_list( ctx->ns_personal );
@@ -1082,10 +1107,15 @@ do_cram_auth( imap_store_t *ctx, struct imap_cmd *cmdp, 
const char *prompt )
 }
 #endif
 
+static void imap_open_store_connected( int, void * );
+#ifdef HAVE_LIBSSL
+static void imap_open_store_tlsstarted1( int, void * );
+#endif
 static void imap_open_store_p2( imap_store_t *, struct imap_cmd *, int );
 static void imap_open_store_authenticate( imap_store_t * );
 #ifdef HAVE_LIBSSL
 static void imap_open_store_authenticate_p2( imap_store_t *, struct imap_cmd 
*, int );
+static void imap_open_store_tlsstarted2( int, void * );
 static void imap_open_store_authenticate_p3( imap_store_t *, struct imap_cmd 
*, int );
 #endif
 static void imap_open_store_authenticate2( imap_store_t * );
@@ -1131,26 +1161,41 @@ imap_open_store( store_conf_t *conf,
        ctx->callback_aux = aux;
        set_bad_callback( &ctx->gen, (void (*)(void *))imap_open_store_bail, 
ctx );
        ctx->in_progress_append = &ctx->in_progress;
+       ctx->pending_append = &ctx->pending;
 
-       socket_init( &ctx->conn, (void (*)( void * ))imap_invoke_bad_callback, 
ctx );
+       socket_init( &ctx->conn, &srvc->sconf,
+                    (void (*)( void * ))imap_invoke_bad_callback,
+                    imap_socket_read, (int (*)(void *))flush_imap_cmds, ctx );
+       socket_connect( &ctx->conn, imap_open_store_connected );
+}
 
-       if (!socket_connect( &srvc->sconf, &ctx->conn ))
-               goto bail;
+static void
+imap_open_store_connected( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+#ifdef HAVE_LIBSSL
+       imap_store_conf_t *cfg = (imap_store_conf_t *)ctx->gen.conf;
+       imap_server_conf_t *srvc = cfg->server;
+#endif
 
+       if (!ok)
+               imap_open_store_bail( ctx );
 #ifdef HAVE_LIBSSL
-       if (srvc->sconf.use_imaps) {
-               if (socket_start_tls( &srvc->sconf, &ctx->conn )) {
-                       imap_open_store_ssl_bail( ctx );
-                       return;
-               }
-       }
+       else if (srvc->sconf.use_imaps)
+               socket_start_tls( &ctx->conn, imap_open_store_tlsstarted1 );
 #endif
-       get_cmd_result( ctx, 0 );
-       return;
+}
 
-  bail:
-       imap_open_store_bail( ctx );
+#ifdef HAVE_LIBSSL
+static void
+imap_open_store_tlsstarted1( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+
+       if (!ok)
+               imap_open_store_ssl_bail( ctx );
 }
+#endif
 
 static void
 imap_open_store_greeted( imap_store_t *ctx )
@@ -1213,7 +1258,16 @@ imap_open_store_authenticate_p2( imap_store_t *ctx, 
struct imap_cmd *cmd ATTR_UN
 {
        if (response != RESP_OK)
                imap_open_store_bail( ctx );
-       else if (socket_start_tls( &((imap_server_conf_t 
*)ctx->gen.conf)->sconf, &ctx->conn ))
+       else
+               socket_start_tls( &ctx->conn, imap_open_store_tlsstarted2 );
+}
+
+static void
+imap_open_store_tlsstarted2( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+
+       if (!ok)
                imap_open_store_ssl_bail( ctx );
        else
                imap_exec( ctx, 0, imap_open_store_authenticate_p3, 
"CAPABILITY" );
@@ -1343,7 +1397,7 @@ static void
 imap_open_store_finalize( imap_store_t *ctx )
 {
        set_bad_callback( &ctx->gen, 0, 0 );
-       ctx->trashnc = 1;
+       ctx->trashnc = TrashUnknown;
        ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux );
 }
 
@@ -1404,8 +1458,7 @@ imap_select( store_t *gctx, int create,
 
 /******************* imap_load *******************/
 
-static int imap_submit_load( imap_store_t *, const char *, struct 
imap_cmd_refcounted_state *,
-                             struct imap_cmd ** );
+static int imap_submit_load( imap_store_t *, const char *, struct 
imap_cmd_refcounted_state * );
 static void imap_load_p2( imap_store_t *, struct imap_cmd *, int );
 
 static void
@@ -1420,7 +1473,6 @@ imap_load( store_t *gctx, int minuid, int maxuid, int 
*excs, int nexcs,
                free( excs );
                cb( DRV_OK, aux );
        } else {
-               struct imap_cmd *cmd2 = 0;
                struct imap_cmd_refcounted_state *sts = 
imap_refcounted_new_state( cb, aux );
 
                ctx->msgapp = &ctx->gen.msgs;
@@ -1435,35 +1487,29 @@ imap_load( store_t *gctx, int minuid, int maxuid, int 
*excs, int nexcs,
                                if (i != j)
                                        bl += sprintf( buf + bl, ":%d", excs[i] 
);
                        }
-                       if (imap_submit_load( ctx, buf, sts, &cmd2 ) < 0)
+                       if (imap_submit_load( ctx, buf, sts ) < 0)
                                goto done;
                }
                if (maxuid == INT_MAX)
                        maxuid = ctx->uidnext >= 0 ? ctx->uidnext - 1 : 
1000000000;
                if (maxuid >= minuid) {
                        sprintf( buf, "%d:%d", minuid, maxuid );
-                       imap_submit_load( ctx, buf, sts, &cmd2 );
+                       imap_submit_load( ctx, buf, sts );
                }
          done:
                free( excs );
                if (!--sts->ref_count)
                        imap_refcounted_done( sts );
-               else
-                       get_cmd_result( ctx, cmd2 );
        }
 }
 
 static int
-imap_submit_load( imap_store_t *ctx, const char *buf, struct 
imap_cmd_refcounted_state *sts,
-                  struct imap_cmd **cmdp )
+imap_submit_load( imap_store_t *ctx, const char *buf, struct 
imap_cmd_refcounted_state *sts )
 {
-       struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
-       cmd->param.done = imap_load_p2;
-       *cmdp = cmd;
-       return submit_imap_cmd( ctx, cmd,
-                               "UID FETCH %s (UID%s%s)", buf,
-                               (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
-                               (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : 
"" ) ? 0 : -1;
+       return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_load_p2,
+                         "UID FETCH %s (UID%s%s)", buf,
+                         (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
+                         (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" );
 }
 
 static void
@@ -1528,12 +1574,9 @@ imap_flags_helper( imap_store_t *ctx, int uid, char 
what, int flags,
 {
        char buf[256];
 
-       struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
-       cmd->param.done = imap_set_flags_p2;
        buf[imap_make_flags( flags, buf )] = 0;
-       if (!submit_imap_cmd( ctx, cmd, "UID STORE %d %cFLAGS.SILENT %s", uid, 
what, buf ))
-               return -1;
-       return process_imap_replies( ctx ) == RESP_CANCEL ? -1 : 0;
+       return imap_exec( ctx, imap_refcounted_new_cmd( sts ), 
imap_set_flags_p2,
+                         "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
 }
 
 static void
@@ -1705,8 +1748,16 @@ static void
 imap_cancel( store_t *gctx,
              void (*cb)( void *aux ), void *aux )
 {
-       (void)gctx;
-       cb( aux );
+       imap_store_t *ctx = (imap_store_t *)gctx;
+
+       cancel_pending_imap_cmds( ctx );
+       if (ctx->in_progress) {
+               ctx->canceling = 1;
+               ctx->callbacks.imap_cancel = cb;
+               ctx->callback_aux = aux;
+       } else {
+               cb( aux );
+       }
 }
 
 /******************* imap_commit *******************/
@@ -1753,7 +1804,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, 
int *err )
        server->require_ssl = 1;
        server->sconf.use_tlsv1 = 1;
 #endif
-       server->max_in_progress = 50;
+       server->max_in_progress = INT_MAX;
 
        while (getcline( cfg ) && cfg->cmd) {
                if (!strcasecmp( "Host", cfg->cmd )) {
diff --git a/src/isync.h b/src/isync.h
index 1c8c038..0e1e9c3 100644
--- a/src/isync.h
+++ b/src/isync.h
@@ -73,15 +73,36 @@ typedef struct server_conf {
 #endif
 } server_conf_t;
 
+typedef struct buff_chunk {
+       struct buff_chunk *next;
+       char *data;
+       int len;
+       char buf[1];
+} buff_chunk_t;
+
 typedef struct {
+       /* connection */
        int fd;
+       int state;
+       const server_conf_t *conf; /* needed during connect */
 #ifdef HAVE_LIBSSL
        SSL *ssl;
 #endif
 
        void (*bad_callback)( void *aux ); /* async fail while sending or 
listening */
+       void (*read_callback)( void *aux ); /* data available for reading */
+       int (*write_callback)( void *aux ); /* all *queued* data was sent */
+       union {
+               void (*connect)( int ok, void *aux );
+               void (*starttls)( int ok, void *aux );
+       } callbacks;
        void *callback_aux;
 
+       /* writing */
+       buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+       int write_offset; /* offset into buffer head */
+
+       /* reading */
        int offset; /* start of filled bytes in buffer */
        int bytes; /* number of filled bytes in buffer */
        int scanoff; /* offset to continue scanning for newline at, relative to 
'offset' */
@@ -335,22 +356,27 @@ extern const char *Home;
 
 /* call this before doing anything with the socket */
 static INLINE void socket_init( conn_t *conn,
+                                const server_conf_t *conf,
                                 void (*bad_callback)( void *aux ),
+                                void (*read_callback)( void *aux ),
+                                int (*write_callback)( void *aux ),
                                 void *aux )
 {
+       conn->conf = conf;
        conn->bad_callback = bad_callback;
+       conn->read_callback = read_callback;
+       conn->write_callback = write_callback;
        conn->callback_aux = aux;
        conn->fd = -1;
+       conn->write_buf_append = &conn->write_buf;
 }
-int socket_connect( const server_conf_t *conf, conn_t *sock );
-int socket_start_tls( const server_conf_t *conf, conn_t *sock );
+void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) );
+void socket_start_tls(conn_t *conn, void (*cb)( int ok, void *aux ) );
 void socket_close( conn_t *sock );
-int socket_fill( conn_t *sock );
 int socket_read( conn_t *sock, char *buf, int len ); /* never waits */
 char *socket_read_line( conn_t *sock ); /* don't free return value; never 
waits */
 typedef enum { KeepOwn = 0, GiveOwn } ownership_t;
 int socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn );
-int socket_pending( conn_t *sock );
 
 void cram( const char *challenge, const char *user, const char *pass,
            char **_final, int *_finallen );
diff --git a/src/mbsync.1 b/src/mbsync.1
index 4f47668..e8cbabb 100644
--- a/src/mbsync.1
+++ b/src/mbsync.1
@@ -281,10 +281,8 @@ Use TLSv1 for communication with the IMAP server over SSL?
 \fBPipelineDepth\fR \fIdepth\fR
 Maximum number of IMAP commands which can be simultaneously in flight.
 Setting this to \fI1\fR disables pipelining.
-Setting it to a too big value may deadlock isync.
-Currently, this affects only a few commands.
 This is mostly a debugging only option.
-(Default: \fI50\fR)
+(Default: \fIunlimited\fR)
 ..
 .SS IMAP Stores
 The reference point for relative \fBPath\fRs is whatever the server likes it
diff --git a/src/socket.c b/src/socket.c
index c8d1a57..bf8a837 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -36,56 +36,67 @@
 #include <assert.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <stddef.h>
 #include <errno.h>
 #include <string.h>
+#include <fcntl.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
-#ifdef HAVE_SYS_FILIO_H
-# include <sys/filio.h>
-#endif
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
 
+enum {
+       SCK_CONNECTING,
+#ifdef HAVE_LIBSSL
+       SCK_STARTTLS,
+#endif
+       SCK_READY
+};
+
 static void
 socket_fail( conn_t *conn )
 {
        conn->bad_callback( conn->callback_aux );
 }
 
-static void
-socket_perror( const char *func, conn_t *sock, int ret )
-{
 #ifdef HAVE_LIBSSL
+static int
+ssl_return( const char *func, conn_t *conn, int ret )
+{
        int err;
 
-       if (sock->ssl) {
-               switch ((err = SSL_get_error( sock->ssl, ret ))) {
-               case SSL_ERROR_SYSCALL:
-               case SSL_ERROR_SSL:
-                       if ((err = ERR_get_error()) == 0) {
-                               if (ret == 0)
-                                       error( "SSL_%s: got EOF\n", func );
-                               else
-                                       error( "SSL_%s: %s\n", func, 
strerror(errno) );
-                       } else
-                               error( "SSL_%s: %s\n", func, ERR_error_string( 
err, 0 ) );
-                       break;
-               default:
-                       error( "SSL_%s: unhandled SSL error %d\n", func, err );
-                       break;
+       switch ((err = SSL_get_error( conn->ssl, ret ))) {
+       case SSL_ERROR_NONE:
+               return ret;
+       case SSL_ERROR_WANT_WRITE:
+               conf_fd( conn->fd, POLLIN, POLLOUT );
+               /* fallthrough */
+       case SSL_ERROR_WANT_READ:
+               return 0;
+       case SSL_ERROR_SYSCALL:
+       case SSL_ERROR_SSL:
+               if (!(err = ERR_get_error())) {
+                       if (ret == 0)
+                               error( "SSL_%s: unexpected EOF\n", func );
+                       else
+                               error( "SSL_%s: %s\n", func, strerror( errno ) 
);
+               } else {
+                       error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) 
);
                }
-       } else
-#endif
-       if (ret < 0)
-               perror( func );
+               break;
+       default:
+               error( "SSL_%s: unhandled SSL error %d\n", func, err );
+               break;
+       }
+       if (conn->state == SCK_STARTTLS)
+               conn->callbacks.starttls( 0, conn->callback_aux );
        else
-               error( "%s: unexpected EOF\n", func );
-       socket_fail( sock );
+               socket_fail( conn );
+       return -1;
 }
 
-#ifdef HAVE_LIBSSL
 /* Some of this code is inspired by / lifted from mutt. */
 
 static int
@@ -245,45 +256,85 @@ init_ssl_ctx( const server_conf_t *conf )
        return 0;
 }
 
-int
-socket_start_tls( const server_conf_t *conf, conn_t *sock )
+static void start_tls_p2( conn_t * );
+static void start_tls_p3( conn_t *, int );
+
+void
+socket_start_tls( conn_t *conn, void (*cb)( int ok, void *aux ) )
 {
-       int ret;
        static int ssl_inited;
 
+       conn->callbacks.starttls = cb;
+
        if (!ssl_inited) {
                SSL_library_init();
                SSL_load_error_strings();
                ssl_inited = 1;
        }
 
-       if (!conf->SSLContext && init_ssl_ctx( conf ))
-               return 1;
-
-       sock->ssl = SSL_new( ((server_conf_t *)conf)->SSLContext );
-       SSL_set_fd( sock->ssl, sock->fd );
-       if ((ret = SSL_connect( sock->ssl )) <= 0) {
-               socket_perror( "connect", sock, ret );
-               return 1;
+       if (!conn->conf->SSLContext && init_ssl_ctx( conn->conf )) {
+               start_tls_p3( conn, 0 );
+               return;
        }
 
-       /* verify the server certificate */
-       if (verify_cert( conf, sock ))
-               return 1;
+       conn->ssl = SSL_new( ((server_conf_t *)conn->conf)->SSLContext );
+       SSL_set_fd( conn->ssl, conn->fd );
+       SSL_set_mode( conn->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER );
+       start_tls_p2( conn );
+}
 
-       info( "Connection is now encrypted\n" );
-       return 0;
+static void
+start_tls_p2( conn_t *conn )
+{
+       switch (ssl_return( "connect", conn, SSL_connect( conn->ssl ) )) {
+       case -1:
+               start_tls_p3( conn, 0 );
+               break;
+       case 0:
+               break;
+       default:
+               /* verify the server certificate */
+               if (verify_cert( conn->conf, conn )) {
+                       start_tls_p3( conn, 0 );
+               } else {
+                       info( "Connection is now encrypted\n" );
+                       start_tls_p3( conn, 1 );
+               }
+               break;
+       }
+}
+
+static void start_tls_p3( conn_t *conn, int ok )
+{
+       conn->state = SCK_READY;
+       conn->callbacks.starttls( ok, conn->callback_aux );
 }
 
 #endif /* HAVE_LIBSSL */
 
-int
-socket_connect( const server_conf_t *conf, conn_t *sock )
+static void socket_fd_cb( int, void * );
+
+static void socket_connected2( conn_t * );
+static void socket_connect_bail( conn_t * );
+
+static void
+socket_close_internal( conn_t *sock )
+{
+       del_fd( sock->fd );
+       close( sock->fd );
+       sock->fd = -1;
+}
+
+void
+socket_connect( conn_t *sock, void (*cb)( int ok, void *aux ) )
 {
+       const server_conf_t *conf = sock->conf;
        struct hostent *he;
        struct sockaddr_in addr;
        int s, a[2];
 
+       sock->callbacks.connect = cb;
+
        /* open connection to IMAP server */
        if (conf->tunnel) {
                infon( "Starting tunnel '%s'... ", conf->tunnel );
@@ -304,6 +355,10 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
 
                close( a[0] );
                sock->fd = a[1];
+
+               fcntl( a[1], F_SETFL, O_NONBLOCK );
+               add_fd( a[1], socket_fd_cb, sock );
+
        } else {
                memset( &addr, 0, sizeof(addr) );
                addr.sin_port = conf->port ? htons( conf->port ) :
@@ -317,7 +372,7 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
                he = gethostbyname( conf->host );
                if (!he) {
                        error( "IMAP error: Cannot resolve server '%s'\n", 
conf->host );
-                       return -1;
+                       goto bail;
                }
                info( "ok\n" );
 
@@ -328,36 +383,87 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
                        perror( "socket" );
                        exit( 1 );
                }
+               sock->fd = s;
+               fcntl( s, F_SETFL, O_NONBLOCK );
+               add_fd( s, socket_fd_cb, sock );
 
-               infon( "Connecting to %s:%hu... ", inet_ntoa( addr.sin_addr ), 
ntohs( addr.sin_port ) );
+               infon( "Connecting to %s (%s:%hu) ... ",
+                      conf->host, inet_ntoa( addr.sin_addr ), ntohs( 
addr.sin_port ) );
                if (connect( s, (struct sockaddr *)&addr, sizeof(addr) )) {
-                       close( s );
-                       perror( "connect" );
-                       return -1;
+                       if (errno != EINPROGRESS) {
+                               perror( "connect" );
+                               socket_close_internal( sock );
+                               goto bail;
+                       }
+                       conf_fd( s, 0, POLLOUT );
+                       sock->state = SCK_CONNECTING;
+                       info( "\n" );
+                       return;
                }
 
-               sock->fd = s;
        }
        info( "ok\n" );
-       return 0;
+       socket_connected2( sock );
+       return;
+
+  bail:
+       socket_connect_bail( sock );
+}
+
+static void
+socket_connected( conn_t *conn )
+{
+       int soerr;
+       socklen_t selen = sizeof(soerr);
+
+       infon( "Connecting to %s: ", conn->conf->host );
+       if (getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, &soerr, &selen )) {
+               perror( "getsockopt" );
+               exit( 1 );
+       }
+       if (soerr) {
+               errno = soerr;
+               perror( "connect" );
+               socket_close_internal( conn );
+               socket_connect_bail( conn );
+               return;
+       }
+       info( "ok\n" );
+       socket_connected2( conn );
+}
+
+static void
+socket_connected2( conn_t *conn )
+{
+       conf_fd( conn->fd, 0, POLLIN );
+       conn->state = SCK_READY;
+       conn->callbacks.connect( 1, conn->callback_aux );
 }
 
+static void
+socket_connect_bail( conn_t *conn )
+{
+       conn->callbacks.connect( 0, conn->callback_aux );
+}
+
+static void dispose_chunk( conn_t *conn );
+
 void
 socket_close( conn_t *sock )
 {
-       if (sock->fd >= 0) {
-               close( sock->fd );
-               sock->fd = -1;
-       }
+       if (sock->fd >= 0)
+               socket_close_internal( sock );
 #ifdef HAVE_LIBSSL
        if (sock->ssl) {
                SSL_free( sock->ssl );
                sock->ssl = 0;
        }
 #endif
+       while (sock->write_buf)
+               dispose_chunk( sock );
 }
 
-int
+static void
 socket_fill( conn_t *sock )
 {
        char *buf;
@@ -366,22 +472,31 @@ socket_fill( conn_t *sock )
        if (!len) {
                error( "Socket error: receive buffer full. Probably protocol 
error.\n" );
                socket_fail( sock );
-               return -1;
+               return;
        }
        assert( sock->fd >= 0 );
        buf = sock->buf + n;
-       n =
 #ifdef HAVE_LIBSSL
-               sock->ssl ? SSL_read( sock->ssl, buf, len ) :
+       if (sock->ssl) {
+               if ((n = ssl_return( "read", sock, SSL_read( sock->ssl, buf, 
len ) )) <= 0)
+                       return;
+               if (n == len && SSL_pending( sock->ssl ))
+                       fake_fd( sock->fd, POLLIN );
+       } else
 #endif
-               read( sock->fd, buf, len );
-       if (n <= 0) {
-               socket_perror( "read", sock, n );
-               return -1;
-       } else {
-               sock->bytes += n;
-               return 0;
+       {
+               if ((n = read( sock->fd, buf, len )) < 0) {
+                       perror( "read" );
+                       socket_fail( sock );
+                       return;
+               } else if (!n) {
+                       error( "read: unexpected EOF\n" );
+                       socket_fail( sock );
+                       return;
+               }
        }
+       sock->bytes += n;
+       sock->read_callback( sock->callback_aux );
 }
 
 int
@@ -426,40 +541,141 @@ socket_read_line( conn_t *b )
        return s;
 }
 
-int
-socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn )
+static int
+do_write( conn_t *sock, char *buf, int len )
 {
        int n;
 
        assert( sock->fd >= 0 );
-       n =
 #ifdef HAVE_LIBSSL
-               sock->ssl ? SSL_write( sock->ssl, buf, len ) :
+       if (sock->ssl)
+               return ssl_return( "write", sock, SSL_write( sock->ssl, buf, 
len ) );
 #endif
-               write( sock->fd, buf, len );
-       if (takeOwn == GiveOwn)
-               free( buf );
-       if (n != len) {
-               socket_perror( "write", sock, n );
-               return -1;
+       n = write( sock->fd, buf, len );
+       if (n < 0) {
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       perror( "write" );
+                       socket_fail( sock );
+               } else {
+                       n = 0;
+                       conf_fd( sock->fd, POLLIN, POLLOUT );
+               }
+       } else if (n != len) {
+               conf_fd( sock->fd, POLLIN, POLLOUT );
        }
-       return 0;
+       return n;
+}
+
+static void
+dispose_chunk( conn_t *conn )
+{
+       buff_chunk_t *bc = conn->write_buf;
+       if (!(conn->write_buf = bc->next))
+               conn->write_buf_append = &conn->write_buf;
+       if (bc->data != bc->buf)
+               free( bc->data );
+       free( bc );
+}
+
+static int
+do_queued_write( conn_t *conn )
+{
+       buff_chunk_t *bc;
+
+       if (!conn->write_buf)
+               return 0;
+
+       while ((bc = conn->write_buf)) {
+               int n, len = bc->len - conn->write_offset;
+               if ((n = do_write( conn, bc->data + conn->write_offset, len )) 
< 0)
+                       return -1;
+               if (n != len) {
+                       conn->write_offset += n;
+                       return 0;
+               }
+               conn->write_offset = 0;
+               dispose_chunk( conn );
+       }
+#ifdef HAVE_LIBSSL
+       if (conn->ssl && SSL_pending( conn->ssl ))
+               fake_fd( conn->fd, POLLIN );
+#endif
+       return conn->write_callback( conn->callback_aux );
+}
+
+static void
+do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+       buff_chunk_t *bc;
+
+       if (takeOwn == GiveOwn) {
+               bc = nfmalloc( offsetof(buff_chunk_t, buf) );
+               bc->data = buf;
+       } else {
+               bc = nfmalloc( offsetof(buff_chunk_t, buf) + len );
+               bc->data = bc->buf;
+               memcpy( bc->data, buf, len );
+       }
+       bc->len = len;
+       bc->next = 0;
+       *conn->write_buf_append = bc;
+       conn->write_buf_append = &bc->next;
 }
 
 int
-socket_pending( conn_t *sock )
+socket_write( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+       if (conn->write_buf) {
+               do_append( conn, buf, len, takeOwn );
+               return len;
+       } else {
+               int n = do_write( conn, buf, len );
+               if (n != len && n >= 0) {
+                       conn->write_offset = n;
+                       do_append( conn, buf, len, takeOwn );
+               } else if (takeOwn) {
+                       free( buf );
+               }
+               return n;
+       }
+}
+
+static void
+socket_fd_cb( int events, void *aux )
 {
-       int num = -1;
+       conn_t *conn = (conn_t *)aux;
+
+       if (events & POLLERR) {
+               error( "Unidentified socket error.\n" );
+               socket_fail( conn );
+               return;
+       }
+
+       if (conn->state == SCK_CONNECTING) {
+               socket_connected( conn );
+               return;
+       }
+
+       if (events & POLLOUT)
+               conf_fd( conn->fd, POLLIN, 0 );
 
-       if (ioctl( sock->fd, FIONREAD, &num ) < 0)
-               return -1;
-       if (num > 0)
-               return num;
 #ifdef HAVE_LIBSSL
-       if (sock->ssl)
-               return SSL_pending( sock->ssl );
+       if (conn->state == SCK_STARTTLS) {
+               start_tls_p2( conn );
+               return;
+       }
+       if (conn->ssl) {
+               if (do_queued_write( conn ) < 0)
+                       return;
+               socket_fill( conn );
+               return;
+       }
 #endif
-       return 0;
+
+       if ((events & POLLOUT) && do_queued_write( conn ) < 0)
+               return;
+       if (events & POLLIN)
+               socket_fill( conn );
 }
 
 #ifdef HAVE_LIBSSL

------------------------------------------------------------------------------
Live Security Virtual Conference
Exclusive live event will cover all the ways today's security and 
threat landscape has changed and how IT managers can respond. Discussions 
will include endpoint security, mobile security and the latest in malware 
threats. http://www.accelacomm.com/jaw/sfrnl04242012/114/50122263/
_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to