commit 7cb85c1c551ef1d044a59767ef20dc8cf5153d4b
Author: Oswald Buddenhagen <o...@kde.org>
Date:   Sun Mar 13 14:44:49 2011 +0100

    *** full async

 configure.in   |    2 +-
 src/drv_imap.c |  475 +++++++++++++++++++++++++++------------------
 src/isync.h    |   54 +++++-
 src/mbsync.1   |    4 +-
 src/socket.c   |  499 ++++++++++++++++++++++++++++++++++++------------
 5 files changed, 705 insertions(+), 329 deletions(-)

diff --git a/configure.in b/configure.in
index 0bfc364..c3268e4 100644
--- a/configure.in
+++ b/configure.in
@@ -10,7 +10,7 @@ if test "$GCC" = yes; then
     CPPFLAGS="$CPPFLAGS -D_BSD_SOURCE"
 fi
 
-AC_CHECK_HEADERS(sys/filio.h sys/poll.h sys/select.h)
+AC_CHECK_HEADERS(sys/poll.h sys/select.h)
 AC_CHECK_FUNCS(vasprintf)
 
 AC_CHECK_LIB(socket, socket, [SOCK_LIBS="-lsocket"])
diff --git a/src/drv_imap.c b/src/drv_imap.c
index d818811..e00a29d 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -67,27 +67,39 @@ typedef struct _list {
        int len;
 } list_t;
 
+#define MAX_LIST_DEPTH 5
+
+typedef struct parse_list_state {
+       list_t *head, **stack[MAX_LIST_DEPTH];
+       int level, need_bytes;
+} parse_list_state_t;
+
 struct imap_cmd;
 
 typedef struct imap_store {
        store_t gen;
        const char *prefix;
        int uidnext; /* from SELECT responses */
-       unsigned trashnc:1; /* trash folder's existence is not confirmed yet */
+       /* trash folder's existence is not confirmed yet */
+       enum { TrashUnknown, TrashChecking, TrashKnown } trashnc;
        unsigned got_namespace:1;
        list_t *ns_personal, *ns_other, *ns_shared; /* NAMESPACE info */
        message_t **msgapp; /* FETCH results */
        unsigned caps; /* CAPABILITY results */
        int ref_count; /* for ordered destruction */
        int store_canceled; /* context is invalid, only ref_count keeps it alive */
+       parse_list_state_t parse_list_sts;
        /* command queue */
-       int nexttag, num_in_progress, literal_pending;
+       int nexttag, num_pending, num_in_progress, literal_pending;
+       struct imap_cmd *pending, **pending_append;
        struct imap_cmd *in_progress, **in_progress_append;
 
        /* Used during sequential operations like connect */
        enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } greeting;
+       int canceling; /* imap_cancel() is in progress */
        union {
                void (*imap_open)( store_t *srv, void *aux );
+               void (*imap_cancel)( void *aux );
        } callbacks;
        void *callback_aux;
 
@@ -100,6 +112,8 @@ struct imap_cmd {
        int tag;
 
        struct {
+               /* Will be called on each continuation request until it resets this pointer.
+                * Needs to invoke bad_callback and return < 0 on error. */
                int (*cont)( imap_store_t *ctx, struct imap_cmd *cmd, const char *prompt );
                void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response );
                char *data;
@@ -170,9 +184,6 @@ static const char *cap_list[] = {
 #define RESP_NO       1
 #define RESP_CANCEL   2
 
-static void get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd );
-
-
 static const char *Flags[] = {
        "Draft",
        "Flagged",
@@ -220,37 +231,20 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
        return cancel ? -1 : 0;
 }
 
-static struct imap_cmd *
-v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
-                   const char *fmt, va_list ap )
+static int
+send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
 {
        int bufl, litplus;
        const char *buffmt;
+       char *p;
        char buf[1024];
 
-       assert( ctx );
-       assert( ctx->gen.bad_callback );
-       assert( cmd );
-       assert( cmd->param.done );
-
-       ctx->ref_count++;
-
-       while (ctx->literal_pending) {
-               get_cmd_result( ctx, 0 );
-               if (ctx->store_canceled)
-                       goto bail2;
-       }
-
-       if (ctx->conn.fd < 0)
-               goto bail; /* We got disconnected and had no chance to report it yet. */
-
+       socket_delimit( &ctx->conn );
        cmd->tag = ++ctx->nexttag;
-       if (fmt)
-               nfvasprintf( &cmd->cmd, fmt, ap );
        if (!cmd->param.data) {
                buffmt = "%d %s\r\n";
                litplus = 0;
-       } else if ((cmd->param.to_trash && ctx->trashnc) || !CAP(LITERALPLUS)) {
+       } else if ((cmd->param.to_trash && ctx->trashnc == TrashUnknown) || !CAP(LITERALPLUS)) {
                buffmt = "%d %s{%d}\r\n";
                litplus = 0;
        } else {
@@ -270,7 +264,7 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
        if (socket_write( &ctx->conn, buf, bufl, KeepOwn ) < 0)
                goto bail;
        if (litplus) {
-               char *p = cmd->param.data;
+               p = cmd->param.data;
                cmd->param.data = 0;
                if (socket_write( &ctx->conn, p, cmd->param.data_len, GiveOwn ) < 0 ||
                    socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0)
@@ -278,32 +272,89 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
        } else if (cmd->param.cont || cmd->param.data) {
                ctx->literal_pending = 1;
        }
+       if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
+               ctx->trashnc = TrashChecking;
        cmd->next = 0;
        *ctx->in_progress_append = cmd;
        ctx->in_progress_append = &cmd->next;
        ctx->num_in_progress++;
-       return cmd;
+       return 0;
 
   bail:
-       ctx->gen.bad_callback( ctx->gen.bad_callback_aux );
-  bail2:
        done_imap_cmd( ctx, cmd, RESP_CANCEL );
-       return NULL;
+       return -1;
 }
 
-static struct imap_cmd *
-submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, const char *fmt, ... )
+static int
+cmd_submittable( imap_store_t *ctx, struct imap_cmd *cmd )
 {
-       struct imap_cmd *ret;
-       va_list ap;
+       return !ctx->literal_pending &&
+              !(cmd->param.to_trash && ctx->trashnc == TrashChecking) &&
+              ctx->num_in_progress < ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress;
+}
 
-       va_start( ap, fmt );
-       ret = v_submit_imap_cmd( ctx, cmd, fmt, ap );
-       va_end( ap );
-       return ret;
+static int
+flush_imap_cmds( imap_store_t *ctx )
+{
+       struct imap_cmd *cmd;
+
+       while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
+               if (!(ctx->pending = cmd->next))
+                       ctx->pending_append = &ctx->pending;
+               ctx->num_pending--;
+               if (send_imap_cmd( ctx, cmd ) < 0)
+                       return -1;
+       }
+       return 0;
 }
 
 static void
+cancel_pending_imap_cmds( imap_store_t *ctx )
+{
+       struct imap_cmd *cmd;
+
+       while ((cmd = ctx->pending)) {
+               if (!(ctx->pending = cmd->next))
+                       ctx->pending_append = &ctx->pending;
+               ctx->num_pending--;
+               done_imap_cmd( ctx, cmd, RESP_CANCEL );
+       }
+}
+
+static void
+cancel_submitted_imap_cmds( imap_store_t *ctx )
+{
+       struct imap_cmd *cmd;
+
+       while ((cmd = ctx->in_progress)) {
+               ctx->in_progress = cmd->next; /* don't update in_progress_append - store is dead */
+               ctx->num_in_progress--;
+               done_imap_cmd( ctx, cmd, RESP_CANCEL );
+       }
+}
+
+static int
+submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
+{
+       assert( ctx );
+       assert( ctx->gen.bad_callback );
+       assert( cmd );
+       assert( cmd->param.done );
+
+       ctx->ref_count++;
+
+       if (ctx->pending || !cmd_submittable( ctx, cmd )) {
+               cmd->next = 0;
+               *ctx->pending_append = cmd;
+               ctx->pending_append = &cmd->next;
+               ctx->num_pending++;
+               return 0;
+       }
+
+       return send_imap_cmd( ctx, cmd );
+}
+
+static int
 imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
            void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ),
            const char *fmt, ... )
@@ -314,12 +365,9 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
                cmdp = new_imap_cmd( sizeof(*cmdp) );
        cmdp->param.done = done;
        va_start( ap, fmt );
-       cmdp = v_submit_imap_cmd( ctx, cmdp, fmt, ap );
+       nfvasprintf( &cmdp->cmd, fmt, ap );
        va_end( ap );
-       if (!cmdp)
-               return;
-
-       get_cmd_result( ctx, cmdp );
+       return submit_imap_cmd( ctx, cmdp );
 }
 
 static void
@@ -369,27 +417,6 @@ imap_refcounted_done( struct imap_cmd_refcounted_state *sts )
        free( sts );
 }
 
-/*
-static void
-drain_imap_replies( imap_store_t *ctx )
-{
-       while (ctx->num_in_progress)
-               get_cmd_result( ctx, 0 );
-}
-*/
-
-/* call with a ref on ctx! */
-static void
-process_imap_replies( imap_store_t *ctx )
-{
-       while (ctx->num_in_progress > ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress ||
-              socket_pending( &ctx->conn )) {
-               get_cmd_result( ctx, 0 );
-               if (ctx->store_canceled)
-                       break;
-       }
-}
-
 static int
 is_atom( list_t *list )
 {
@@ -417,66 +444,76 @@ free_list( list_t *list )
        }
 }
 
+enum {
+       LIST_OK,
+       LIST_PARTIAL,
+       LIST_BAD
+};
+
 static int
-parse_imap_list_l( imap_store_t *ctx, char **sp, list_t **curp, int level )
+parse_imap_list( imap_store_t *ctx, char **sp, parse_list_state_t *sts )
 {
-       list_t *cur;
+       list_t *cur, **curp;
        char *s = *sp, *p;
-       int n, bytes;
+       int bytes;
+
+       assert( sts );
+       assert( sts->level > 0 );
+       curp = sts->stack[--sts->level];
+       bytes = sts->need_bytes;
+       if (bytes >= 0) {
+               sts->need_bytes = -1;
+               if (!bytes)
+                       goto getline;
+               cur = (list_t *)((char *)curp - offsetof(list_t, next));
+               s = cur->val + cur->len - bytes;
+               goto getbytes;
+       }
 
        for (;;) {
                while (isspace( (unsigned char)*s ))
                        s++;
-               if (level && *s == ')') {
+               if (sts->level && *s == ')') {
                        s++;
-                       break;
+                       curp = sts->stack[--sts->level];
+                       goto next;
                }
                *curp = cur = nfmalloc( sizeof(*cur) );
-               curp = &cur->next;
                cur->val = 0; /* for clean bail */
+               curp = &cur->next;
+               *curp = 0; /* ditto */
                if (*s == '(') {
                        /* sublist */
+                       if (sts->level == MAX_LIST_DEPTH)
+                               goto bail;
                        s++;
                        cur->val = LIST;
-                       if (parse_imap_list_l( ctx, &s, &cur->child, level + 1 ))
-                               goto bail;
+                       sts->stack[sts->level++] = curp;
+                       curp = &cur->child;
+                       *curp = 0; /* for clean bail */
+                       goto next2;
                } else if (ctx && *s == '{') {
                        /* literal */
                        bytes = cur->len = strtol( s + 1, &s, 10 );
-                       if (*s != '}')
+                       if (*s != '}' || *++s)
                                goto bail;
 
                        s = cur->val = nfmalloc( cur->len );
 
-                       /* dump whats left over in the input buffer */
-                       n = ctx->conn.bytes - ctx->conn.offset;
+                 getbytes:
+                       bytes -= socket_read( &ctx->conn, s, bytes );
+                       if (bytes > 0)
+                               goto postpone;
 
-                       if (n > bytes)
-                               /* the entire message fit in the buffer */
-                               n = bytes;
-
-                       memcpy( s, ctx->conn.buf + ctx->conn.offset, n );
-                       s += n;
-                       bytes -= n;
-
-                       /* mark that we used part of the buffer */
-                       ctx->conn.offset += n;
-
-                       /* now read the rest of the message */
-                       while (bytes > 0) {
-                               if ((n = socket_read( &ctx->conn, s, bytes )) <= 0)
-                                       goto bail;
-                               s += n;
-                               bytes -= n;
-                       }
                        if (DFlags & XVERBOSE) {
                                puts( "=========" );
                                fwrite( cur->val, cur->len, 1, stdout );
                                puts( "=========" );
                        }
 
-                       if (buffer_gets( &ctx->conn, &s ))
-                               goto bail;
+                 getline:
+                       if (!(s = socket_read_line( &ctx->conn )))
+                               goto postpone;
                } else if (*s == '"') {
                        /* quoted string */
                        s++;
@@ -493,7 +530,7 @@ parse_imap_list_l( imap_store_t *ctx, char **sp, list_t **curp, int level )
                        /* atom */
                        p = s;
                        for (; *s && !isspace( (unsigned char)*s ); s++)
-                               if (level && *s == ')')
+                               if (sts->level && *s == ')')
                                        break;
                        cur->len = s - p;
                        if (cur->len == 3 && !memcmp ("NIL", p, 3))
@@ -505,56 +542,75 @@ parse_imap_list_l( imap_store_t *ctx, char **sp, list_t **curp, int level )
                        }
                }
 
-               if (!level)
+         next:
+               if (!sts->level)
                        break;
+         next2:
                if (!*s)
                        goto bail;
        }
        *sp = s;
-       *curp = 0;
-       return 0;
+       return LIST_OK;
 
+  postpone:
+       if (sts->level < MAX_LIST_DEPTH) {
+               sts->stack[sts->level++] = curp;
+               sts->need_bytes = bytes;
+               return LIST_PARTIAL;
+       }
   bail:
-       *curp = 0;
-       return -1;
+       free_list( sts->head );
+       return LIST_BAD;
 }
 
-static list_t *
-parse_imap_list( imap_store_t *ctx, char **sp )
+static void
+parse_list_init( parse_list_state_t *sts )
 {
-       list_t *head;
-
-       if (!parse_imap_list_l( ctx, sp, &head, 0 ))
-               return head;
-       free_list( head );
-       return NULL;
+       sts->need_bytes = -1;
+       sts->level = 1;
+       sts->head = 0;
+       sts->stack[0] = &sts->head;
 }
 
 static list_t *
 parse_list( char **sp )
 {
-       return parse_imap_list( 0, sp );
+       parse_list_state_t sts;
+       parse_list_init( &sts );
+       if (parse_imap_list( 0, sp, &sts ) == LIST_OK)
+               return sts.head;
+       return NULL;
 }
 
 static int
 parse_fetch( imap_store_t *ctx, char *cmd ) /* move this down */
 {
        list_t *tmp, *list, *flags;
-       char *body = 0;
+       char *body;
        imap_message_t *cur;
        msg_data_t *msgdata;
        struct imap_cmd *cmdp;
-       int uid = 0, mask = 0, status = 0, size = 0;
+       int uid, mask, status, size;
        unsigned i;
 
-       list = parse_imap_list( ctx, &cmd );
+       switch (parse_imap_list( ctx, &cmd, &ctx->parse_list_sts )) {
+       case LIST_OK:
+               ctx->parse_list_sts.level = 0;
+               list = ctx->parse_list_sts.head;
+               break;
+       case LIST_PARTIAL:
+               return 0;
+       default:
+               return -1;
+       }
 
        if (!is_list( list )) {
                error( "IMAP error: bogus FETCH response\n" );
-               free_list( list );
-               return -1;
+               goto bail;
        }
 
+       body = 0;
+       uid = mask = status = size = 0;
        for (tmp = list->child; tmp; tmp = tmp->next) {
                if (is_atom( tmp )) {
                        if (!strcmp( "UID", tmp->val )) {
@@ -612,8 +668,7 @@ parse_fetch( imap_store_t *ctx, char *cmd ) /* move this down */
                        if (cmdp->param.uid == uid)
                                goto gotuid;
                error( "IMAP error: unexpected FETCH response (UID %d)\n", uid );
-               free_list( list );
-               return -1;
+               goto bail;
          gotuid:
                msgdata = ((struct imap_cmd_fetch_msg *)cmdp)->msg_data;
                msgdata->data = body;
@@ -632,6 +687,7 @@ parse_fetch( imap_store_t *ctx, char *cmd ) /* move this down */
                cur->gen.size = size;
        }
 
+  bail:
        free_list( list );
        return 0;
 }
@@ -760,14 +816,22 @@ static void imap_open_store_greeted( imap_store_t * );
 static void get_cmd_result_p2( imap_store_t *, struct imap_cmd *, int );
 
 static void
-get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
+imap_socket_read( void *aux )
 {
+       imap_store_t *ctx = (imap_store_t *)aux;
        struct imap_cmd *cmdp, **pcmdp;
        char *cmd, *arg, *arg1, *p;
        int cancel, resp, resp2, tag, greeted;
 
        greeted = ctx->greeting;
-       while (!buffer_gets( &ctx->conn, &cmd )) {
+       if (ctx->parse_list_sts.level) {
+               cmd = 0;
+               goto do_fetch;
+       }
+
+       for (;;) {
+               if (!(cmd = socket_read_line( &ctx->conn )))
+                       return;
                arg = next_arg( &cmd );
                if (*arg == '*') {
                        arg = next_arg( &cmd );
@@ -801,6 +865,8 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                else if (!strcmp( "RECENT", arg1 ))
                                        ctx->gen.recent = atoi( arg );
                                else if(!strcmp ( "FETCH", arg1 )) {
+                                       parse_list_init( &ctx->parse_list_sts );
+                                 do_fetch:
                                        if (parse_fetch( ctx, cmd ))
                                                break; /* stream is likely to be useless now */
                                }
@@ -825,24 +891,22 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                        cmdp = ctx->in_progress;
                        if (cmdp->param.data) {
                                if (cmdp->param.to_trash)
-                                       ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */
+                                       ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */
                                p = cmdp->param.data;
                                cmdp->param.data = 0;
                                if (socket_write( &ctx->conn, p, cmdp->param.data_len, GiveOwn ) < 0)
-                                       break;
+                                       return;
                        } else if (cmdp->param.cont) {
-                               if (cmdp->param.cont( ctx, cmdp, cmd ))
-                                       break;
+                               if (cmdp->param.cont( ctx, cmdp, cmd ) < 0)
+                                       return;
                        } else {
                                error( "IMAP error: unexpected command continuation request\n" );
                                break;
                        }
                        if (socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0)
-                               break;
+                               return;
                        if (!cmdp->param.cont)
                                ctx->literal_pending = 0;
-                       if (!tcmd)
-                               return;
                } else {
                        tag = atoi( arg );
                        for (pcmdp = &ctx->in_progress; (cmdp = *pcmdp); pcmdp = &cmdp->next)
@@ -859,7 +923,7 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                        arg = next_arg( &cmd );
                        if (!strcmp( "OK", arg )) {
                                if (cmdp->param.to_trash)
-                                       ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */
+                                       ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */
                                resp = RESP_OK;
                        } else {
                                if (!strcmp( "NO", arg )) {
@@ -870,14 +934,9 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                                struct imap_cmd_trycreate *cmd2 =
                                                        (struct imap_cmd_trycreate *)new_imap_cmd( sizeof(*cmd2) );
                                                cmd2->orig_cmd = cmdp;
-                                               cmd2->gen.param.done = get_cmd_result_p2;
-                                               ctx->ref_count++;
                                                p = strchr( cmdp->cmd, '"' );
-                                               submit_imap_cmd( ctx, &cmd2->gen, "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p );
-                                               if (ctx->store_canceled)
-                                                       tcmd = 0;
-                                               deref_store( ctx );
-                                               if (!tcmd)
+                                               if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
+                                                              "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ) < 0)
                                                        return;
                                                continue;
                                        }
@@ -894,8 +953,17 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
                                ctx->gen.bad_callback( ctx->gen.bad_callback_aux );
                        if (done_imap_cmd( ctx, cmdp, resp ) < 0)
                                return;
-                       if (!tcmd || tcmd == cmdp)
+                       if (flush_imap_cmds( ctx ) < 0)
                                return;
+                       if (ctx->canceling && !ctx->in_progress) {
+                               ctx->canceling = 0;
+                               ctx->ref_count++;
+                               ctx->callbacks.imap_cancel( ctx->callback_aux );
+                               cancel = ctx->store_canceled;
+                               deref_store( ctx );
+                               if (cancel)
+                                       return;
+                       }
                }
        }
        ctx->gen.bad_callback( ctx->gen.bad_callback_aux );
@@ -912,10 +980,19 @@ get_cmd_result_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response )
        } else {
                ctx->uidnext = 0;
                ocmd->param.create = 0;
-               submit_imap_cmd( ctx, ocmd, 0 );
+               submit_imap_cmd( ctx, ocmd );
        }
 }
 
+static void
+imap_socket_fail( void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+
+       socket_close( &ctx->conn );
+       ctx->gen.bad_callback( ctx->gen.bad_callback_aux );
+}
+
 /******************* imap_cancel_store *******************/
 
 static void
@@ -924,6 +1001,8 @@ imap_cancel_store( store_t *gctx )
        imap_store_t *ctx = (imap_store_t *)gctx;
 
        socket_close( &ctx->conn );
+       cancel_submitted_imap_cmds( ctx );
+       cancel_pending_imap_cmds( ctx );
        free_generic_messages( gctx->msgs );
        free_string_list( ctx->gen.boxes );
        free_list( ctx->ns_personal );
@@ -1000,11 +1079,14 @@ do_cram_auth( imap_store_t *ctx, struct imap_cmd *cmdp, const char *prompt )
                printf( ">+> %s\n", resp );
        return socket_write( &ctx->conn, resp, l, GiveOwn );
 }
-#endif
+#endif /* HAVE_LIBSSL */
 
+static void imap_open_store_connected( int, void * );
+static void imap_open_store_tlsstarted1( int, void * );
 static void imap_open_store_p2( imap_store_t *, struct imap_cmd *, int );
 static void imap_open_store_authenticate( imap_store_t * );
 static void imap_open_store_authenticate_p2( imap_store_t *, struct imap_cmd *, int );
+static void imap_open_store_tlsstarted2( int, void * );
 static void imap_open_store_authenticate_p3( imap_store_t *, struct imap_cmd *, int );
 static void imap_open_store_authenticate2( imap_store_t * );
 static void imap_open_store_authenticate2_p2( imap_store_t *, struct imap_cmd *, int );
@@ -1043,28 +1125,46 @@ imap_open_store( store_conf_t *conf,
        ctx = nfcalloc( sizeof(*ctx) );
        ctx->ref_count = 1;
        ctx->gen.conf = conf;
-       ctx->conn.fd = -1;
        ctx->callbacks.imap_open = cb;
        ctx->callback_aux = aux;
        set_bad_callback( &ctx->gen, (void (*)(void *))imap_open_store_bail, ctx );
        ctx->in_progress_append = &ctx->in_progress;
+       ctx->pending_append = &ctx->pending;
 
-       if (!socket_connect( &srvc->sconf, &ctx->conn ))
-               goto bail;
+       socket_init( &ctx->conn, &srvc->sconf, imap_socket_fail, imap_socket_read, ctx );
+       socket_connect( &ctx->conn, imap_open_store_connected );
+}
 
+static void
+imap_open_store_connected( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
 #if HAVE_LIBSSL
-       if (srvc->sconf.use_imaps) {
-               if (socket_start_tls( &srvc->sconf, &ctx->conn )) {
-                       imap_open_store_ssl_bail( ctx );
-                       return;
-               }
+       imap_store_conf_t *cfg = (imap_store_conf_t *)ctx->gen.conf;
+       imap_server_conf_t *srvc = cfg->server;
+#endif
+
+       if (!ok) {
+               imap_open_store_bail( ctx );
+               return;
        }
+
+#if HAVE_LIBSSL
+       if (srvc->sconf.use_imaps)
+               socket_start_tls( &ctx->conn, imap_open_store_tlsstarted1 );
 #endif
-       return;
+}
 
-  bail:
-       imap_open_store_bail( ctx );
+#if HAVE_LIBSSL
+static void
+imap_open_store_tlsstarted1( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+
+       if (!ok)
+               imap_open_store_ssl_bail( ctx );
 }
+#endif
 
 static void
 imap_open_store_greeted( imap_store_t *ctx )
@@ -1127,7 +1227,16 @@ imap_open_store_authenticate_p2( imap_store_t *ctx, struct imap_cmd *cmd ATTR_UN
 {
        if (response != RESP_OK)
                imap_open_store_bail( ctx );
-       else if (socket_start_tls( &((imap_server_conf_t *)ctx->gen.conf)->sconf, &ctx->conn ))
+       else
+               socket_start_tls( &ctx->conn, imap_open_store_tlsstarted2 );
+}
+
+static void
+imap_open_store_tlsstarted2( int ok, void *aux )
+{
+       imap_store_t *ctx = (imap_store_t *)aux;
+
+       if (!ok)
                imap_open_store_ssl_bail( ctx );
        else
                imap_exec( ctx, 0, imap_open_store_authenticate_p3, "CAPABILITY" );
@@ -1257,7 +1366,7 @@ static void
 imap_open_store_finalize( imap_store_t *ctx )
 {
        set_bad_callback( &ctx->gen, 0, 0 );
-       ctx->trashnc = 1;
+       ctx->trashnc = TrashUnknown;
        ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux );
 }
 
@@ -1305,8 +1414,7 @@ struct imap_cmd_select {
 };
 
 static void imap_select_p2( imap_store_t *, struct imap_cmd *, int );
-static void imap_submit_select2( imap_store_t *, const char *, struct imap_cmd_refcounted_state *,
-                                 struct imap_cmd_refcounted ** );
+static int imap_submit_select2( imap_store_t *, const char *, struct imap_cmd_refcounted_state * );
 static void imap_select2_p2( imap_store_t *, struct imap_cmd *, int );
 
 static void
@@ -1348,13 +1456,11 @@ imap_select_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response )
                free( cmdp->excs );
                cmdp->gen.callback( response, cmdp->gen.callback_aux );
        } else {
-               struct imap_cmd_refcounted *cmd2 = 0;
                struct imap_cmd_refcounted_state *sts = nfmalloc( sizeof(*sts) );
                sts->callback = cmdp->gen.callback;
                sts->callback_aux = cmdp->gen.callback_aux;
                sts->ref_count = 1; /* so forced sync does not cause an early exit */
                sts->ret_val = DRV_OK;
-               ctx->ref_count++;
 
                ctx->msgapp = &ctx->gen.msgs;
                sort_ints( cmdp->excs, cmdp->nexcs );
@@ -1368,9 +1474,7 @@ imap_select_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response )
                                if (i != j)
                                        bl += sprintf( buf + bl, ":%d", cmdp->excs[i] );
                        }
-                       imap_submit_select2( ctx, buf, sts, &cmd2 );
-                       if (ctx->store_canceled) {
-                               deref_store( ctx );
+                       if (imap_submit_select2( ctx, buf, sts ) < 0) {
                                free( cmdp->excs );
                                return;
                        }
@@ -1379,35 +1483,27 @@ imap_select_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response )
                        cmdp->maxuid = ctx->uidnext >= 0 ? ctx->uidnext - 1 : 1000000000;
                if (cmdp->maxuid >= cmdp->minuid) {
                        sprintf( buf, "%d:%d", cmdp->minuid, cmdp->maxuid );
-                       imap_submit_select2( ctx, buf, sts, &cmd2 );
-                       if (ctx->store_canceled) {
-                               deref_store( ctx );
+                       if (imap_submit_select2( ctx, buf, sts ) < 0) {
                                free( cmdp->excs );
                                return;
                        }
                }
-               deref_store( ctx );
                free( cmdp->excs );
                if (!--sts->ref_count)
                        imap_refcounted_done( sts );
-               else
-                       get_cmd_result( ctx, &cmd2->gen );
        }
 }
 
-static void
-imap_submit_select2( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts,
-                     struct imap_cmd_refcounted **cmdp )
+static int
+imap_submit_select2( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts )
 {
        struct imap_cmd_refcounted *cmd = (struct imap_cmd_refcounted *)new_imap_cmd( sizeof(*cmd) );
-       cmd->gen.param.done = imap_select2_p2;
        cmd->state = sts;
        sts->ref_count++;
-       *cmdp = cmd;
-       submit_imap_cmd( ctx, &cmd->gen,
-                        "UID FETCH %s (UID%s%s)", buf,
-                        (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
-                        (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" );
+       return imap_exec( ctx, &cmd->gen, imap_select2_p2,
+                         "UID FETCH %s (UID%s%s)", buf,
+                         (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
+                         (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" );
 }
 
 static void
@@ -1473,12 +1569,11 @@ imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags,
        char buf[256];
 
        struct imap_cmd_refcounted *cmd = (struct imap_cmd_refcounted *)new_imap_cmd( sizeof(*cmd) );
-       cmd->gen.param.done = imap_set_flags_p2;
        cmd->state = sts;
        sts->ref_count++;
        buf[imap_make_flags( flags, buf )] = 0;
-       submit_imap_cmd( ctx, &cmd->gen, "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
-       return ctx->store_canceled;
+       return imap_exec( ctx, &cmd->gen, imap_set_flags_p2,
+                         "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
 }
 
 static void
@@ -1501,20 +1596,15 @@ imap_set_flags( store_t *gctx, message_t *msg, int uid, 
int add, int del,
                sts->callback_aux = aux;
                sts->ref_count = 1; /* so forced sync does not cause an early 
exit */
                sts->ret_val = DRV_OK;
-               ctx->ref_count++;
-               if ((add && imap_flags_helper( ctx, uid, '+', add, sts )) ||
-                   (del && imap_flags_helper( ctx, uid, '-', del, sts )))
+               if ((add && imap_flags_helper( ctx, uid, '+', add, sts ) < 0) ||
+                   (del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0))
                {
-                       deref_store( ctx );
                        if (!--sts->ref_count)
                                free( sts );
                        return;
                }
                if (!--sts->ref_count)
                        imap_refcounted_done( sts );
-               else
-                       process_imap_replies( ctx );
-               deref_store( ctx );
        } else {
                cb( DRV_OK, aux );
        }
@@ -1684,8 +1774,17 @@ static void
 imap_cancel( store_t *gctx,
              void (*cb)( void *aux ), void *aux )
 {
-       (void)gctx;
-       cb( aux );
+       imap_store_t *ctx = (imap_store_t *)gctx;
+
+       socket_cancel_delimited( &ctx->conn );
+       cancel_pending_imap_cmds( ctx );
+       if (ctx->in_progress) {
+               ctx->canceling = 1;
+               ctx->callbacks.imap_cancel = cb;
+               ctx->callback_aux = aux;
+       } else {
+               cb( aux );
+       }
 }
 
 /******************* imap_commit *******************/
@@ -1732,7 +1831,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, 
int *err )
        server->require_ssl = 1;
        server->sconf.use_tlsv1 = 1;
 #endif
-       server->max_in_progress = 50;
+       server->max_in_progress = INT_MAX;
 
        while (getcline( cfg ) && cfg->cmd) {
                if (!strcasecmp( "Host", cfg->cmd )) {
@@ -1761,7 +1860,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, 
int *err )
                        server->sconf.port = parse_int( cfg );
                else if (!strcasecmp( "PipelineDepth", cfg->cmd )) {
                        if ((server->max_in_progress = parse_int( cfg )) < 1) {
-                               error( "%s:%d: PipelineDepth must be at least 
1\n" );
+                               error( "%s:%d: PipelineDepth must be at least 
1\n", cfg->file, cfg->line );
                                *err = 1;
                        }
                }
diff --git a/src/isync.h b/src/isync.h
index dd55945..b821721 100644
--- a/src/isync.h
+++ b/src/isync.h
@@ -73,16 +73,37 @@ typedef struct server_conf {
 #endif
 } server_conf_t;
 
+/* One element of a connection's queue of pending output. */
+typedef struct buff_chunk {
+       struct buff_chunk *next; /* following chunk in the queue */
+       char *data; /* payload: either buf below or a separately allocated buffer */
+       int len; /* payload size in bytes */
+       char buf[1]; /* start of inline payload storage; allocated larger as needed */
+} buff_chunk_t;
+
 typedef struct {
+       /* connection */
        int fd;
+       int state;
+       const server_conf_t *conf; /* needed during connect */
 #if HAVE_LIBSSL
        SSL *ssl;
-       unsigned int use_ssl:1;
+       unsigned use_ssl:1;
 #endif
 
-       int bytes;
-       int offset;
-       char buf[1024];
+       void (*bad_callback)( void *aux ); /* async fail while sending or 
listening */
+       void (*read_callback)( void *aux ); /* data available for reading */
+       void (*action_callback)( int ok, void *aux ); /* connect or starttls */
+       void *callback_aux;
+
+       /* writing */
+       buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+       int write_offset; /* offset into buffer head */
+
+       /* reading */
+       int offset; /* start of filled bytes in buffer */
+       int bytes; /* number of filled bytes in buffer */
+       int scanoff; /* offset to continue scanning for newline at, relative to 
'offset' */
+       char buf[100000];
 } conn_t;
 
 typedef struct {
@@ -327,15 +348,28 @@ extern const char *Home;
 
 /* socket.c */
 
-int socket_connect( const server_conf_t *conf, conn_t *sock );
-int socket_start_tls( const server_conf_t *conf, conn_t *sock );
+/*
+ * Call this before doing anything else with the socket.
+ * Records the server configuration and the callbacks, marks the
+ * descriptor as not yet open, and points the write queue's tail pointer
+ * at the queue head so that the first queued chunk can be linked in.
+ * NOTE(review): assumes *conn is otherwise zero-initialized by the caller
+ * (in particular write_buf == NULL) - confirm.
+ */
+static INLINE void socket_init( conn_t *conn,
+                                const server_conf_t *conf,
+                                void (*bad_callback)( void *aux ),
+                                void (*read_callback)( void *aux ),
+                                void *aux )
+{
+    conn->conf = conf;
+    conn->bad_callback = bad_callback;
+    conn->read_callback = read_callback;
+    conn->callback_aux = aux;
+    conn->fd = -1;
+    conn->write_buf_append = &conn->write_buf;
+}
+void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) );
+void socket_start_tls( conn_t *conn, void (*cb)( int ok, void *aux ) );
 void socket_close( conn_t *sock );
-int socket_read( conn_t *sock, char *buf, int len );
+int socket_read( conn_t *sock, char *buf, int len ); /* never waits */
+char *socket_read_line( conn_t *sock ); /* don't free return value; never 
waits */
 typedef enum { KeepOwn = 0, GiveOwn } ownership_t;
 int socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn );
-int socket_pending( conn_t *sock );
-
-int buffer_gets( conn_t *b, char **s );
+void socket_delimit( conn_t *conn );
+void socket_cancel_delimited( conn_t *conn );
 
 void cram( const char *challenge, const char *user, const char *pass,
            char **_final, int *_finallen );
diff --git a/src/mbsync.1 b/src/mbsync.1
index 4f47668..e8cbabb 100644
--- a/src/mbsync.1
+++ b/src/mbsync.1
@@ -281,10 +281,8 @@ Use TLSv1 for communication with the IMAP server over SSL?
 \fBPipelineDepth\fR \fIdepth\fR
 Maximum number of IMAP commands which can be simultaneously in flight.
 Setting this to \fI1\fR disables pipelining.
-Setting it to a too big value may deadlock isync.
-Currently, this affects only a few commands.
 This is mostly a debugging only option.
-(Default: \fI50\fR)
+(Default: \fIunlimited\fR)
 ..
 .SS IMAP Stores
 The reference point for relative \fBPath\fRs is whatever the server likes it
diff --git a/src/socket.c b/src/socket.c
index 4bb33c5..1351f03 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -36,52 +36,79 @@
 #include <assert.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <stddef.h>
 #include <errno.h>
 #include <string.h>
+#include <fcntl.h>
 #include <sys/socket.h>
 #include <sys/ioctl.h>
-#ifdef HAVE_SYS_FILIO_H
-# include <sys/filio.h>
-#endif
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
 
+enum {
+       SCK_CONNECTING,
 #if HAVE_LIBSSL
-/* Some of this code is inspired by / lifted from mutt. */
+       SCK_STARTTLS,
+#endif
+       SCK_READY
+};
 
 static void
-socket_perror( const char *func, conn_t *sock, int ret )
+socket_fail( conn_t *conn )
 {
 #if HAVE_LIBSSL
-       int err;
-
-       if (sock->use_ssl) {
-               switch ((err = SSL_get_error( sock->ssl, ret ))) {
-               case SSL_ERROR_SYSCALL:
-               case SSL_ERROR_SSL:
-                       if ((err = ERR_get_error()) == 0) {
-                               if (ret == 0)
-                                       error( "SSL_%s: got EOF\n", func );
-                               else
-                                       error( "SSL_%s: %s\n", func, 
strerror(errno) );
-                       } else
-                               error( "SSL_%s: %s\n", func, ERR_error_string( 
err, 0 ) );
-                       return;
-               default:
-                       error( "SSL_%s: unhandled SSL error %d\n", func, err );
-                       break;
-               }
-               return;
-       }
-#else
-       (void)sock;
+       if (conn->state == SCK_STARTTLS)
+               conn->action_callback( 0, conn->callback_aux );
+       else
 #endif
+               conn->bad_callback( conn->callback_aux );
+}
+
+static void
+socket_perror( const char *func, conn_t *conn, int ret )
+{
        if (ret < 0)
                perror( func );
        else
                error( "%s: unexpected EOF\n", func );
+       socket_fail( conn );
+}
+
+#if HAVE_LIBSSL
+/* Some of this code is inspired by / lifted from mutt. */
+
+static int
+ssl_return( const char *func, conn_t *conn, int ret )
+{
+       int err;
+
+       switch ((err = SSL_get_error( conn->ssl, ret ))) {
+       case SSL_ERROR_NONE:
+               return 1;
+       case SSL_ERROR_WANT_WRITE:
+               conf_fd( conn->fd, POLLIN, POLLOUT );
+               /* fallthrough */
+       case SSL_ERROR_WANT_READ:
+               return 0;
+       case SSL_ERROR_SYSCALL:
+       case SSL_ERROR_SSL:
+               if (!(err = ERR_get_error())) {
+                       if (ret == 0)
+                               error( "SSL_%s: unexpected EOF\n", func );
+                       else
+                               error( "SSL_%s: %s\n", func, strerror( errno ) 
);
+               } else {
+                       error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) 
);
+               }
+               break;
+       default:
+               error( "SSL_%s: unhandled SSL error %d\n", func, err );
+               break;
+       }
+       socket_fail( conn );
+       return -1;
 }
 
 static int
@@ -241,46 +268,79 @@ init_ssl_ctx( const server_conf_t *conf )
        return 0;
 }
 
-int
-socket_start_tls( const server_conf_t *conf, conn_t *sock )
+static void start_tls_p2( conn_t * );
+static void start_tls_p3( conn_t *, int );
+
+/*
+ * Begin the TLS handshake on an already connected socket.
+ * cb is invoked with ok != 0 once the handshake succeeded and the server
+ * certificate was accepted, and with ok == 0 on any failure.
+ */
+void
+socket_start_tls( conn_t *conn, void (*cb)( int ok, void *aux ) )
 {
-       int ret;
        static int ssl_inited;
 
+       conn->action_callback = cb;
+
        if (!ssl_inited) {
                SSL_library_init();
                SSL_load_error_strings();
                ssl_inited = 1;
        }
 
-       if (!conf->SSLContext && init_ssl_ctx( conf ))
-               return 1;
-
-       sock->ssl = SSL_new( ((server_conf_t *)conf)->SSLContext );
-       SSL_set_fd( sock->ssl, sock->fd );
-       if ((ret = SSL_connect( sock->ssl )) <= 0) {
-               socket_perror( "connect", sock, ret );
-               return 1;
+       if (!conn->conf->SSLContext && init_ssl_ctx( conn->conf )) {
+               start_tls_p3( conn, 0 );
+               return;
        }
 
-       /* verify the server certificate */
-       if (verify_cert( conf, sock ))
-               return 1;
+       conn->ssl = SSL_new( ((server_conf_t *)conn->conf)->SSLContext );
+       SSL_set_fd( conn->ssl, conn->fd );
+       SSL_set_mode( conn->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER );
+       /* route fd events to start_tls_p2() until the handshake is done */
+       conn->state = SCK_STARTTLS;
+       start_tls_p2( conn );
+}
 
-       sock->use_ssl = 1;
-       info( "Connection is now encrypted\n" );
-       return 0;
+/* Run (or resume) SSL_connect() and finish up once it completes. */
+static void
+start_tls_p2( conn_t *conn )
+{
+       switch (ssl_return( "connect", conn, SSL_connect( conn->ssl ) )) {
+       case 0:
+               /* handshake wants more I/O; resumed from socket_fd_cb() */
+               break;
+       case 1:
+               /* verify the server certificate */
+               if (verify_cert( conn->conf, conn )) {
+                       start_tls_p3( conn, 0 );
+               } else {
+                       info( "Connection is now encrypted\n" );
+                       conn->use_ssl = 1;
+                       start_tls_p3( conn, 1 );
+               }
+               break;
+       default:
+               /* ssl_return() already reported the failure via socket_fail();
+                * do not invoke the callback a second time */
+               break;
+       }
+}
+
+/* Leave the handshake state and report the outcome to the caller. */
+static void
+start_tls_p3( conn_t *conn, int ok )
+{
+       conn->state = SCK_READY;
+       conn->action_callback( ok, conn->callback_aux );
 }
 
 #endif /* HAVE_LIBSSL */
 
-int
-socket_connect( const server_conf_t *conf, conn_t *sock )
+static void socket_fd_cb( int, void * );
+
+static void socket_connected2( conn_t * );
+static void socket_connect_bail( conn_t * );
+
+void
+socket_connect( conn_t *sock, void (*cb)( int ok, void *aux ) )
 {
+       const server_conf_t *conf = sock->conf;
        struct hostent *he;
        struct sockaddr_in addr;
        int s, a[2];
 
+       sock->action_callback = cb;
+
        /* open connection to IMAP server */
        if (conf->tunnel) {
                infon( "Starting tunnel '%s'... ", conf->tunnel );
@@ -301,6 +361,10 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
 
                close( a[0] );
                sock->fd = a[1];
+
+               /* O_NONBLOCK is a file status flag, so it is set with F_SETFL */
+               fcntl( a[1], F_SETFL, O_NONBLOCK );
+               add_fd( a[1], socket_fd_cb, sock );
+
        } else {
                memset( &addr, 0, sizeof(addr) );
                addr.sin_port = conf->port ? htons( conf->port ) :
@@ -314,7 +378,7 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
                he = gethostbyname( conf->host );
                if (!he) {
                        error( "IMAP error: Cannot resolve server '%s'\n", 
conf->host );
-                       return -1;
+                       goto bail;
                }
                info( "ok\n" );
 
@@ -325,24 +389,78 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
                        perror( "socket" );
                        exit( 1 );
                }
+               sock->fd = s;
+               /* O_NONBLOCK is a file status flag, so it is set with F_SETFL */
+               fcntl( s, F_SETFL, O_NONBLOCK );
+               add_fd( s, socket_fd_cb, sock );
 
-               infon( "Connecting to %s:%hu... ", inet_ntoa( addr.sin_addr ), 
ntohs( addr.sin_port ) );
+               infon( "Connecting to %s (%s:%hu) ... ",
+                      conf->host, inet_ntoa( addr.sin_addr ), ntohs( 
addr.sin_port ) );
                if (connect( s, (struct sockaddr *)&addr, sizeof(addr) )) {
-                       close( s );
-                       perror( "connect" );
-                       return -1;
+                       if (errno != EINPROGRESS) {
+                               perror( "connect" );
+                               del_fd( s );
+                               close( s );
+                               /* forget the closed descriptor so socket_close() cannot close it again */
+                               sock->fd = -1;
+                               goto bail;
+                       }
+                       conf_fd( s, 0, POLLOUT );
+                       sock->state = SCK_CONNECTING;
+                       info( "\n" );
+                       return;
                }
 
-               sock->fd = s;
        }
        info( "ok\n" );
-       return 0;
+       socket_connected2( sock );
+       return;
+
+  bail:
+       socket_connect_bail( sock );
+}
+
+/*
+ * Completion of a non-blocking connect(): fetch the pending socket error
+ * and either report the failure or proceed to the ready state.
+ */
+static void
+socket_connected( conn_t *conn )
+{
+       int soerr;
+       /* getsockopt() reads this as the size of the result buffer */
+       socklen_t selen = sizeof(soerr);
+
+       infon( "Connecting to %s: ", conn->conf->host );
+       if (getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, &soerr, &selen )) {
+               perror( "getsockopt" );
+               exit( 1 );
+       }
+       if (soerr) {
+               errno = soerr;
+               perror( "connect" );
+               del_fd( conn->fd );
+               close( conn->fd );
+               /* forget the closed descriptor so socket_close() cannot close it again */
+               conn->fd = -1;
+               socket_connect_bail( conn );
+               return;
+       }
+       info( "ok\n" );
+       socket_connected2( conn );
 }
 
+/*
+ * Final step of connection setup: switch the descriptor's event
+ * configuration to POLLIN, mark the connection ready and report
+ * success through the action callback.
+ */
+static void
+socket_connected2( conn_t *conn )
+{
+       conf_fd( conn->fd, 0, POLLIN );
+       conn->state = SCK_READY;
+       conn->action_callback( 1, conn->callback_aux );
+}
+
+static void
+socket_connect_bail( conn_t *conn )
+{
+       conn->action_callback( 0, conn->callback_aux );
+}
+
+static void dispose_chunks( conn_t *conn, buff_chunk_t **bcp );
+
 void
 socket_close( conn_t *sock )
 {
        if (sock->fd >= 0) {
+               del_fd( sock->fd );
                close( sock->fd );
                sock->fd = -1;
        }
@@ -355,114 +473,241 @@ socket_close( conn_t *sock )
                sock->ssl = 0;
        }
 #endif
+       dispose_chunks( sock, &sock->write_buf );
 }
 
-int
-socket_read( conn_t *sock, char *buf, int len )
+static int
+do_write( conn_t *conn, char *buf, int len )
 {
        int n;
 
-       assert( sock->fd >= 0 );
-       n =
+       assert( conn->fd >= 0 );
 #if HAVE_LIBSSL
-               sock->use_ssl ? SSL_read( sock->ssl, buf, len ) :
+       if (conn->use_ssl) {
+               switch (ssl_return( "write", conn, SSL_write( conn->ssl, buf, 
len ) )) {
+               default:
+                       return -1;
+               case 1:
+                       return len;
+               case 0:
+                       return 0;
+               }
+       }
 #endif
-               read( sock->fd, buf, len );
-       if (n <= 0) {
-               socket_perror( "read", sock, n );
-               close( sock->fd );
-               sock->fd = -1;
+       n = write( conn->fd, buf, len );
+       if (n < 0) {
+               if (errno == EAGAIN) {
+                       conf_fd( conn->fd, POLLIN, POLLOUT );
+                       return 0;
+               }
+               perror( "write" );
+               socket_fail( conn );
+               return -1;
        }
        return n;
 }
 
-int
-socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn )
+static void
+dispose_chunk( conn_t *conn, buff_chunk_t **bcp )
 {
-       int n;
+       buff_chunk_t *bc = *bcp;
+       if (!(*bcp = bc->next))
+               conn->write_buf_append = bcp;
+       if (bc->data != bc->buf)
+               free( bc->data );
+       free( bc );
+}
 
-       assert( sock->fd >= 0 );
-       n =
-#if HAVE_LIBSSL
-               sock->use_ssl ? SSL_write( sock->ssl, buf, len ) :
-#endif
-               write( sock->fd, buf, len );
-       if (takeOwn == GiveOwn)
-               free( buf );
-       if (n != len) {
-               socket_perror( "write", sock, n );
-               close( sock->fd );
-               sock->fd = -1;
-               return -1;
+/* Dispose of every chunk from *bcp to the end of the queue. */
+static void
+dispose_chunks( conn_t *conn, buff_chunk_t **bcp )
+{
+       /* dispose_chunk() advances *bcp to the next chunk each time */
+       while (*bcp)
+               dispose_chunk( conn, bcp );
+}
+
+/*
+ * Write out as much of the queued output as the socket accepts.
+ * Returns -1 if do_write() failed and 0 otherwise; in the latter case
+ * the queue may still hold data that could not be written yet.
+ */
+static int
+do_queued_write( conn_t *conn )
+{
+       buff_chunk_t *bc;
+       while ((bc = conn->write_buf)) {
+               int n, len = bc->len;
+               if (!len) {
+                       /* empty chunk: nothing to send */
+                       dispose_chunk( conn, &conn->write_buf );
+               } else {
+                       len -= conn->write_offset;
+                       if ((n = do_write( conn, bc->data + conn->write_offset, len )) < 0)
+                               return -1;
+                       if (n == len) {
+                               conn->write_offset = 0;
+                               dispose_chunk( conn, &conn->write_buf );
+                       } else {
+                               /* short write: stop here instead of spinning on a
+                                * socket that cannot take more right now */
+                               conn->write_offset += n;
+                               return 0;
+                       }
+               }
        }
        return 0;
 }
 
-int
-socket_pending( conn_t *sock )
+/*
+ * Append len bytes at buf to the end of the connection's write queue.
+ * With GiveOwn the chunk refers to buf directly and buf is freed when the
+ * chunk is disposed of; otherwise the bytes are copied into the chunk.
+ */
+static void
+do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn )
 {
-       int num = -1;
+       buff_chunk_t *bc;
 
-       if (ioctl( sock->fd, FIONREAD, &num ) < 0)
-               return -1;
-       if (num > 0)
-               return num;
-#if HAVE_LIBSSL
-       if (sock->use_ssl)
-               return SSL_pending( sock->ssl );
-#endif
-       return 0;
+       if (takeOwn == GiveOwn) {
+               bc = nfmalloc( offsetof(buff_chunk_t, buf) );
+               bc->data = buf;
+       } else {
+               bc = nfmalloc( offsetof(buff_chunk_t, buf) + len );
+               bc->data = bc->buf;
+               memcpy( bc->data, buf, len );
+       }
+       bc->len = len;
+       /* the new chunk is the last one; terminate the list explicitly */
+       bc->next = 0;
+       *conn->write_buf_append = bc;
+       conn->write_buf_append = &bc->next;
 }
 
-/* simple line buffering */
+/*
+ * Send len bytes at buf, queueing whatever cannot be written right away.
+ * With a non-zero takeOwn the buffer is either handed to the queue or
+ * freed here. Returns len if the data was only queued behind earlier
+ * output, otherwise the result of do_write(): the number of bytes written
+ * immediately (possibly 0), or -1 on failure.
+ */
 int
-buffer_gets( conn_t *b, char **s )
+socket_write( conn_t *conn, char *buf, int len, ownership_t takeOwn )
 {
-       int n;
-       int start = b->offset;
-
-       *s = b->buf + start;
+       if (conn->write_buf) {
+               /* keep ordering: earlier output is still pending */
+               do_append( conn, buf, len, takeOwn );
+               return len;
+       } else {
+               int n = do_write( conn, buf, len );
+               if (n != len && n >= 0) {
+                       /* nothing or only part was written: queue the data so
+                        * do_queued_write() sends the rest from write_offset */
+                       if (n > 0)
+                               conf_fd( conn->fd, POLLIN, POLLOUT );
+                       conn->write_offset = n;
+                       do_append( conn, buf, len, takeOwn );
+               } else if (takeOwn) {
+                       free( buf );
+               }
+               return n;
+       }
+}
 
-       for (;;) {
-               /* make sure we have enough data to read the \r\n sequence */
-               if (b->offset + 1 >= b->bytes) {
-                       if (start) {
-                               /* shift down used bytes */
-                               *s = b->buf;
+/*
+ * If output is currently queued, append an empty marker chunk
+ * (no data, zero length) to the write queue.
+ */
+void
+socket_delimit( conn_t *conn )
+{
+       if (conn->write_buf)
+               do_append( conn, 0, 0, GiveOwn );
+}
 
-                               assert( start <= b->bytes );
-                               n = b->bytes - start;
+void
+socket_cancel_delimited( conn_t *conn )
+{
+       buff_chunk_t **bcp;
 
-                               if (n)
-                                       memmove( b->buf, b->buf + start, n );
-                               b->offset -= start;
-                               b->bytes = n;
-                               start = 0;
-                       }
+       for (bcp = &conn->write_buf; *bcp && (*bcp)->data; bcp = &(*bcp)->next) 
;
+       dispose_chunks( conn, bcp );
+}
 
-                       n = socket_read( b, b->buf + b->bytes,
-                                        sizeof(b->buf) - b->bytes );
+/*
+ * Copy up to len already buffered bytes into buf and consume them from
+ * the connection's read buffer. Returns the number of bytes copied,
+ * which is 0 when nothing is buffered.
+ */
+int
+socket_read( conn_t *conn, char *buf, int len )
+{
+       int n = conn->bytes;
+       if (n > len)
+               n = len;
+       memcpy( buf, conn->buf + conn->offset, n );
+       conn->offset += n;
+       conn->bytes -= n;
+       return n;
+}
 
-                       if (n <= 0)
-                               return -1;
+char *
+socket_read_line( conn_t *b )
+{
+       char *p, *s;
+       int n;
 
-                       b->bytes += n;
+       s = b->buf + b->offset;
+       p = memchr( s + b->scanoff, '\n', b->bytes - b->scanoff );
+       if (!p) {
+               b->scanoff = b->bytes;
+               if (b->offset + b->bytes == sizeof(b->buf)) {
+                       memmove( b->buf, b->buf + b->offset, b->bytes );
+                       b->offset = 0;
                }
+               return 0;
+       }
+       n = p + 1 - s;
+       b->offset += n;
+       b->bytes -= n;
+       b->scanoff = 0;
+       if (p != s && p[-1] == '\r')
+               p--;
+       *p = 0;
+       if (DFlags & VERBOSE)
+               puts( s );
+       return s;
+}
 
-               if (b->buf[b->offset] == '\r') {
-                       assert( b->offset + 1 < b->bytes );
-                       if (b->buf[b->offset + 1] == '\n') {
-                               b->buf[b->offset] = 0;  /* terminate the string 
*/
-                               b->offset += 2; /* next line */
-                               if (DFlags & VERBOSE)
-                                       puts( *s );
-                               return 0;
-                       }
+static int
+do_read( conn_t *conn, char *buf, int len )
+{
+       int n;
+
+       assert( conn->fd >= 0 );
+#if HAVE_LIBSSL
+       if (conn->use_ssl) {
+               n = SSL_read( conn->ssl, buf, len );
+               switch (ssl_return( "read", conn, n )) {
+               case 1:
+                       break;
+               default: /* error (already called back) */
+               case 0: /* want something */
+                       return 0;
                }
+       } else
+#endif
+               n = read( conn->fd, buf, len );
+       if (n <= 0)
+               socket_perror( "read", conn, n );
+       return n;
+}
+
+/*
+ * Read from the connection into the unused tail of its read buffer and,
+ * if any bytes arrived, account for them and invoke the read callback.
+ */
+static void
+buffered_read( conn_t *conn )
+{
+       int n = conn->offset + conn->bytes; /* end of the filled region */
+       n = do_read( conn, conn->buf + n, sizeof(conn->buf) - n );
+       if (n > 0) {
+               conn->bytes += n;
+               conn->read_callback( conn->callback_aux );
+       }
+}
+
+static void
+socket_fd_cb( int events, void *aux )
+{
+       conn_t *conn = (conn_t *)aux;
 
-               b->offset++;
+       if (conn->state == SCK_CONNECTING) {
+               socket_connected( conn );
+               return;
+       }
+
+#if HAVE_LIBSSL
+       if (conn->state == SCK_STARTTLS) {
+               start_tls_p2( conn );
+               return;
+       }
+       if (conn->use_ssl) {
+               conf_fd( conn->fd, POLLIN, 0 );
+               if (conn->write_buf)
+                       if (do_queued_write( conn ) < 0)
+                               return;
+               buffered_read( conn );
+               return;
+       }
+#endif
+
+       if (events & POLLOUT) {
+               if (do_queued_write( conn ) < 0)
+                       return;
+               if (conn->fd >= 0 && !conn->write_buf)
+                       conf_fd( conn->fd, POLLIN, 0 );
        }
-       /* not reached */
+       if (events & POLLIN)
+               buffered_read( conn );
 }
 
 #if HAVE_LIBSSL

------------------------------------------------------------------------------
Colocation vs. Managed Hosting
A question and answer guide to determining the best fit
for your organization - today and in the future.
http://p.sf.net/sfu/internap-sfd2d
_______________________________________________
isync-devel mailing list
isync-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/isync-devel

Reply via email to