Attached are two patches for the HTTP timeout handling. The first one uses a threaded approach and the second one works without threads. As some tests have shown, the threaded version only performs better in high-load situations, and even then only marginally. Thus, I would like you to test them and vote on one of four possibilities:
1. commit the threaded solution
2. commit the non-threaded solution
3. merge both and make the choice selectable with a preprocessor directive
4. delete the whole thing.
Regards, David
----------------------------------------------------------------- Wapme Systems AG Vogelsanger Weg 80 40470 Düsseldorf
Tel.: + 49 -211-7 48 45 - 2708 Fax: + 49 -211-80-6-06-2801
E-Mail: [EMAIL PROTECTED] Internet: http://www.wapme-systems.de
Index: gwlib/conn.c =================================================================== RCS file: /home/cvs/gateway/gwlib/conn.c,v retrieving revision 1.68 diff -u -b -r1.68 conn.c --- gwlib/conn.c 30 Nov 2003 21:30:09 -0000 1.68 +++ gwlib/conn.c 1 Dec 2003 14:39:12 -0000 @@ -119,6 +119,9 @@ /* socket state */ enum {yes,no} connected; + /* time of last usage */ + unsigned long last_ts; + /* Protected by outlock */ Octstr *outbuf; long outbufpos; /* start of unwritten data in outbuf */ @@ -493,6 +496,17 @@ return -1; } +void conn_update_usage(Connection *conn) { + conn->last_ts = time(NULL); +} + +int conn_is_expired(Connection *conn, unsigned long secs) { + if ((time(NULL) - (conn->last_ts + secs)) < 0) { + return -1; + } + return 0; +} + int conn_get_connect_result(Connection *conn) { int err,len; @@ -901,7 +915,8 @@ return 0; } -int conn_flush(Connection *conn) + +int conn_flush_real(Connection *conn, double tout_seconds) { int ret; int revents; @@ -918,7 +933,7 @@ fd = conn->fd; unlock_out(conn); - revents = gwthread_pollfd(fd, POLLOUT, -1.0); + revents = gwthread_pollfd(fd, POLLOUT, tout_seconds); /* Note: Make sure we have the "out" lock when * going through the loop again, because the @@ -956,6 +971,7 @@ return 0; } + int conn_write(Connection *conn, Octstr *data) { Index: gwlib/conn.h =================================================================== RCS file: /home/cvs/gateway/gwlib/conn.h,v retrieving revision 1.27 diff -u -b -r1.27 conn.h --- gwlib/conn.h 30 Nov 2003 21:30:09 -0000 1.27 +++ gwlib/conn.h 1 Dec 2003 14:39:12 -0000 @@ -220,7 +220,8 @@ * is done, or until the thread is interrupted or woken up. Return 0 * if it worked, 1 if there was an interruption, or -1 if the connection * is broken. */ -int conn_flush(Connection *conn); +int conn_flush_real(Connection *conn, double tout_seconds); +#define conn_flush(conn) conn_flush_real(conn, -1.0) /* Output functions. 
Each of these takes an open connection and some * data, formats the data and queues it for sending. It may also @@ -323,3 +324,10 @@ #endif /* HAVE_LIBSSL */ int conn_get_id(Connection *conn); + +/* Update the time of last usage for this connection */ +void conn_update_usage(Connection *conn); + +/* Check for expired connection. Returns 0 if connection has not been used + * more than secs seconds, -1 overwise */ +int conn_is_expired(Connection *conn, unsigned long secs); Index: gwlib/http.c =================================================================== RCS file: /home/cvs/gateway/gwlib/http.c,v retrieving revision 1.212 diff -u -b -r1.212 http.c --- gwlib/http.c 26 Nov 2003 17:00:37 -0000 1.212 +++ gwlib/http.c 1 Dec 2003 14:39:12 -0000 @@ -95,11 +95,14 @@ /* - * Default port to connect to for HTTP connections. +* Default port to connect to for HTTP connections. */ enum { HTTP_PORT = 80, HTTPS_PORT = 443 }; +static Dict *active_clients; +static Dict *active_servers; +static Mutex *active_clients_lock; /* * Status of this module. @@ -111,6 +114,8 @@ } run_status = limbo; +static void purge_clients_on_port(int port); +Octstr *pointer_to_octstr(void *p); /* * Read some headers, i.e., until the first empty line (read and discard * the empty line as well). Return -1 for error, 0 for all headers read, @@ -174,7 +179,6 @@ return 1; } - /*********************************************************************** * Proxy support. */ @@ -674,10 +678,9 @@ trans->follow_remaining = follow_remaining; trans->certkeyfile = certkeyfile; trans->ssl = 0; + return trans; } - - static void server_destroy(void *p) { HTTPServer *trans; @@ -696,14 +699,13 @@ gw_free(trans); } - /* * Pool of open, but unused connections to servers or proxies. Key is * "servername:port", value is List with Connection objects. 
*/ static Dict *conn_pool = NULL; static Mutex *conn_pool_lock = NULL; - +/*static long pool_control_thread_id = -1;*/ static void conn_pool_item_destroy(void *item) { @@ -720,20 +722,11 @@ conn_pool_lock = mutex_create(); } - -static void conn_pool_shutdown(void) -{ - dict_destroy(conn_pool); - mutex_destroy(conn_pool_lock); -} - - static Octstr *conn_pool_key(Octstr *host, int port) { return octstr_format("%S:%d", host, port); } - static Connection *conn_pool_get(Octstr *host, int port, int ssl, Octstr *certkeyfile, Octstr *our_host) { @@ -784,7 +777,6 @@ { Octstr *key; List *list; - mutex_lock(conn_pool_lock); key = conn_pool_key(host, port); list = dict_get(conn_pool, key); @@ -798,6 +790,11 @@ } #endif +static void conn_pool_shutdown(void) +{ + dict_destroy(conn_pool); + mutex_destroy(conn_pool_lock); +} /* * Internal lists of completely unhandled requests and requests for which @@ -943,6 +940,7 @@ goto error; } + if (trans->method == HTTP_METHOD_POST) { /* * Add a Content-Length header. Override an existing one, if @@ -1041,6 +1039,7 @@ #ifdef USE_KEEPALIVE if (trans->persistent) { + conn_update_usage(trans->conn); if (proxy_used_for_host(trans->host)) conn_pool_put(trans->conn, proxy_hostname, proxy_port); else @@ -1593,6 +1592,7 @@ if (!client_threads_are_running) { client_fdset = fdset_create(); gwthread_create(write_request_thread, NULL); + /*pool_control_thread_id = gwthread_create(pool_control_thread, NULL);*/ client_threads_are_running = 1; } mutex_unlock(client_thread_lock); @@ -1693,6 +1693,16 @@ fdset_destroy(client_fdset); } +Octstr *pointer_to_octstr(void *p) { + return octstr_format("%ld", p); + /*Octstr *tmp_oct; + char *tmp = (char *) gw_malloc(sizeof(char) * 255); + sprintf(tmp, "%ld", p); + tmp_oct = octstr_create(tmp); + gw_free(tmp); + return tmp_oct;*/ +}; + /*********************************************************************** * HTTP server interface. 
@@ -1710,19 +1720,25 @@ reading_request_line, reading_request, request_is_being_handled, - sending_reply + sending_reply, + to_be_destroyed } state; int method; /* HTTP_METHOD_ value */ Octstr *url; int use_version_1_0; int persistent_conn; - unsigned long conn_time; /* store time for timeouting */ + time_t conn_time; /* store time for timeouting */ HTTPEntity *request; + Counter *ref_counter; }; - +/* + * creates a new HTTPClient. + * Furthermore the client is registered in the active-clients dictionary. + */ static HTTPClient *client_create(int port, Connection *conn, Octstr *ip) { + Octstr *key; HTTPClient *p; #ifdef HAVE_LIBSSL @@ -1743,18 +1759,29 @@ p->persistent_conn = 1; p->conn_time = time(NULL); p->request = NULL; + p->ref_counter = counter_create(); + + key = pointer_to_octstr(p); + dict_put(active_clients, key, p); + octstr_destroy(key); return p; } - -static void client_destroy(void *client) -{ +/* + * frees resources allocated for a client. + */ +static void client_no_lock_destroy(void *client) { HTTPClient *p; if (client == NULL) return; - p = client; + + if (counter_value(p->ref_counter) > 0) { + debug("gwlib.http", 0, "Will not kill client! Client %p is still referenced.", client); + return; + }; + debug("gwlib.http", 0, "HTTP: Destroying HTTPClient area %p.", p); gw_assert_allocated(p, __FILE__, __LINE__, __func__); debug("gwlib.http", 0, "HTTP: Destroying HTTPClient for `%s'.", @@ -1763,17 +1790,42 @@ octstr_destroy(p->ip); octstr_destroy(p->url); entity_destroy(p->request); + counter_destroy(p->ref_counter); gw_free(p); -} + p = NULL; +}; +/* + * destroys the client and removes it from the active-clients-dictionary. 
+ */ +static void client_destroy(void *client) { + Octstr *key; + debug("gwlib.http", 0, "client_destroy"); + mutex_lock(active_clients_lock); + key = pointer_to_octstr(client); + if (dict_remove(active_clients, key) == NULL) { + octstr_destroy(key); + mutex_unlock(active_clients_lock); + return; + }; + octstr_destroy(key); + client_no_lock_destroy(client); + mutex_unlock(active_clients_lock); +}; static void client_reset(HTTPClient *p) { + Octstr *key; + key = pointer_to_octstr(p); + debug("gwlib.http", 0, "HTTP: Resetting HTTPClient for `%s'.", octstr_get_cstr(p->ip)); p->state = reading_request_line; p->conn_time = time(NULL); gw_assert(p->request == NULL); + counter_set(p->ref_counter, 0L); + dict_put(active_clients, key, p); + octstr_destroy(key); } @@ -1982,24 +2034,33 @@ return -1; } - +/* + * Callback routine for http-requests. + * Implements a state machine: + * reading_request_line -> reading_request + * -> request_is_being_handled -> sending_reply + * + * To avoid the destruction of active clients waiting for a reply, + * those clients are removed from the active-clients dictionary + * upon a reading_request -> request_is_being_handled transition. + */ static void receive_request(Connection *conn, void *data) { HTTPClient *client; Octstr *line; int ret; + client = data; if (run_status != running) { conn_unregister(conn); return; } - - client = data; - for (;;) { + switch (client->state) { case reading_request_line: line = conn_read_line(conn); + client->conn_time = time(NULL); if (line == NULL) { if (conn_eof(conn) || conn_read_error(conn)) goto error; @@ -2008,8 +2069,10 @@ ret = parse_request_line(&client->method, &client->url, &client->use_version_1_0, line); octstr_destroy(line); - if (ret == -1) + if (ret == -1) { goto error; + } + /* * RFC2616 (4.3) says we should read a message body if there * is one, even on GET requests. 
@@ -2020,9 +2083,13 @@ case reading_request: ret = entity_read(client->request, conn); - if (ret < 0) + client->conn_time = time(NULL); + if (ret < 0) { goto error; + } if (ret == 0) { + /*dict_remove(active_clients, pointer_to_octstr(client));*/ + counter_increase(client->ref_counter); client->state = request_is_being_handled; conn_unregister(conn); port_put_request(client); @@ -2030,10 +2097,12 @@ return; case sending_reply: - if (conn_outbuf_len(conn) > 0) + if (conn_outbuf_len(conn) > 0) { return; + } /* Reply has been sent completely */ if (!client->persistent_conn) { + counter_decrease(client->ref_counter); conn_unregister(conn); client_destroy(client); return; @@ -2043,7 +2112,7 @@ break; default: - panic(0, "Internal error: HTTPClient state is wrong."); + return; } } @@ -2058,12 +2127,159 @@ int ssl; }; +/* check_timeout. Checks for timedout connections. + * each active clients (determined by using the active_clients-dictionary) + * is handled based on its state. Depending on the state different timeout- + * thresholds are used. + * + * NOTE: Some state transitions happen elsewhere in this module. + * + * state | timeout used | action + * """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + * reading_request_line | CONNECTION_TIME_OUT | - the client has not submitted a + * request within the timeout-period. + * this is used for first-time + * communication only. + * - unregister the client. + * STATE = to_be_destroyed. + * reading_request | IDLE_TIME_OUT | - the communication has started, but + * there has not been any input within the + * timeout period. + * - unregister the client. + * STATE = to_be_destroyed. + * request_is_being_handled | none | - A valid request has been submitted, + * thus the client will not be interrupted. + * STATE = request_is_being_handled. + * sending_reply | none | - An answer is being sent to the client, + * thus the client will not be interrupted. + * STATE = sending_reply. 
+ * to_be_destroyed | none | - The client has already been unregistered and + * is now queued for destruction. + * - the client is removed from the list of active + * clients and subsequently destroyed. + */ + +static void process_clients() +{ + List *keys; + int i; + HTTPClient *tmp_client; + time_t now; + double timediff; + now = time(NULL); + + debug("gwlib.http", 0, "check client timeout"); + keys = dict_keys(active_clients); + for (i = 0; run_status == running && i < list_len(keys); i++) { + tmp_client = dict_get(active_clients, list_get(keys, i)); + if (tmp_client == NULL) + continue; + + if (counter_value(tmp_client->ref_counter) > 0) + continue; + timediff = difftime(now, tmp_client->conn_time); + + switch (tmp_client->state) { + case reading_request_line: + if (timediff > CONNECTION_TIME_OUT) { + debug("gwlib.http", 0, "CONNECT TIMEOUT"); + tmp_client->state = to_be_destroyed; + conn_unregister(tmp_client->conn); + } + break; + case reading_request: + if (timediff > IDLE_TIME_OUT) { + debug("gwlib.http", 0, "IDLE TIMEOUT"); + tmp_client->state = to_be_destroyed; + conn_unregister(tmp_client->conn); + } + break; + case request_is_being_handled: + debug("gwlib.http", 0, "State: request_is_being_handled [ignored]"); + break; + case sending_reply: + debug("gwlib.http", 0, "State: sending_reply [ignored]"); + break; + case to_be_destroyed: + dict_remove(active_clients, list_get(keys, i)); + client_no_lock_destroy(tmp_client); + break; + default: + panic(0, "Invalid state %d", tmp_client->state); + }; + } +}; + +/* + * checks and frees all timedout server connections. 
+ */ +static void process_servers(void) +{ + List *keys; + int i; + time_t now; + List *list; + Connection *conn; + Octstr *key; + + debug("gwlib.http", 0, "check server timeout"); + now = time(NULL); + + keys = dict_keys(conn_pool); + while ((key = list_extract_first(keys)) != NULL) { + list = dict_get(conn_pool, key); + i = 0; + while (i < list_len(list)) { + conn = list_get(list, i); + + conn_wait(conn, 0); + if (conn_eof(conn) || conn_read_error(conn)) { + debug("gwlib.http", 0, "Delete connection with fd %d.",conn_get_id(conn)); + conn_destroy(conn); + list_delete(list, i, 1); + } else { + if (conn_is_expired(conn, KEEPALIVE_TIMEOUT) == 0) { + debug("gwlib.http", 0, "Delete connection with fd %d. (timeout)", + conn_get_id(conn)); + conn_destroy(conn); + list_delete(list, i, 1); + } else { + debug("gwlib.http", 0, "Fd %d is alive.",conn_get_id(conn)); + i++; + } + } + } + octstr_destroy(key); + } + + list_destroy(keys, NULL); +}; + +/* + * checks timeout for all avtive clients and for all server-connections. + */ +static void check_timeout(void) +{ + /* + * for each active client check status. + * - based on status check either CONNECTION or IDLE timeout. 
+ * - if the client is timedout, then unregister the callback and + * - set the client's state to "to_be_destroyed" + */ + mutex_lock(active_clients_lock); + process_clients(); + mutex_unlock(active_clients_lock); + mutex_lock(conn_pool_lock); + process_servers(); + mutex_unlock(conn_pool_lock); +}; static void server_thread(void *dummy) { struct pollfd tab[MAX_SERVERS]; int ports[MAX_SERVERS]; int ssl[MAX_SERVERS]; + long i, j, n, fd; int *portno; struct server *p; @@ -2073,9 +2289,15 @@ HTTPClient *client; int ret; + time_t last_check; + time_t now; + int timedout; + n = 0; + last_check = time(NULL); while (run_status == running && keep_servers_open) { - + now = time(NULL); + timedout = 0; if (n == 0 || (n < MAX_SERVERS && list_len(new_server_sockets) > 0)) { p = list_consume(new_server_sockets); if (p == NULL) { @@ -2090,10 +2312,27 @@ gw_free(p); } - if ((ret = gwthread_poll(tab, n, -1.0)) == -1) { + /* poll returns -1 on error and 0 on timeout. we'll handle both */ + if ((ret = gwthread_poll(tab, n, MIN_TIME_OUT)) < 1) { + switch(ret) { + case 0: /* timeout */ + check_timeout(); + last_check = now; + continue; + break; + case -1: /* error */ if (errno != EINTR) /* a signal was caught during poll() function */ warning(0, "HTTP: gwthread_poll failed."); continue; + break; + default: /* should not happen */ + panic(0, "This is not supposed to happen!"); + } + } + + if ((ret == 0) || (difftime(now, last_check) > MIN_TIME_OUT)) { + check_timeout(); + last_check = time(NULL); } for (i = 0; i < n; ++i) { @@ -2108,19 +2347,17 @@ ports[i] = -1; ssl[i] = 0; } else { - Octstr *client_ip = host_ip(addr); /* * Be aware that conn_wrap_fd() will return NULL if SSL * handshake has failed, so we only client_create() if * there is an conn. 
*/ if ((conn = conn_wrap_fd(fd, ssl[i]))) { - client = client_create(ports[i], conn, client_ip); + client = client_create(ports[i], conn, host_ip(addr)); conn_register(conn, server_fdset, receive_request, client); } else { error(0, "HTTP: unsuccessfull SSL handshake for client `%s'", - octstr_get_cstr(client_ip)); - octstr_destroy(client_ip); + octstr_get_cstr(host_ip(addr))); } } } @@ -2129,6 +2366,7 @@ while ((portno = list_extract_first(closed_server_sockets)) != NULL) { for (i = 0; i < n; ++i) { if (ports[i] == *portno) { + purge_clients_on_port(ports[i]); (void) close(tab[i].fd); port_remove(ports[i]); tab[i].fd = -1; @@ -2153,6 +2391,7 @@ /* make sure we close all ports */ for (i = 0; i < n; ++i) { + purge_clients_on_port(ports[i]); (void) close(tab[i].fd); port_remove(ports[i]); } @@ -2180,7 +2419,6 @@ } } - int http_open_port_if(int port, int ssl, Octstr *interface) { struct server *p; @@ -2290,8 +2528,8 @@ List **cgivars) { HTTPClient *client; - client = port_get_request(port); + if (client == NULL) { debug("gwlib.http", 0, "HTTP: No clients with requests, quitting."); return NULL; @@ -2316,7 +2554,6 @@ client->request->body = NULL; entity_destroy(client->request); client->request = NULL; - return client; } @@ -2361,6 +2598,7 @@ if (ret == 0) { /* HTTP/1.0 or 1.1, hence keep-alive or keep-alive */ if (!client->persistent_conn) { + counter_decrease(client->ref_counter); client_destroy(client); } else { /* XXX mark this HTTPClient in the keep-alive cleaner thread */ @@ -2375,6 +2613,7 @@ } /* error while sending response */ else { + counter_decrease(client->ref_counter); client_destroy(client); } } @@ -3181,6 +3420,10 @@ { gw_assert(run_status == limbo); + active_clients = dict_create(1024, NULL); + active_servers = dict_create(1024, NULL); + active_clients_lock = mutex_create(); + #ifdef HAVE_LIBSSL openssl_init_locks(); conn_init_ssl(); @@ -3198,17 +3441,43 @@ } +/* + * unregisters and destroys all clients depending on this port. 
+ * port - the portnumer whose clients shall be flushed + */ +static void purge_clients_on_port(int port) +{ + List *keys; + HTTPClient *client; + int i; + + gw_assert(active_clients != NULL && active_clients_lock != NULL); + debug("gwlib.http", 0, "Purging..."); + keys = dict_keys(active_clients); + mutex_lock(active_clients_lock); + for (i = 0; i < list_len(keys); ++i) { + client = dict_get(active_clients, list_get(keys, i)); + if (client->port == port) { + client->state = to_be_destroyed; + conn_unregister(client->conn); + client_no_lock_destroy(client); + } + }; + mutex_unlock(active_clients_lock); + debug("gwlib.http", 0, "...finished purging."); +}; + void http_shutdown(void) { gwlib_assert_init(); gw_assert(run_status == running); - run_status = terminating; conn_pool_shutdown(); port_shutdown(); client_shutdown(); server_shutdown(); + dict_destroy(active_clients); proxy_shutdown(); #ifdef HAVE_LIBSSL openssl_shutdown_locks(); Index: gwlib/http.h =================================================================== RCS file: /home/cvs/gateway/gwlib/http.h,v retrieving revision 1.60 diff -u -b -r1.60 http.h --- gwlib/http.h 15 Nov 2003 13:14:23 -0000 1.60 +++ gwlib/http.h 1 Dec 2003 14:39:12 -0000 @@ -194,13 +194,21 @@ Octstr *value; } HTTPCGIVar; +/* + * timeout-values in seconds. + */ +enum { + MIN_TIME_OUT=10, /* this is the sleep-duration for the timeout-thread */ + CONNECTION_TIME_OUT=50, + KEEPALIVE_TIMEOUT=600, + IDLE_TIME_OUT=10 +}; /* * Initialization function. This MUST be called before any other function * declared in this header file. */ void http_init(void); - /* * Shutdown function. This MUST be called when no other function
Index: gwlib/conn.c =================================================================== RCS file: /home/cvs/gateway/gwlib/conn.c,v retrieving revision 1.68 diff -u -b -r1.68 conn.c --- gwlib/conn.c 30 Nov 2003 21:30:09 -0000 1.68 +++ gwlib/conn.c 1 Dec 2003 15:17:08 -0000 @@ -119,6 +119,9 @@ /* socket state */ enum {yes,no} connected; + /* time of last usage */ + unsigned long last_ts; + /* Protected by outlock */ Octstr *outbuf; long outbufpos; /* start of unwritten data in outbuf */ @@ -493,6 +496,17 @@ return -1; } +void conn_update_usage(Connection *conn) { + conn->last_ts = time(NULL); +} + +int conn_is_expired(Connection *conn, unsigned long secs) { + if ((time(NULL) - (conn->last_ts + secs)) < 0) { + return -1; + } + return 0; +} + int conn_get_connect_result(Connection *conn) { int err,len; @@ -901,7 +915,8 @@ return 0; } -int conn_flush(Connection *conn) + +int conn_flush_real(Connection *conn, double tout_seconds) { int ret; int revents; @@ -918,7 +933,7 @@ fd = conn->fd; unlock_out(conn); - revents = gwthread_pollfd(fd, POLLOUT, -1.0); + revents = gwthread_pollfd(fd, POLLOUT, tout_seconds); /* Note: Make sure we have the "out" lock when * going through the loop again, because the @@ -956,6 +971,7 @@ return 0; } + int conn_write(Connection *conn, Octstr *data) { Index: gwlib/conn.h =================================================================== RCS file: /home/cvs/gateway/gwlib/conn.h,v retrieving revision 1.27 diff -u -b -r1.27 conn.h --- gwlib/conn.h 30 Nov 2003 21:30:09 -0000 1.27 +++ gwlib/conn.h 1 Dec 2003 15:17:08 -0000 @@ -220,7 +220,8 @@ * is done, or until the thread is interrupted or woken up. Return 0 * if it worked, 1 if there was an interruption, or -1 if the connection * is broken. */ -int conn_flush(Connection *conn); +int conn_flush_real(Connection *conn, double tout_seconds); +#define conn_flush(conn) conn_flush_real(conn, -1.0) /* Output functions. 
Each of these takes an open connection and some * data, formats the data and queues it for sending. It may also @@ -323,3 +324,10 @@ #endif /* HAVE_LIBSSL */ int conn_get_id(Connection *conn); + +/* Update the time of last usage for this connection */ +void conn_update_usage(Connection *conn); + +/* Check for expired connection. Returns 0 if connection has not been used + * more than secs seconds, -1 overwise */ +int conn_is_expired(Connection *conn, unsigned long secs); Index: gwlib/http.c =================================================================== RCS file: /home/cvs/gateway/gwlib/http.c,v retrieving revision 1.212 diff -u -b -r1.212 http.c --- gwlib/http.c 26 Nov 2003 17:00:37 -0000 1.212 +++ gwlib/http.c 1 Dec 2003 15:17:08 -0000 @@ -95,11 +95,15 @@ /* - * Default port to connect to for HTTP connections. +* Default port to connect to for HTTP connections. */ enum { HTTP_PORT = 80, HTTPS_PORT = 443 }; +static Dict *active_clients; +static Dict *active_servers; +static Mutex *active_clients_lock; +static void check_timeout(void); /* * Status of this module. @@ -111,6 +115,8 @@ } run_status = limbo; +static void purge_clients_on_port(int port); +Octstr *pointer_to_octstr(void *p); /* * Read some headers, i.e., until the first empty line (read and discard * the empty line as well). Return -1 for error, 0 for all headers read, @@ -174,7 +180,6 @@ return 1; } - /*********************************************************************** * Proxy support. */ @@ -674,10 +679,9 @@ trans->follow_remaining = follow_remaining; trans->certkeyfile = certkeyfile; trans->ssl = 0; + return trans; } - - static void server_destroy(void *p) { HTTPServer *trans; @@ -696,14 +700,15 @@ gw_free(trans); } - /* * Pool of open, but unused connections to servers or proxies. Key is * "servername:port", value is List with Connection objects. 
*/ static Dict *conn_pool = NULL; static Mutex *conn_pool_lock = NULL; - +static long timeout_control_thread_id = -1; +static Mutex *timeout_thread_lock = NULL; +static volatile sig_atomic_t timeout_thread_is_running = 0; static void conn_pool_item_destroy(void *item) { @@ -721,12 +726,6 @@ } -static void conn_pool_shutdown(void) -{ - dict_destroy(conn_pool); - mutex_destroy(conn_pool_lock); -} - static Octstr *conn_pool_key(Octstr *host, int port) { @@ -784,7 +783,6 @@ { Octstr *key; List *list; - mutex_lock(conn_pool_lock); key = conn_pool_key(host, port); list = dict_get(conn_pool, key); @@ -798,6 +796,56 @@ } #endif +static void timeout_control_thread(void *arg) { + while (run_status == running) { + debug("",0, "timeout_control sleeping"); + gwthread_sleep(MIN_TIME_OUT); + debug("",0, "timeout_control awake"); + check_timeout(); + } + timeout_control_thread_id = -1; +} + +static void start_timeout_control_thread() { + if (!timeout_thread_is_running) { + /* + * To be really certain, we must repeat the test, but use the + * lock first. If the test failed, however, we _know_ we've + * already initialized. This strategy of double testing avoids + * using the lock more than a few times at startup. 
+ */ + mutex_lock(timeout_thread_lock); + if (!timeout_thread_is_running) { + timeout_control_thread_id = gwthread_create(timeout_control_thread, NULL); + timeout_thread_is_running = 1; + } + mutex_unlock(timeout_thread_lock); + } +}; + + +static void timeout_control_shutdown() { + mutex_lock(timeout_thread_lock); + if (timeout_control_thread_id >= 0) + gwthread_wakeup(timeout_control_thread_id); + gwthread_join_every(timeout_control_thread); + mutex_unlock(timeout_thread_lock); + mutex_destroy(timeout_thread_lock); +}; + +static void timeout_control_init(void) +{ + timeout_thread_lock = mutex_create(); + active_clients = dict_create(1024, NULL); + active_servers = dict_create(1024, NULL); + active_clients_lock = mutex_create(); +}; + +static void conn_pool_shutdown(void) +{ + dict_destroy(conn_pool); + mutex_destroy(conn_pool_lock); +} /* * Internal lists of completely unhandled requests and requests for which @@ -805,7 +853,6 @@ */ static List *pending_requests = NULL; - /* * Have background threads been started? */ @@ -943,6 +990,7 @@ goto error; } + if (trans->method == HTTP_METHOD_POST) { /* * Add a Content-Length header. Override an existing one, if @@ -1041,6 +1089,7 @@ #ifdef USE_KEEPALIVE if (trans->persistent) { + conn_update_usage(trans->conn); if (proxy_used_for_host(trans->host)) conn_pool_put(trans->conn, proxy_hostname, proxy_port); else @@ -1593,6 +1642,7 @@ if (!client_threads_are_running) { client_fdset = fdset_create(); gwthread_create(write_request_thread, NULL); + start_timeout_control_thread(); client_threads_are_running = 1; } mutex_unlock(client_thread_lock); @@ -1693,6 +1743,10 @@ fdset_destroy(client_fdset); } +Octstr *pointer_to_octstr(void *p) { + return octstr_format("%ld", p); +}; + /*********************************************************************** * HTTP server interface. 
@@ -1710,21 +1764,29 @@ reading_request_line, reading_request, request_is_being_handled, - sending_reply + sending_reply, + to_be_destroyed } state; int method; /* HTTP_METHOD_ value */ Octstr *url; int use_version_1_0; int persistent_conn; - unsigned long conn_time; /* store time for timeouting */ + time_t conn_time; /* store time for timeouting */ HTTPEntity *request; + Counter *ref_counter; }; - +/* + * creates a new HTTPClient. + * Furthermore the client is registered in the active-clients dictionary. + */ static HTTPClient *client_create(int port, Connection *conn, Octstr *ip) { + Octstr *key; HTTPClient *p; + debug("", 0, "client_create"); + #ifdef HAVE_LIBSSL if (conn_get_ssl(conn)) debug("gwlib.http", 0, "HTTP: Creating SSL-enabled HTTPClient for `%s', using cipher '%s'.", @@ -1743,18 +1805,30 @@ p->persistent_conn = 1; p->conn_time = time(NULL); p->request = NULL; + p->ref_counter = counter_create(); + + key = pointer_to_octstr(p); + dict_put(active_clients, key, p); + octstr_destroy(key); return p; } - -static void client_destroy(void *client) -{ +/* + * frees resources allocated for a client. + */ +static void client_no_lock_destroy(void *client) { HTTPClient *p; + debug("", 0, "client_no_lock_destroy"); if (client == NULL) return; - p = client; + + if (counter_value(p->ref_counter) > 0) { + debug("", 0, "Will not kill client! Client %p is still referenced.", client); + return; + }; + debug("gwlib.http", 0, "HTTP: Destroying HTTPClient area %p.", p); gw_assert_allocated(p, __FILE__, __LINE__, __func__); debug("gwlib.http", 0, "HTTP: Destroying HTTPClient for `%s'.", @@ -1763,17 +1837,42 @@ octstr_destroy(p->ip); octstr_destroy(p->url); entity_destroy(p->request); + counter_destroy(p->ref_counter); gw_free(p); -} + p = NULL; +}; +/* + * destroys the client and removes it from the active-clients-dictionary. 
+ */ +static void client_destroy(void *client) { + Octstr *key; + debug("", 0, "client_destroy"); + mutex_lock(active_clients_lock); + key = pointer_to_octstr(client); + if (dict_remove(active_clients, key) == NULL) { + octstr_destroy(key); + mutex_unlock(active_clients_lock); + return; + }; + octstr_destroy(key); + client_no_lock_destroy(client); + mutex_unlock(active_clients_lock); +}; static void client_reset(HTTPClient *p) { + Octstr *key; + key = pointer_to_octstr(p); + debug("gwlib.http", 0, "HTTP: Resetting HTTPClient for `%s'.", octstr_get_cstr(p->ip)); p->state = reading_request_line; p->conn_time = time(NULL); gw_assert(p->request == NULL); + counter_set(p->ref_counter, 0L); + dict_put(active_clients, key, p); + octstr_destroy(key); } @@ -1982,24 +2081,33 @@ return -1; } - +/* + * Callback routine for http-requests. + * Implements a state machine: + * reading_request_line -> reading_request + * -> request_is_being_handled -> sending_reply + * + * To avoid the destruction of active clients waiting for a reply, + * those clients are removed from the active-clients dictionary + * upon a reading_request -> request_is_being_handled transition. + */ static void receive_request(Connection *conn, void *data) { HTTPClient *client; Octstr *line; int ret; + client = data; if (run_status != running) { conn_unregister(conn); return; } - - client = data; - for (;;) { + switch (client->state) { case reading_request_line: line = conn_read_line(conn); + client->conn_time = time(NULL); if (line == NULL) { if (conn_eof(conn) || conn_read_error(conn)) goto error; @@ -2008,8 +2116,10 @@ ret = parse_request_line(&client->method, &client->url, &client->use_version_1_0, line); octstr_destroy(line); - if (ret == -1) + if (ret == -1) { goto error; + } + /* * RFC2616 (4.3) says we should read a message body if there * is one, even on GET requests. 
@@ -2020,9 +2130,13 @@ case reading_request: ret = entity_read(client->request, conn); - if (ret < 0) + client->conn_time = time(NULL); + if (ret < 0) { goto error; + } if (ret == 0) { + /*dict_remove(active_clients, pointer_to_octstr(client));*/ + counter_increase(client->ref_counter); client->state = request_is_being_handled; conn_unregister(conn); port_put_request(client); @@ -2030,10 +2144,12 @@ return; case sending_reply: - if (conn_outbuf_len(conn) > 0) + if (conn_outbuf_len(conn) > 0) { return; + } /* Reply has been sent completely */ if (!client->persistent_conn) { + counter_decrease(client->ref_counter); conn_unregister(conn); client_destroy(client); return; @@ -2043,7 +2159,7 @@ break; default: - panic(0, "Internal error: HTTPClient state is wrong."); + return; } } @@ -2058,12 +2174,153 @@ int ssl; }; +/* check_timeout. Checks for timedout connections. + * each active clients (determined by using the active_clients-dictionary) + * is handled based on its state. Depending on the state different timeout- + * thresholds are used. + * + * NOTE: Some state transitions happen elsewhere in this module. + * + * state | timeout used | action + * """""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + * reading_request_line | CONNECTION_TIME_OUT | - the client has not submitted a + * request within the timeout-period. + * this is used for first-time + * communication only. + * - unregister the client. + * STATE = to_be_destroyed. + * reading_request | IDLE_TIME_OUT | - the communication has started, but + * there has not been any input within the + * timeout period. + * - unregister the client. + * STATE = to_be_destroyed. + * request_is_being_handled | none | - A valid request has been submitted, + * thus the client will not be interrupted. + * STATE = request_is_being_handled. + * sending_reply | none | - An answer is being sent to the client, + * thus the client will not be interrupted. + * STATE = sending_reply. 
+ * to_be_destroyed | none | - The client has already been unregistered and + * is now queued for destruction. + * - the client is removed from the list of active + * clients and subsequently destroyed. + */ + +static void process_clients() +{ + List *keys; + int i; + HTTPClient *tmp_client; + time_t now; + double timediff; + now = time(NULL); + + debug("", 0, "check client timeout"); + keys = dict_keys(active_clients); + for (i = 0; run_status == running && i < list_len(keys); i++) { + tmp_client = dict_get(active_clients, list_get(keys, i)); + if (tmp_client == NULL) + continue; + + if (counter_value(tmp_client->ref_counter) > 0) + continue; + timediff = difftime(now, tmp_client->conn_time); + + switch (tmp_client->state) { + case reading_request_line: + if (timediff > CONNECTION_TIME_OUT) { + debug("", 0, "CONNECT TIMEOUT"); + tmp_client->state = to_be_destroyed; + conn_unregister(tmp_client->conn); + } + break; + case reading_request: + if (timediff > IDLE_TIME_OUT) { + debug("", 0, "IDLE TIMEOUT"); + tmp_client->state = to_be_destroyed; + conn_unregister(tmp_client->conn); + } + break; + case request_is_being_handled: + debug("", 0, "State: request_is_being_handled [ignored]"); + break; + case sending_reply: + debug("", 0, "State: sending_reply [ignored]"); + break; + case to_be_destroyed: + dict_remove(active_clients, list_get(keys, i)); + client_no_lock_destroy(tmp_client); + break; + default: + panic(0, "Invalid state %d", tmp_client->state); + }; + } +}; + +static void process_servers(void) +{ + List *keys; + int i; + time_t now; + List *list; + Connection *conn; + Octstr *key; + + debug("", 0, "check server timeout"); + now = time(NULL); + + keys = dict_keys(conn_pool); + while ((key = list_extract_first(keys)) != NULL) { + list = dict_get(conn_pool, key); + i = 0; + while (i < list_len(list)) { + conn = list_get(list, i); + + conn_wait(conn, 0); + if (conn_eof(conn) || conn_read_error(conn)) { + debug("gwlib.http", 0, "Delete connection with fd 
%d.",conn_get_id(conn)); + conn_destroy(conn); + list_delete(list, i, 1); + } else { + if (conn_is_expired(conn, KEEPALIVE_TIMEOUT) == 0) { + debug("gwlib.http", 0, "Delete connection with fd %d. (timeout)", + conn_get_id(conn)); + conn_destroy(conn); + list_delete(list, i, 1); + } else { + debug("gwlib.http", 0, "Fd %d is alive.",conn_get_id(conn)); + i++; + } + } + } + octstr_destroy(key); + } + + list_destroy(keys, NULL); +}; + +static void check_timeout(void) +{ + /* + * for each active client check status. + * - based on status check either CONNECTION or IDLE timeout. + * - if the client is timed out, then unregister the callback and + * - set the client's state to "to_be_destroyed" + */ + mutex_lock(active_clients_lock); + process_clients(); + mutex_unlock(active_clients_lock); + mutex_lock(conn_pool_lock); + process_servers(); + mutex_unlock(conn_pool_lock); +}; static void server_thread(void *dummy) { struct pollfd tab[MAX_SERVERS]; int ports[MAX_SERVERS]; int ssl[MAX_SERVERS]; + long i, j, n, fd; int *portno; struct server *p; @@ -2073,9 +2330,13 @@ HTTPClient *client; int ret; + time_t last_check; + time_t now; + n = 0; + last_check = time(NULL); while (run_status == running && keep_servers_open) { - + now = time(NULL); if (n == 0 || (n < MAX_SERVERS && list_len(new_server_sockets) > 0)) { p = list_consume(new_server_sockets); if (p == NULL) { @@ -2090,10 +2351,19 @@ gw_free(p); } - if ((ret = gwthread_poll(tab, n, -1.0)) == -1) { + /* poll returns -1 on error and 0 on timeout. 
we'll handle both */ + if ((ret = gwthread_poll(tab, n, MIN_TIME_OUT)) < 1) { + switch(ret) { + case 0: /* timeout */ + break; + case -1: /* error */ if (errno != EINTR) /* a signal was caught during poll() function */ warning(0, "HTTP: gwthread_poll failed."); continue; + break; + default: /* should not happen */ + panic(0, "This is not supposed to happen!"); + } } for (i = 0; i < n; ++i) { @@ -2129,6 +2399,7 @@ while ((portno = list_extract_first(closed_server_sockets)) != NULL) { for (i = 0; i < n; ++i) { if (ports[i] == *portno) { + purge_clients_on_port(ports[i]); (void) close(tab[i].fd); port_remove(ports[i]); tab[i].fd = -1; @@ -2153,6 +2424,7 @@ /* make sure we close all ports */ for (i = 0; i < n; ++i) { + purge_clients_on_port(ports[i]); (void) close(tab[i].fd); port_remove(ports[i]); } @@ -2174,13 +2446,13 @@ if (!server_thread_is_running) { server_fdset = fdset_create(); server_thread_id = gwthread_create(server_thread, NULL); + start_timeout_control_thread(); server_thread_is_running = 1; } mutex_unlock(server_thread_lock); } } - int http_open_port_if(int port, int ssl, Octstr *interface) { struct server *p; @@ -2290,8 +2562,8 @@ List **cgivars) { HTTPClient *client; - client = port_get_request(port); + if (client == NULL) { debug("gwlib.http", 0, "HTTP: No clients with requests, quitting."); return NULL; @@ -2316,7 +2588,6 @@ client->request->body = NULL; entity_destroy(client->request); client->request = NULL; - return client; } @@ -2361,6 +2632,7 @@ if (ret == 0) { /* HTTP/1.0 or 1.1, hence keep-alive or keep-alive */ if (!client->persistent_conn) { + counter_decrease(client->ref_counter); client_destroy(client); } else { /* XXX mark this HTTPClient in the keep-alive cleaner thread */ @@ -2375,6 +2647,7 @@ } /* error while sending response */ else { + counter_decrease(client->ref_counter); client_destroy(client); } } @@ -3181,6 +3454,7 @@ { gw_assert(run_status == limbo); + #ifdef HAVE_LIBSSL openssl_init_locks(); conn_init_ssl(); @@ -3189,6 +3463,7 
@@ client_init(); conn_pool_init(); server_init(); + timeout_control_init(); #ifdef HAVE_LIBSSL server_ssl_init(); #endif /* HAVE_LIBSSL */ @@ -3198,17 +3473,44 @@ } +/* + * unregisters and destroys all clients depending on this port. + * port - the port number whose clients shall be flushed + */ +static void purge_clients_on_port(int port) +{ + List *keys; + HTTPClient *client; + int i; + + gw_assert(active_clients != NULL && active_clients_lock != NULL); + debug("", 0, "Purging..."); + keys = dict_keys(active_clients); + mutex_lock(active_clients_lock); + for (i = 0; i < list_len(keys); ++i) { + client = dict_get(active_clients, list_get(keys, i)); + if (client->port == port) { + client->state = to_be_destroyed; + conn_unregister(client->conn); + client_no_lock_destroy(client); + } + }; + mutex_unlock(active_clients_lock); + debug("", 0, "...finished purging."); +}; + void http_shutdown(void) { gwlib_assert_init(); gw_assert(run_status == running); - run_status = terminating; + timeout_control_shutdown(); conn_pool_shutdown(); port_shutdown(); client_shutdown(); server_shutdown(); + dict_destroy(active_clients); proxy_shutdown(); #ifdef HAVE_LIBSSL openssl_shutdown_locks(); Index: gwlib/http.h =================================================================== RCS file: /home/cvs/gateway/gwlib/http.h,v retrieving revision 1.60 diff -u -b -r1.60 http.h --- gwlib/http.h 15 Nov 2003 13:14:23 -0000 1.60 +++ gwlib/http.h 1 Dec 2003 15:17:08 -0000 @@ -194,13 +194,21 @@ Octstr *value; } HTTPCGIVar; +/* + * timeout-values in seconds. + */ +enum { + MIN_TIME_OUT=10, /* this is the sleep-duration for the timeout-thread */ + CONNECTION_TIME_OUT=50, + KEEPALIVE_TIMEOUT=600, + IDLE_TIME_OUT=10 +}; /* * Initialization function. This MUST be called before any other function * declared in this header file. */ void http_init(void); - /* * Shutdown function. This MUST be called when no other function