commit ac66c19a81164f02505e0829278c8446b301d4e8
Author: Oswald Buddenhagen <[email protected]>
Date: Sun Mar 27 12:59:44 2011 +0200
fully asynchronous IMAP operation
- asynchronous sockets using an event loop
  - connect & starttls have completion callback parameters
  - callbacks for notification about filled input buffer and emptied
    output buffer
- unsent imap command queue
  - used when
    - socket output buffer is non-empty
    - number of commands in flight exceeds limit
    - last sent command requires round-trip
    - command has a dependency on completion of previous command
  - trashnc is tri-state so only a single "scout" trash APPEND/COPY is
    sent at first. A possibly resulting CREATE is injected in front of
    the remaining trash commands, so they can succeed (or be cancel()d
    if it fails).
  - queue's presence necessitates imap_cancel implementation
configure.in | 2 +-
src/drv_imap.c | 344 ++++++++++++++++++++++++-----------------
src/isync.h | 34 ++++-
src/mbsync.1 | 4 +-
src/socket.c | 401 +++++++++++++++++++++++++++++++++++++----------
5 files changed, 547 insertions(+), 238 deletions(-)
diff --git a/configure.in b/configure.in
index f63081a..36b07d0 100644
--- a/configure.in
+++ b/configure.in
@@ -9,7 +9,7 @@ if test "$GCC" = yes; then
CFLAGS="$CFLAGS -pipe -W -Wall -Wshadow -Wstrict-prototypes"
fi
-AC_CHECK_HEADERS(sys/filio.h sys/poll.h sys/select.h)
+AC_CHECK_HEADERS(sys/poll.h sys/select.h)
AC_CHECK_FUNCS(vasprintf)
AC_CHECK_LIB(socket, socket, [SOCK_LIBS="-lsocket"])
diff --git a/src/drv_imap.c b/src/drv_imap.c
index da9cbc3..5912e15 100644
--- a/src/drv_imap.c
+++ b/src/drv_imap.c
@@ -80,7 +80,8 @@ typedef struct imap_store {
store_t gen;
const char *prefix;
int uidnext; /* from SELECT responses */
- unsigned trashnc:1; /* trash folder's existence is not confirmed yet */
+ /* trash folder's existence is not confirmed yet */
+ enum { TrashUnknown, TrashChecking, TrashKnown } trashnc;
unsigned got_namespace:1;
list_t *ns_personal, *ns_other, *ns_shared; /* NAMESPACE info */
message_t **msgapp; /* FETCH results */
@@ -88,12 +89,15 @@ typedef struct imap_store {
parse_list_state_t parse_list_sts;
/* command queue */
int nexttag, num_in_progress, literal_pending;
+ struct imap_cmd *pending, **pending_append;
struct imap_cmd *in_progress, **in_progress_append;
/* Used during sequential operations like connect */
enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth }
greeting;
+ int canceling; /* imap_cancel() is in progress */
union {
int (*imap_open)( store_t *srv, void *aux );
+ void (*imap_cancel)( void *aux );
} callbacks;
void *callback_aux;
@@ -114,6 +118,7 @@ struct imap_cmd {
int data_len;
int uid; /* to identify fetch responses */
unsigned
+ high_prio:1, /* if command is queued, put it at the
front of the queue. */
to_trash:1, /* we are storing to trash, not current. */
create:1, /* create the mailbox if we get an error ...
*/
trycreate:1; /* ... but only if this is true or the
server says so. */
@@ -178,9 +183,6 @@ static const char *cap_list[] = {
#define RESP_NO 1
#define RESP_CANCEL 2
-static int get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd );
-
-
static const char *Flags[] = {
"Draft",
"Flagged",
@@ -217,30 +219,18 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
int response )
return ret;
}
-static struct imap_cmd *
-v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd,
- const char *fmt, va_list ap )
+static int
+send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
{
int bufl, litplus;
const char *buffmt;
char buf[1024];
- assert( ctx );
- assert( ctx->gen.bad_callback );
- assert( cmd );
- assert( cmd->param.done );
-
- while (ctx->literal_pending)
- if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
- goto bail;
-
cmd->tag = ++ctx->nexttag;
- if (fmt)
- nfvasprintf( &cmd->cmd, fmt, ap );
if (!cmd->param.data) {
buffmt = "%d %s\r\n";
litplus = 0;
- } else if ((cmd->param.to_trash && ctx->trashnc) || !CAP(LITERALPLUS)) {
+ } else if ((cmd->param.to_trash && ctx->trashnc == TrashUnknown) ||
!CAP(LITERALPLUS)) {
buffmt = "%d %s{%d}\r\n";
litplus = 0;
} else {
@@ -268,27 +258,52 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd
*cmd,
} else if (cmd->param.cont || cmd->param.data) {
ctx->literal_pending = 1;
}
+ if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
+ ctx->trashnc = TrashChecking;
cmd->next = 0;
*ctx->in_progress_append = cmd;
ctx->in_progress_append = &cmd->next;
ctx->num_in_progress++;
- return cmd;
+ return 0;
bail:
done_imap_cmd( ctx, cmd, RESP_CANCEL );
- return NULL;
+ return -1;
}
-static struct imap_cmd *
-submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, const char *fmt, ...
)
+static int
+cmd_submittable( imap_store_t *ctx, struct imap_cmd *cmd )
{
- struct imap_cmd *ret;
- va_list ap;
+ return !ctx->conn.write_buf &&
+ !ctx->literal_pending &&
+ !(cmd->param.to_trash && ctx->trashnc == TrashChecking) &&
+ ctx->num_in_progress < ((imap_store_conf_t
*)ctx->gen.conf)->server->max_in_progress;
+}
- va_start( ap, fmt );
- ret = v_submit_imap_cmd( ctx, cmd, fmt, ap );
- va_end( ap );
- return ret;
+static int
+flush_imap_cmds( imap_store_t *ctx )
+{
+ struct imap_cmd *cmd;
+
+ while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
+ if (!(ctx->pending = cmd->next))
+ ctx->pending_append = &ctx->pending;
+ if (send_imap_cmd( ctx, cmd ) < 0)
+ return -1;
+ }
+ return 0;
+}
+
+static void
+cancel_pending_imap_cmds( imap_store_t *ctx )
+{
+ struct imap_cmd *cmd;
+
+ while ((cmd = ctx->pending)) {
+ if (!(ctx->pending = cmd->next))
+ ctx->pending_append = &ctx->pending;
+ done_imap_cmd( ctx, cmd, RESP_CANCEL );
+ }
}
static void
@@ -304,6 +319,29 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
}
static int
+submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
+{
+ assert( ctx );
+ assert( ctx->gen.bad_callback );
+ assert( cmd );
+ assert( cmd->param.done );
+
+ if ((ctx->pending && !cmd->param.high_prio) || !cmd_submittable( ctx,
cmd )) {
+ if (ctx->pending && cmd->param.high_prio) {
+ cmd->next = ctx->pending;
+ ctx->pending = cmd;
+ } else {
+ cmd->next = 0;
+ *ctx->pending_append = cmd;
+ ctx->pending_append = &cmd->next;
+ }
+ return 0;
+ }
+
+ return send_imap_cmd( ctx, cmd );
+}
+
+static int
imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
int (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response
),
const char *fmt, ... )
@@ -314,12 +352,9 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
cmdp = new_imap_cmd( sizeof(*cmdp) );
cmdp->param.done = done;
va_start( ap, fmt );
- cmdp = v_submit_imap_cmd( ctx, cmdp, fmt, ap );
+ nfvasprintf( &cmdp->cmd, fmt, ap );
va_end( ap );
- if (!cmdp)
- return RESP_CANCEL;
-
- return get_cmd_result( ctx, cmdp );
+ return submit_imap_cmd( ctx, cmdp );
}
static void
@@ -368,7 +403,7 @@ imap_refcounted_new_state( int (*cb)( int, void * ), void
*aux )
struct imap_cmd_refcounted_state *sts = nfmalloc( sizeof(*sts) );
sts->callback = cb;
sts->callback_aux = aux;
- sts->ref_count = 1; /* so forced sync does not cause an early exit */
+ sts->ref_count = 0;
sts->ret_val = DRV_OK;
return sts;
}
@@ -390,25 +425,6 @@ imap_refcounted_done( struct imap_cmd_refcounted_state
*sts )
return ret;
}
-/*
-static void
-drain_imap_replies( imap_store_t *ctx )
-{
- while (ctx->num_in_progress)
- get_cmd_result( ctx, 0 );
-}
-*/
-
-static int
-process_imap_replies( imap_store_t *ctx )
-{
- while (ctx->num_in_progress > ((imap_store_conf_t
*)ctx->gen.conf)->server->max_in_progress ||
- socket_pending( &ctx->conn ))
- if (get_cmd_result( ctx, 0 ) == RESP_CANCEL)
- return RESP_CANCEL;
- return RESP_OK;
-}
-
static int
is_atom( list_t *list )
{
@@ -795,20 +811,22 @@ struct imap_cmd_trycreate {
static int imap_open_store_greeted( imap_store_t * );
static int get_cmd_result_p2( imap_store_t *, struct imap_cmd *, int );
-static int
-get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
+static void
+imap_socket_read( void *aux )
{
+ imap_store_t *ctx = (imap_store_t *)aux;
struct imap_cmd *cmdp, **pcmdp;
char *cmd, *arg, *arg1, *p;
int resp, resp2, tag, greeted;
greeted = ctx->greeting;
+ if (ctx->parse_list_sts.level) {
+ cmd = 0;
+ goto do_fetch;
+ }
for (;;) {
- if (!(cmd = socket_read_line( &ctx->conn ))) {
- if (socket_fill( &ctx->conn ) < 0)
- return RESP_CANCEL;
- continue;
- }
+ if (!(cmd = socket_read_line( &ctx->conn )))
+ return;
arg = next_arg( &cmd );
if (*arg == '*') {
@@ -847,11 +865,8 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
do_fetch:
if ((resp = parse_imap_list( ctx, &cmd,
&ctx->parse_list_sts )) == LIST_BAD)
break; /* stream is likely to
be useless now */
- if (resp == LIST_PARTIAL) {
- if (socket_fill( &ctx->conn ) <
0)
- return RESP_CANCEL;
- goto do_fetch;
- }
+ if (resp == LIST_PARTIAL)
+ return;
if (parse_fetch( ctx,
ctx->parse_list_sts.head ) < 0)
break; /* this may mean
anything, so prefer not to spam the log */
}
@@ -860,7 +875,9 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
break; /* this may mean anything, so prefer not
to spam the log */
}
if (greeted == GreetingPending)
- return imap_open_store_greeted( ctx );
+ if (imap_open_store_greeted( ctx ) < 0)
+ return;
+ continue;
} else if (!ctx->in_progress) {
error( "IMAP error: unexpected reply: %s %s\n", arg,
cmd ? cmd : "" );
break; /* this may mean anything, so prefer not to spam
the log */
@@ -870,24 +887,22 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
cmdp = ctx->in_progress;
if (cmdp->param.data) {
if (cmdp->param.to_trash)
- ctx->trashnc = 0; /* Can't get NO
[TRYCREATE] any more. */
+ ctx->trashnc = TrashKnown; /* Can't get
NO [TRYCREATE] any more. */
p = cmdp->param.data;
cmdp->param.data = 0;
if (socket_write( &ctx->conn, p,
cmdp->param.data_len, GiveOwn ) < 0)
- return RESP_CANCEL;
+ return;
} else if (cmdp->param.cont) {
if (cmdp->param.cont( ctx, cmdp, cmd ) < 0)
- return RESP_CANCEL;
+ return;
} else {
error( "IMAP error: unexpected command
continuation request\n" );
break;
}
if (socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0)
- return RESP_CANCEL;
+ return;
if (!cmdp->param.cont)
ctx->literal_pending = 0;
- if (!tcmd)
- return RESP_OK;
} else {
tag = atoi( arg );
for (pcmdp = &ctx->in_progress; (cmdp = *pcmdp); pcmdp
= &cmdp->next)
@@ -904,7 +919,7 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
arg = next_arg( &cmd );
if (!strcmp( "OK", arg )) {
if (cmdp->param.to_trash)
- ctx->trashnc = 0; /* Can't get NO
[TRYCREATE] any more. */
+ ctx->trashnc = TrashKnown; /* Can't get
NO [TRYCREATE] any more. */
resp = RESP_OK;
} else {
if (!strcmp( "NO", arg )) {
@@ -915,10 +930,11 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
struct imap_cmd_trycreate *cmd2
=
(struct
imap_cmd_trycreate *)new_imap_cmd( sizeof(*cmd2) );
cmd2->orig_cmd = cmdp;
- cmd2->gen.param.done =
get_cmd_result_p2;
+ cmd2->gen.param.high_prio = 1;
p = strchr( cmdp->cmd, '"' );
- if (!submit_imap_cmd( ctx,
&cmd2->gen, "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ))
- return RESP_CANCEL;
+ if (imap_exec( ctx, &cmd2->gen,
get_cmd_result_p2,
+ "CREATE %.*s",
strchr( p + 1, '"' ) - p + 1, p ) < 0)
+ return;
continue;
}
resp = RESP_NO;
@@ -933,13 +949,19 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd )
if (resp == RESP_CANCEL)
ctx->gen.bad_callback(
ctx->gen.bad_callback_aux );
if (done_imap_cmd( ctx, cmdp, resp ) < 0)
- resp = RESP_CANCEL;
- if (resp == RESP_CANCEL || !tcmd || tcmd == cmdp)
- return resp;
+ return;
+ if (resp == RESP_CANCEL)
+ return;
+ if (ctx->canceling && !ctx->in_progress) {
+ ctx->canceling = 0;
+ ctx->callbacks.imap_cancel( ctx->callback_aux );
+ return;
+ }
}
+ if (flush_imap_cmds( ctx ) < 0)
+ return;
}
ctx->gen.bad_callback( ctx->gen.bad_callback_aux );
- return RESP_CANCEL;
}
static int
@@ -952,8 +974,11 @@ get_cmd_result_p2( imap_store_t *ctx, struct imap_cmd
*cmd, int response )
return done_imap_cmd( ctx, ocmd, response );
} else {
ctx->uidnext = 0;
+ if (ocmd->param.to_trash)
+ ctx->trashnc = TrashKnown;
ocmd->param.create = 0;
- return submit_imap_cmd( ctx, ocmd, 0 ) ? 0 : -1;
+ ocmd->param.high_prio = 1;
+ return submit_imap_cmd( ctx, ocmd );
}
}
@@ -975,6 +1000,7 @@ imap_cancel_store( store_t *gctx )
socket_close( &ctx->conn );
cancel_submitted_imap_cmds( ctx );
+ cancel_pending_imap_cmds( ctx );
free_generic_messages( gctx->msgs );
free_string_list( ctx->gen.boxes );
free_list( ctx->ns_personal );
@@ -1031,7 +1057,7 @@ imap_cleanup_p2( imap_store_t *ctx,
{
if (response != RESP_CANCEL)
imap_cancel_store( &ctx->gen );
- return 0;
+ return -1;
}
/******************* imap_open_store *******************/
@@ -1054,10 +1080,15 @@ do_cram_auth( imap_store_t *ctx, struct imap_cmd *cmdp,
const char *prompt )
}
#endif
+static void imap_open_store_connected( int, void * );
+#ifdef HAVE_LIBSSL
+static int imap_open_store_tlsstarted1( int, void * );
+#endif
static int imap_open_store_p2( imap_store_t *, struct imap_cmd *, int );
static int imap_open_store_authenticate( imap_store_t * );
#ifdef HAVE_LIBSSL
static int imap_open_store_authenticate_p2( imap_store_t *, struct imap_cmd *,
int );
+static int imap_open_store_tlsstarted2( int, void * );
static int imap_open_store_authenticate_p3( imap_store_t *, struct imap_cmd *,
int );
#endif
static int imap_open_store_authenticate2( imap_store_t * );
@@ -1102,26 +1133,45 @@ imap_open_store( store_conf_t *conf,
ctx->callback_aux = aux;
set_bad_callback( &ctx->gen, (void (*)(void *))imap_open_store_bail,
ctx );
ctx->in_progress_append = &ctx->in_progress;
+ ctx->pending_append = &ctx->pending;
- socket_init( &ctx->conn, imap_socket_fail, ctx );
-
- if (!socket_connect( &srvc->sconf, &ctx->conn ))
- goto bail;
+ socket_init( &ctx->conn, &srvc->sconf,
+ imap_socket_fail, imap_socket_read, (int (*)(void
*))flush_imap_cmds, ctx );
+ socket_connect( &ctx->conn, imap_open_store_connected );
+}
+static void
+imap_open_store_connected( int ok, void *aux )
+{
+ imap_store_t *ctx = (imap_store_t *)aux;
#ifdef HAVE_LIBSSL
- if (srvc->sconf.use_imaps) {
- if (socket_start_tls( &srvc->sconf, &ctx->conn )) {
- imap_open_store_ssl_bail( ctx );
- return;
- }
+ imap_store_conf_t *cfg = (imap_store_conf_t *)ctx->gen.conf;
+ imap_server_conf_t *srvc = cfg->server;
+#endif
+
+ if (!ok) {
+ imap_open_store_bail( ctx );
+ return;
}
+
+#ifdef HAVE_LIBSSL
+ if (srvc->sconf.use_imaps)
+ socket_start_tls( &ctx->conn, imap_open_store_tlsstarted1 );
#endif
- get_cmd_result( ctx, 0 );
- return;
+}
- bail:
- imap_open_store_bail( ctx );
+#ifdef HAVE_LIBSSL
+static int
+imap_open_store_tlsstarted1( int ok, void *aux )
+{
+ imap_store_t *ctx = (imap_store_t *)aux;
+
+ if (!ok)
+ return imap_open_store_ssl_bail( ctx );
+
+ return 0;
}
+#endif
static int
imap_open_store_greeted( imap_store_t *ctx )
@@ -1142,8 +1192,8 @@ imap_open_store_p2( imap_store_t *ctx, struct imap_cmd
*cmd ATTR_UNUSED, int res
{
if (response != RESP_OK)
return imap_open_store_bail( ctx );
- else
- return imap_open_store_authenticate( ctx );
+
+ return imap_open_store_authenticate( ctx );
}
static int
@@ -1181,10 +1231,19 @@ imap_open_store_authenticate_p2( imap_store_t *ctx,
struct imap_cmd *cmd ATTR_UN
{
if (response != RESP_OK)
return imap_open_store_bail( ctx );
- else if (socket_start_tls( &((imap_server_conf_t
*)ctx->gen.conf)->sconf, &ctx->conn ))
+
+ return socket_start_tls( &ctx->conn, imap_open_store_tlsstarted2 );
+}
+
+static int
+imap_open_store_tlsstarted2( int ok, void *aux )
+{
+ imap_store_t *ctx = (imap_store_t *)aux;
+
+ if (!ok)
return imap_open_store_ssl_bail( ctx );
- else
- return imap_exec( ctx, 0, imap_open_store_authenticate_p3,
"CAPABILITY" );
+
+ return imap_exec( ctx, 0, imap_open_store_authenticate_p3, "CAPABILITY"
);
}
static int
@@ -1192,8 +1251,8 @@ imap_open_store_authenticate_p3( imap_store_t *ctx,
struct imap_cmd *cmd ATTR_UN
{
if (response != RESP_OK)
return imap_open_store_bail( ctx );
- else
- return imap_open_store_authenticate2( ctx );
+
+ return imap_open_store_authenticate2( ctx );
}
#endif
@@ -1260,8 +1319,8 @@ imap_open_store_authenticate2_p2( imap_store_t *ctx,
struct imap_cmd *cmd ATTR_U
{
if (response != RESP_OK)
return imap_open_store_bail( ctx );
- else
- return imap_open_store_namespace( ctx );
+
+ return imap_open_store_namespace( ctx );
}
static int
@@ -1285,12 +1344,11 @@ imap_open_store_namespace( imap_store_t *ctx )
static int
imap_open_store_namespace_p2( imap_store_t *ctx, struct imap_cmd *cmd
ATTR_UNUSED, int response )
{
- if (response != RESP_OK) {
+ if (response != RESP_OK)
return imap_open_store_bail( ctx );
- } else {
- ctx->got_namespace = 1;
- return imap_open_store_namespace2( ctx );
- }
+
+ ctx->got_namespace = 1;
+ return imap_open_store_namespace2( ctx );
}
static int
@@ -1308,9 +1366,8 @@ static int
imap_open_store_finalize( imap_store_t *ctx )
{
set_bad_callback( &ctx->gen, 0, 0 );
- ctx->trashnc = 1;
- ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux );
- return 0;
+ ctx->trashnc = TrashUnknown;
+ return ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux );
}
#ifdef HAVE_LIBSSL
@@ -1358,8 +1415,7 @@ struct imap_cmd_select {
};
static int imap_select_p2( imap_store_t *, struct imap_cmd *, int );
-static int imap_submit_select2( imap_store_t *, const char *, struct
imap_cmd_refcounted_state *,
- struct imap_cmd ** );
+static int imap_submit_select2( imap_store_t *, const char *, struct
imap_cmd_refcounted_state * );
static int imap_select2_p2( imap_store_t *, struct imap_cmd *, int );
static int
@@ -1401,7 +1457,6 @@ imap_select_p2( imap_store_t *ctx, struct imap_cmd *cmd,
int response )
free( cmdp->excs );
return cmdp->gen.callback( response, cmdp->gen.callback_aux );
} else {
- struct imap_cmd *cmd2 = 0;
struct imap_cmd_refcounted_state *sts =
imap_refcounted_new_state( cmdp->gen.callback,
cmdp->gen.callback_aux );
@@ -1417,35 +1472,32 @@ imap_select_p2( imap_store_t *ctx, struct imap_cmd
*cmd, int response )
if (i != j)
bl += sprintf( buf + bl, ":%d",
cmdp->excs[i] );
}
- if (imap_submit_select2( ctx, buf, sts, &cmd2 ) < 0)
- goto done;
+ if (imap_submit_select2( ctx, buf, sts ) < 0) {
+ free( cmdp->excs );
+ return -1;
+ }
}
if (cmdp->maxuid == INT_MAX)
cmdp->maxuid = ctx->uidnext >= 0 ? ctx->uidnext - 1 :
1000000000;
if (cmdp->maxuid >= cmdp->minuid) {
sprintf( buf, "%d:%d", cmdp->minuid, cmdp->maxuid );
- imap_submit_select2( ctx, buf, sts, &cmd2 );
+ if (imap_submit_select2( ctx, buf, sts ) < 0) {
+ free( cmdp->excs );
+ return -1;
+ }
}
- done:
free( cmdp->excs );
- if (!--sts->ref_count)
- return imap_refcounted_done( sts );
- else
- return get_cmd_result( ctx, cmd2 ) == RESP_CANCEL ? -1
: 0;
+ return 0;
}
}
static int
-imap_submit_select2( imap_store_t *ctx, const char *buf, struct
imap_cmd_refcounted_state *sts,
- struct imap_cmd **cmdp )
+imap_submit_select2( imap_store_t *ctx, const char *buf, struct
imap_cmd_refcounted_state *sts )
{
- struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
- cmd->param.done = imap_select2_p2;
- *cmdp = cmd;
- return submit_imap_cmd( ctx, cmd,
- "UID FETCH %s (UID%s%s)", buf,
- (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
- (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" :
"" ) ? 0 : -1;
+ return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_select2_p2,
+ "UID FETCH %s (UID%s%s)", buf,
+ (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
+ (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" );
}
static int
@@ -1511,12 +1563,9 @@ imap_flags_helper( imap_store_t *ctx, int uid, char
what, int flags,
{
char buf[256];
- struct imap_cmd *cmd = imap_refcounted_new_cmd( sts );
- cmd->param.done = imap_set_flags_p2;
buf[imap_make_flags( flags, buf )] = 0;
- if (!submit_imap_cmd( ctx, cmd, "UID STORE %d %cFLAGS.SILENT %s", uid,
what, buf ))
- return -1;
- return process_imap_replies( ctx ) == RESP_CANCEL ? -1 : 0;
+ return imap_exec( ctx, imap_refcounted_new_cmd( sts ),
imap_set_flags_p2,
+ "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
}
static int
@@ -1535,9 +1584,8 @@ imap_set_flags( store_t *gctx, message_t *msg, int uid,
int add, int del,
if (add || del) {
struct imap_cmd_refcounted_state *sts =
imap_refcounted_new_state( cb, aux );
if ((add && imap_flags_helper( ctx, uid, '+', add, sts ) < 0) ||
- (del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0))
{}
- if (!--sts->ref_count)
- return imap_refcounted_done( sts );
+ (del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0))
+ return -1;
return 0;
} else {
return cb( DRV_OK, aux );
@@ -1709,8 +1757,16 @@ static void
imap_cancel( store_t *gctx,
void (*cb)( void *aux ), void *aux )
{
- (void)gctx;
- cb( aux );
+ imap_store_t *ctx = (imap_store_t *)gctx;
+
+ cancel_pending_imap_cmds( ctx );
+ if (ctx->in_progress) {
+ ctx->canceling = 1;
+ ctx->callbacks.imap_cancel = cb;
+ ctx->callback_aux = aux;
+ } else {
+ cb( aux );
+ }
}
/******************* imap_commit *******************/
@@ -1757,7 +1813,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep,
int *err )
server->require_ssl = 1;
server->sconf.use_tlsv1 = 1;
#endif
- server->max_in_progress = 50;
+ server->max_in_progress = INT_MAX;
while (getcline( cfg ) && cfg->cmd) {
if (!strcasecmp( "Host", cfg->cmd )) {
diff --git a/src/isync.h b/src/isync.h
index afefd49..9aac733 100644
--- a/src/isync.h
+++ b/src/isync.h
@@ -73,15 +73,36 @@ typedef struct server_conf {
#endif
} server_conf_t;
+typedef struct buff_chunk {
+ struct buff_chunk *next;
+ char *data;
+ int len;
+ char buf[1];
+} buff_chunk_t;
+
typedef struct {
+ /* connection */
int fd;
+ int state;
+ const server_conf_t *conf; /* needed during connect */
#ifdef HAVE_LIBSSL
SSL *ssl;
#endif
void (*bad_callback)( void *aux ); /* async fail while sending or
listening */
+ void (*read_callback)( void *aux ); /* data available for reading */
+ int (*write_callback)( void *aux ); /* all *queued* data was sent */
+ union {
+ void (*connect)( int ok, void *aux );
+ int (*starttls)( int ok, void *aux );
+ } callbacks;
void *callback_aux;
+ /* writing */
+ buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
+ int write_offset; /* offset into buffer head */
+
+ /* reading */
int offset; /* start of filled bytes in buffer */
int bytes; /* number of filled bytes in buffer */
int scanoff; /* offset to continue scanning for newline at, relative to
'offset' */
@@ -338,22 +359,27 @@ extern const char *Home;
/* call this before doing anything with the socket */
static INLINE void socket_init( conn_t *conn,
+ const server_conf_t *conf,
void (*bad_callback)( void *aux ),
+ void (*read_callback)( void *aux ),
+ int (*write_callback)( void *aux ),
void *aux )
{
+ conn->conf = conf;
conn->bad_callback = bad_callback;
+ conn->read_callback = read_callback;
+ conn->write_callback = write_callback;
conn->callback_aux = aux;
conn->fd = -1;
+ conn->write_buf_append = &conn->write_buf;
}
-int socket_connect( const server_conf_t *conf, conn_t *sock );
-int socket_start_tls( const server_conf_t *conf, conn_t *sock );
+void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) );
+int socket_start_tls( conn_t *conn, int (*cb)( int ok, void *aux ) );
void socket_close( conn_t *sock );
-int socket_fill( conn_t *sock );
int socket_read( conn_t *sock, char *buf, int len ); /* never waits */
char *socket_read_line( conn_t *sock ); /* don't free return value; never
waits */
typedef enum { KeepOwn = 0, GiveOwn } ownership_t;
int socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn );
-int socket_pending( conn_t *sock );
void cram( const char *challenge, const char *user, const char *pass,
char **_final, int *_finallen );
diff --git a/src/mbsync.1 b/src/mbsync.1
index 4f47668..e8cbabb 100644
--- a/src/mbsync.1
+++ b/src/mbsync.1
@@ -281,10 +281,8 @@ Use TLSv1 for communication with the IMAP server over SSL?
\fBPipelineDepth\fR \fIdepth\fR
Maximum number of IMAP commands which can be simultaneously in flight.
Setting this to \fI1\fR disables pipelining.
-Setting it to a too big value may deadlock isync.
-Currently, this affects only a few commands.
This is mostly a debugging only option.
-(Default: \fI50\fR)
+(Default: \fIunlimited\fR)
..
.SS IMAP Stores
The reference point for relative \fBPath\fRs is whatever the server likes it
diff --git a/src/socket.c b/src/socket.c
index ec1dcd2..abb3fac 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -36,56 +36,67 @@
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
+#include <stddef.h>
#include <errno.h>
#include <string.h>
+#include <fcntl.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
-#ifdef HAVE_SYS_FILIO_H
-# include <sys/filio.h>
-#endif
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
+enum {
+ SCK_CONNECTING,
+#ifdef HAVE_LIBSSL
+ SCK_STARTTLS,
+#endif
+ SCK_READY
+};
+
static void
socket_fail( conn_t *conn )
{
conn->bad_callback( conn->callback_aux );
}
-static void
-socket_perror( const char *func, conn_t *sock, int ret )
-{
#ifdef HAVE_LIBSSL
+static int
+ssl_return( const char *func, conn_t *conn, int ret )
+{
int err;
- if (sock->ssl) {
- switch ((err = SSL_get_error( sock->ssl, ret ))) {
- case SSL_ERROR_SYSCALL:
- case SSL_ERROR_SSL:
- if ((err = ERR_get_error()) == 0) {
- if (ret == 0)
- error( "SSL_%s: got EOF\n", func );
- else
- error( "SSL_%s: %s\n", func,
strerror(errno) );
- } else
- error( "SSL_%s: %s\n", func, ERR_error_string(
err, 0 ) );
- break;
- default:
- error( "SSL_%s: unhandled SSL error %d\n", func, err );
- break;
+ switch ((err = SSL_get_error( conn->ssl, ret ))) {
+ case SSL_ERROR_NONE:
+ return ret;
+ case SSL_ERROR_WANT_WRITE:
+ conf_fd( conn->fd, POLLIN, POLLOUT );
+ /* fallthrough */
+ case SSL_ERROR_WANT_READ:
+ return 0;
+ case SSL_ERROR_SYSCALL:
+ case SSL_ERROR_SSL:
+ if (!(err = ERR_get_error())) {
+ if (ret == 0)
+ error( "SSL_%s: unexpected EOF\n", func );
+ else
+ error( "SSL_%s: %s\n", func, strerror( errno )
);
+ } else {
+ error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 )
);
}
- } else
-#endif
- if (ret < 0)
- perror( func );
+ break;
+ default:
+ error( "SSL_%s: unhandled SSL error %d\n", func, err );
+ break;
+ }
+ if (conn->state == SCK_STARTTLS)
+ conn->callbacks.starttls( 0, conn->callback_aux );
else
- error( "%s: unexpected EOF\n", func );
- socket_fail( sock );
+ socket_fail( conn );
+ return -1;
}
-#ifdef HAVE_LIBSSL
/* Some of this code is inspired by / lifted from mutt. */
static int
@@ -245,45 +256,83 @@ init_ssl_ctx( const server_conf_t *conf )
return 0;
}
+static int start_tls_p2( conn_t * );
+static int start_tls_p3( conn_t *, int );
+
int
-socket_start_tls( const server_conf_t *conf, conn_t *sock )
+socket_start_tls( conn_t *conn, int (*cb)( int ok, void *aux ) )
{
- int ret;
static int ssl_inited;
+ conn->callbacks.starttls = cb;
+
if (!ssl_inited) {
SSL_library_init();
SSL_load_error_strings();
ssl_inited = 1;
}
- if (!conf->SSLContext && init_ssl_ctx( conf ))
- return 1;
+ if (!conn->conf->SSLContext && init_ssl_ctx( conn->conf ))
+ return start_tls_p3( conn, 0 );
- sock->ssl = SSL_new( ((server_conf_t *)conf)->SSLContext );
- SSL_set_fd( sock->ssl, sock->fd );
- if ((ret = SSL_connect( sock->ssl )) <= 0) {
- socket_perror( "connect", sock, ret );
- return 1;
- }
+ conn->ssl = SSL_new( ((server_conf_t *)conn->conf)->SSLContext );
+ SSL_set_fd( conn->ssl, conn->fd );
+ SSL_set_mode( conn->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER );
+ return start_tls_p2( conn );
+}
- /* verify the server certificate */
- if (verify_cert( conf, sock ))
- return 1;
+static int
+start_tls_p2( conn_t *conn )
+{
+ switch (ssl_return( "connect", conn, SSL_connect( conn->ssl ) )) {
+ case -1:
+ start_tls_p3( conn, 0 );
+ return -1;
+ case 0:
+ return 0;
+ default:
+ /* verify the server certificate */
+ if (verify_cert( conn->conf, conn )) {
+ return start_tls_p3( conn, 0 );
+ } else {
+ info( "Connection is now encrypted\n" );
+ return start_tls_p3( conn, 1 );
+ }
+ }
+}
- info( "Connection is now encrypted\n" );
- return 0;
+static int
+start_tls_p3( conn_t *conn, int ok )
+{
+ conn->state = SCK_READY;
+ return conn->callbacks.starttls( ok, conn->callback_aux );
}
#endif /* HAVE_LIBSSL */
-int
-socket_connect( const server_conf_t *conf, conn_t *sock )
+static void socket_fd_cb( int, void * );
+
+static void socket_connected2( conn_t * );
+static void socket_connect_bail( conn_t * );
+
+static void
+socket_close_internal( conn_t *sock )
+{
+ del_fd( sock->fd );
+ close( sock->fd );
+ sock->fd = -1;
+}
+
+void
+socket_connect( conn_t *sock, void (*cb)( int ok, void *aux ) )
{
+ const server_conf_t *conf = sock->conf;
struct hostent *he;
struct sockaddr_in addr;
int s, a[2];
+ sock->callbacks.connect = cb;
+
/* open connection to IMAP server */
if (conf->tunnel) {
infon( "Starting tunnel '%s'... ", conf->tunnel );
@@ -304,6 +353,10 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
close( a[0] );
sock->fd = a[1];
+
+ fcntl( a[1], F_SETFL, O_NONBLOCK );
+ add_fd( a[1], socket_fd_cb, sock );
+
} else {
memset( &addr, 0, sizeof(addr) );
addr.sin_port = conf->port ? htons( conf->port ) :
@@ -317,7 +370,7 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
he = gethostbyname( conf->host );
if (!he) {
error( "IMAP error: Cannot resolve server '%s'\n",
conf->host );
- return -1;
+ goto bail;
}
info( "ok\n" );
@@ -328,36 +381,87 @@ socket_connect( const server_conf_t *conf, conn_t *sock )
perror( "socket" );
exit( 1 );
}
+ sock->fd = s;
+ fcntl( s, F_SETFL, O_NONBLOCK );
+ add_fd( s, socket_fd_cb, sock );
- infon( "Connecting to %s:%hu... ", inet_ntoa( addr.sin_addr ),
ntohs( addr.sin_port ) );
+ infon( "Connecting to %s (%s:%hu) ... ",
+ conf->host, inet_ntoa( addr.sin_addr ), ntohs(
addr.sin_port ) );
if (connect( s, (struct sockaddr *)&addr, sizeof(addr) )) {
- close( s );
- perror( "connect" );
- return -1;
+ if (errno != EINPROGRESS) {
+ perror( "connect" );
+ socket_close_internal( sock );
+ goto bail;
+ }
+ conf_fd( s, 0, POLLOUT );
+ sock->state = SCK_CONNECTING;
+ info( "\n" );
+ return;
}
- sock->fd = s;
}
info( "ok\n" );
- return 0;
+ socket_connected2( sock );
+ return;
+
+ bail:
+ socket_connect_bail( sock );
+}
+
+static void
+socket_connected( conn_t *conn )
+{
+ int soerr;
+ socklen_t selen = sizeof(soerr);
+
+ infon( "Connecting to %s: ", conn->conf->host );
+ if (getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, &soerr, &selen )) {
+ perror( "getsockopt" );
+ exit( 1 );
+ }
+ if (soerr) {
+ errno = soerr;
+ perror( "connect" );
+ socket_close_internal( conn );
+ socket_connect_bail( conn );
+ return;
+ }
+ info( "ok\n" );
+ socket_connected2( conn );
+}
+
+static void
+socket_connected2( conn_t *conn )
+{
+ conf_fd( conn->fd, 0, POLLIN );
+ conn->state = SCK_READY;
+ conn->callbacks.connect( 1, conn->callback_aux );
}
+static void
+socket_connect_bail( conn_t *conn )
+{
+ conn->callbacks.connect( 0, conn->callback_aux );
+}
+
+static void dispose_chunk( conn_t *conn );
+
void
socket_close( conn_t *sock )
{
- if (sock->fd >= 0) {
- close( sock->fd );
- sock->fd = -1;
- }
+ if (sock->fd >= 0)
+ socket_close_internal( sock );
#ifdef HAVE_LIBSSL
if (sock->ssl) {
SSL_free( sock->ssl );
sock->ssl = 0;
}
#endif
+ while (sock->write_buf)
+ dispose_chunk( sock );
}
-int
+static void
socket_fill( conn_t *sock )
{
char *buf;
@@ -366,22 +470,46 @@ socket_fill( conn_t *sock )
if (!len) {
error( "Socket error: receive buffer full. Probably protocol
error.\n" );
socket_fail( sock );
- return -1;
+ return;
}
assert( sock->fd >= 0 );
buf = sock->buf + n;
- n =
#ifdef HAVE_LIBSSL
- sock->ssl ? SSL_read( sock->ssl, buf, len ) :
+ if (sock->ssl) {
+ int any = 0;
+ for (;;) {
+ if ((n = ssl_return( "read", sock, SSL_read( sock->ssl,
buf, len ) )) < 0)
+ return;
+ if (!n) {
+ if (!any)
+ return;
+ break;
+ }
+ sock->bytes += n;
+ buf += n;
+ len -= n;
+ if (!len) {
+ if (SSL_pending( sock->ssl ))
+ fake_fd( sock->fd, POLLIN );
+ break;
+ }
+ any = 1;
+ }
+ } else
#endif
- read( sock->fd, buf, len );
- if (n <= 0) {
- socket_perror( "read", sock, n );
- return -1;
- } else {
+ {
+ if ((n = read( sock->fd, buf, len )) < 0) {
+ perror( "read" );
+ socket_fail( sock );
+ return;
+ } else if (!n) {
+ error( "read: unexpected EOF\n" );
+ socket_fail( sock );
+ return;
+ }
sock->bytes += n;
- return 0;
}
+ sock->read_callback( sock->callback_aux );
}
int
@@ -426,40 +554,141 @@ socket_read_line( conn_t *b )
return s;
}
-int
-socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn )
+static int
+do_write( conn_t *sock, char *buf, int len )
{
int n;
assert( sock->fd >= 0 );
- n =
#ifdef HAVE_LIBSSL
- sock->ssl ? SSL_write( sock->ssl, buf, len ) :
+ if (sock->ssl)
+ return ssl_return( "write", sock, SSL_write( sock->ssl, buf,
len ) );
#endif
- write( sock->fd, buf, len );
- if (takeOwn == GiveOwn)
- free( buf );
- if (n != len) {
- socket_perror( "write", sock, n );
- return -1;
+ n = write( sock->fd, buf, len );
+ if (n < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ perror( "write" );
+ socket_fail( sock );
+ } else {
+ n = 0;
+ conf_fd( sock->fd, POLLIN, POLLOUT );
+ }
+ } else if (n != len) {
+ conf_fd( sock->fd, POLLIN, POLLOUT );
}
- return 0;
+ return n;
+}
+
+static void
+dispose_chunk( conn_t *conn )
+{
+ buff_chunk_t *bc = conn->write_buf;
+ if (!(conn->write_buf = bc->next))
+ conn->write_buf_append = &conn->write_buf;
+ if (bc->data != bc->buf)
+ free( bc->data );
+ free( bc );
+}
+
+static int
+do_queued_write( conn_t *conn )
+{
+ buff_chunk_t *bc;
+
+ if (!conn->write_buf)
+ return 0;
+
+ while ((bc = conn->write_buf)) {
+ int n, len = bc->len - conn->write_offset;
+ if ((n = do_write( conn, bc->data + conn->write_offset, len ))
< 0)
+ return -1;
+ if (n != len) {
+ conn->write_offset += n;
+ return 0;
+ }
+ conn->write_offset = 0;
+ dispose_chunk( conn );
+ }
+#ifdef HAVE_LIBSSL
+ if (conn->ssl && SSL_pending( conn->ssl ))
+ fake_fd( conn->fd, POLLIN );
+#endif
+ return conn->write_callback( conn->callback_aux );
+}
+
+static void
+do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+ buff_chunk_t *bc;
+
+ if (takeOwn == GiveOwn) {
+ bc = nfmalloc( offsetof(buff_chunk_t, buf) );
+ bc->data = buf;
+ } else {
+ bc = nfmalloc( offsetof(buff_chunk_t, buf) + len );
+ bc->data = bc->buf;
+ memcpy( bc->data, buf, len );
+ }
+ bc->len = len;
+ bc->next = 0;
+ *conn->write_buf_append = bc;
+ conn->write_buf_append = &bc->next;
}
int
-socket_pending( conn_t *sock )
+socket_write( conn_t *conn, char *buf, int len, ownership_t takeOwn )
+{
+ if (conn->write_buf) {
+ do_append( conn, buf, len, takeOwn );
+ return len;
+ } else {
+ int n = do_write( conn, buf, len );
+ if (n != len && n >= 0) {
+ conn->write_offset = n;
+ do_append( conn, buf, len, takeOwn );
+ } else if (takeOwn) {
+ free( buf );
+ }
+ return n;
+ }
+}
+
+static void
+socket_fd_cb( int events, void *aux )
{
- int num = -1;
+ conn_t *conn = (conn_t *)aux;
+
+ if (events & POLLERR) {
+ error( "Unidentified socket error.\n" );
+ socket_fail( conn );
+ return;
+ }
+
+ if (conn->state == SCK_CONNECTING) {
+ socket_connected( conn );
+ return;
+ }
+
+ if (events & POLLOUT)
+ conf_fd( conn->fd, POLLIN, 0 );
- if (ioctl( sock->fd, FIONREAD, &num ) < 0)
- return -1;
- if (num > 0)
- return num;
#ifdef HAVE_LIBSSL
- if (sock->ssl)
- return SSL_pending( sock->ssl );
+ if (conn->state == SCK_STARTTLS) {
+ start_tls_p2( conn );
+ return;
+ }
+ if (conn->ssl) {
+ if (do_queued_write( conn ) < 0)
+ return;
+ socket_fill( conn );
+ return;
+ }
#endif
- return 0;
+
+ if ((events & POLLOUT) && do_queued_write( conn ) < 0)
+ return;
+ if (events & POLLIN)
+ socket_fill( conn );
}
#ifdef HAVE_LIBSSL
------------------------------------------------------------------------------
Xperia(TM) PLAY
It's a major breakthrough. An authentic gaming
smartphone on the nation's most reliable network.
And it wants your games.
http://p.sf.net/sfu/verizon-sfdev
_______________________________________________
isync-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/isync-devel