Hi all, to make testing the SMPP timeout patches easier, here is the combined patch. For more details on what each patch does, please see the corresponding mail threads: 1) [Patch] To add timeouts to SMPP connections 2) [PATCH] adds wait-ack to smpp
Please review and test it. Comments and votes are very welcome... -- Best regards / Mit besten Grüßen aus Düsseldorf Dipl.-Ing. Alexander Malysh ___________________________________ Centrium GmbH Vogelsanger Weg 80 40470 Düsseldorf Fon: +49 (0211) 74 84 51 80 Fax: +49 (0211) 277 49 109 email: a.malysh at centrium.de web: http://www.centrium.de msn: olek2002 at hotmail.com icq: 98063111 ___________________________________________ Please avoid sending me Word or PowerPoint attachments. See http://www.fsf.org/philosophy/no-word-attachments.html
Index: doc/userguide/userguide.xml =================================================================== RCS file: /home/cvs/gateway/doc/userguide/userguide.xml,v retrieving revision 1.241 diff -a -u -r1.241 userguide.xml --- doc/userguide/userguide.xml 11 Aug 2003 20:58:29 -0000 1.241 +++ doc/userguide/userguide.xml 20 Aug 2003 17:56:26 -0000 @@ -2928,6 +2928,33 @@ should be presented for this directive. </entry></row> + <row><entry><literal>connection-timeout</literal></entry> + <entry><literal>number (seconds)</literal></entry> + <entry valign="bottom"> + This timer specifies the maximum time lapse allowed + between transactions , after which period of inactivity, an SMPP driver may + assume that the session is no longer active and does reconnect. + Defaults to 300 seconds, to disable set it to 0. + </entry></row> + + <row><entry><literal>wait-ack</literal></entry> + <entry><literal>number (seconds)</literal></entry> + <entry valign="bottom"> + A message is resent if the acknowledge from SMSC takes more than + this time. Defaults to 60 seconds. + </entry></row> + + <row><entry><literal>wait-ack-expire</literal></entry> + <entry><literal>number</literal></entry> + <entry valign="bottom"> + Defines what kind of action should be taken if the ack of + a message expires. 
The options for this value are: + 0x00 - disconnect/reconnect, (default) 0x01 - as is now, requeue, + but this could potentially result in the msg arriving twice + 0x02 - just carry on waiting (given that the wait-ack should never + expire this is the mst accurate) + </entry></row> + </tbody></tgroup></informaltable> </sect2> Index: gwlib/cfg.def =================================================================== RCS file: /home/cvs/gateway/gwlib/cfg.def,v retrieving revision 1.92 diff -a -u -r1.92 cfg.def --- gwlib/cfg.def 8 Aug 2003 14:24:38 -0000 1.92 +++ gwlib/cfg.def 20 Aug 2003 17:56:26 -0000 @@ -277,6 +277,7 @@ OCTSTR(notification-addr) OCTSTR(msg-id-type) OCTSTR(no-dlr) + OCTSTR(connection-timeout) ) Index: gw/smsc/smsc_smpp.c =================================================================== RCS file: /home/cvs/gateway/gw/smsc/smsc_smpp.c,v retrieving revision 1.46 diff -a -u -r1.46 smsc_smpp.c --- gw/smsc/smsc_smpp.c 3 Aug 2003 22:36:35 -0000 1.46 +++ gw/smsc/smsc_smpp.c 20 Aug 2003 17:56:27 -0000 @@ -54,8 +54,17 @@ #define SMPP_DEFAULT_VERSION 0x34 #define SMPP_DEFAULT_PRIORITY 0 #define SMPP_THROTTLING_SLEEP_TIME 15 +#define SMPP_DEFAULT_CONNECTION_TIMEOUT 10 * SMPP_ENQUIRE_LINK_INTERVAL +#define SMPP_DEFAULT_WAITACK 60 +/* + * Some defines + */ +#define SMPP_WAITACK_RECONNECT 0X00 +#define SMPP_WAITACK_REQUEUE 0X01 +#define SMPP_WAITACK_NEVER_EXPIRE 0x02 + /*********************************************************************** * Implementation of the actual SMPP protocol: reading and writing * PDUs in the correct order. 
@@ -91,9 +100,49 @@ int smpp_msg_id_type; /* msg id in C string, hex or decimal */ int autodetect_addr; Octstr *alt_charset; + long connection_timeout; + long wait_ack; + int wait_ack_action; SMSCConn *conn; } SMPP; + +struct smpp_msg { + time_t sent_time; + Msg *msg; +}; + + +/* + * create smpp_msg struct + */ +static struct smpp_msg* smpp_msg_create(Msg *msg) +{ + struct smpp_msg *result = gw_malloc(sizeof(struct smpp_msg)); + + gw_assert(result != NULL); + result->sent_time = time(NULL); + result->msg = msg; + + return result; +} + + +/* + * destroy smpp_msg struct. If destroy_msg flag is set, then message will be freed as well + */ +static void smpp_msg_destroy(struct smpp_msg *msg, int destroy_msg) +{ + /* sanity check */ + if (msg == NULL) + return; + + if (destroy_msg && msg->msg != NULL) + msg_destroy(msg->msg); + + gw_free(msg); +} + static SMPP *smpp_create(SMSCConn *conn, Octstr *host, int transmit_port, int receive_port, Octstr *system_type, @@ -105,7 +154,8 @@ int max_pending_submits, int version, int priority, Octstr *my_number, int smpp_msg_id_type, int autodetect_addr, Octstr *alt_charset, - Octstr *service_type) + Octstr *service_type, long connection_timeout, + long wait_ack, int wait_ack_action) { SMPP *smpp; @@ -113,7 +163,7 @@ smpp->transmitter = -1; smpp->receiver = -1; smpp->msgs_to_send = list_create(); - smpp->sent_msgs = dict_create(16, NULL); + smpp->sent_msgs = dict_create(max_pending_submits, NULL); list_add_producer(smpp->msgs_to_send); smpp->received_msgs = list_create(); smpp->message_id_counter = counter_create(); @@ -141,6 +191,9 @@ smpp->smpp_msg_id_type = smpp_msg_id_type; smpp->autodetect_addr = autodetect_addr; smpp->alt_charset = octstr_duplicate(alt_charset); + smpp->connection_timeout = connection_timeout; + smpp->wait_ack = wait_ack; + smpp->wait_ack_action = wait_ack_action; return smpp; } @@ -599,7 +652,7 @@ if (*pending_submits == -1) return; - if (smpp->conn->throughput) { + if (smpp->conn->throughput > 0) { delay = 1.0 
/ smpp->conn->throughput; } @@ -616,9 +669,10 @@ continue; } /* check for write errors */ - if (send_pdu(conn, smpp->conn->id, pdu) != -1) { + if (send_pdu(conn, smpp->conn->id, pdu) == 0) { + struct smpp_msg *smpp_msg = smpp_msg_create(msg); os = octstr_format("%ld", pdu->u.submit_sm.sequence_number); - dict_put(smpp->sent_msgs, os, msg); + dict_put(smpp->sent_msgs, os, smpp_msg); smpp_pdu_destroy(pdu); octstr_destroy(os); ++(*pending_submits); @@ -630,7 +684,7 @@ } else { /* write error occurs */ smpp_pdu_destroy(pdu); - list_produce(smpp->msgs_to_send, msg); + bb_smscconn_send_failed(smpp->conn, msg, SMSCCONN_FAILED_TEMPORARILY, NULL); break; } } @@ -751,6 +805,7 @@ Msg *msg, *dlrmsg = NULL; long reason; long cmd_stat; + struct smpp_msg *smpp_msg = NULL; resp = NULL; @@ -935,14 +990,18 @@ case submit_sm_resp: os = octstr_format("%ld", pdu->u.submit_sm_resp.sequence_number); - msg = dict_remove(smpp->sent_msgs, os); + smpp_msg = dict_remove(smpp->sent_msgs, os); octstr_destroy(os); - if (msg == NULL) { + if (smpp_msg == NULL) { warning(0, "SMPP[%s]: SMSC sent submit_sm_resp " "with wrong sequence number 0x%08lx", octstr_get_cstr(smpp->conn->id), pdu->u.submit_sm_resp.sequence_number); - } else if (pdu->u.submit_sm_resp.command_status != 0) { + break; + } + msg = smpp_msg->msg; + smpp_msg_destroy(smpp_msg, 0); + if (pdu->u.submit_sm_resp.command_status != 0) { error(0, "SMPP[%s]: SMSC returned error code 0x%08lx (%s) " "in response to submit_sm.", octstr_get_cstr(smpp->conn->id), @@ -1062,16 +1121,19 @@ cmd_stat = pdu->u.generic_nack.command_status; os = octstr_format("%ld", pdu->u.generic_nack.sequence_number); - msg = dict_remove(smpp->sent_msgs, os); + smpp_msg = dict_remove(smpp->sent_msgs, os); octstr_destroy(os); - if (msg == NULL) { + if (smpp_msg == NULL) { error(0, "SMPP[%s]: SMSC rejected last command" "code 0x%08lx (%s).", octstr_get_cstr(smpp->conn->id), cmd_stat, smpp_error_to_string(cmd_stat)); } else { + msg = smpp_msg->msg; + 
smpp_msg_destroy(smpp_msg, 0); + error(0, "SMPP[%s]: SMSC returned error code 0x%08lx (%s) " "in response to submit_sm.", octstr_get_cstr(smpp->conn->id), @@ -1130,6 +1192,67 @@ } +/* + * sent queue cleanup. + * @return 1 if io_thread should reconnect; 0 if not + */ +static int do_queue_cleanup(SMPP *smpp, long *pending_submits, int action) +{ + List *keys; + Octstr *key; + struct smpp_msg *smpp_msg; + time_t now = time(NULL); + + if (*pending_submits <= 0) + return 0; + + /* check if action set to wait ack for ever */ + if (action == SMPP_WAITACK_NEVER_EXPIRE) + return 0; + + keys = dict_keys(smpp->sent_msgs); + if (keys == NULL) + return 0; + + while ((key = list_extract_first(keys)) != NULL) { + smpp_msg = dict_get(smpp->sent_msgs, key); + if (smpp_msg != NULL && difftime(now, smpp_msg->sent_time) > smpp->wait_ack) { + switch(action) { + case SMPP_WAITACK_RECONNECT: /* reconnect */ + /* found at least one not acked msg */ + warning(0, "SMPP[%s]: Not ACKED message found, reconnecting.", + octstr_get_cstr(smpp->conn->id)); + octstr_destroy(key); + list_destroy(keys, octstr_destroy_item); + return 1; /* io_thread will reconnect */ + case SMPP_WAITACK_REQUEUE: /* requeue */ + smpp_msg = dict_remove(smpp->sent_msgs, key); + if (smpp_msg != NULL) { + warning(0, "SMPP[%s]: Not ACKED message found, will retransmit." + " SENT<%ld>sec. 
ago, SEQ<%s>, DST<%s>", + octstr_get_cstr(smpp->conn->id), + (long)difftime(now, smpp_msg->sent_time) , + octstr_get_cstr(key), + octstr_get_cstr(smpp_msg->msg->sms.receiver)); + bb_smscconn_send_failed(smpp->conn, smpp_msg->msg, SMSCCONN_FAILED_TEMPORARILY,NULL); + smpp_msg_destroy(smpp_msg, 0); + (*pending_submits)--; + } + default: + error(0, "SMPP[%s] Unknown clenup action defined %xd.", + octstr_get_cstr(smpp->conn->id), action); + octstr_destroy(key); + list_destroy(keys, octstr_destroy_item); + return 0; + } + } + octstr_destroy(key); + } + list_destroy(keys, octstr_destroy_item); + + return 0; +} + /* * This is the main function for the background thread for doing I/O on @@ -1149,6 +1272,7 @@ long len; SMPP_PDU *pdu; double timeout; + time_t last_response, last_cleanup; io_arg = arg; smpp = io_arg->smpp; @@ -1170,7 +1294,7 @@ else conn = open_receiver(smpp); - last_enquire_sent = date_universal_now(); + last_enquire_sent = last_cleanup = last_response = date_universal_now(); pending_submits = -1; len = 0; smpp->throttling_err_time = 0; @@ -1199,6 +1323,7 @@ send_enquire_link(smpp, conn, &last_enquire_sent); while ((ret = read_pdu(smpp, conn, &len, &pdu)) == 1) { + last_response = time(NULL); /* Deal with the PDU we just got */ dump_pdu("Got PDU:", smpp->conn->id, pdu); handle_pdu(smpp, conn, pdu, &pending_submits); @@ -1229,7 +1354,23 @@ octstr_get_cstr(smpp->conn->id)); break; } + + /* if no PDU was received and connection timeout was set and over the limit */ + if (ret == 0 && smpp->connection_timeout > 0 && + difftime(time(NULL), last_response) > smpp->connection_timeout) { + error(0, "SMPP[%s]: No responses from SMSC within %ld sec. 
Reconnecting.", + octstr_get_cstr(smpp->conn->id), smpp->connection_timeout); + break; + } + + /* cleanup sent queue */ + if (transmitter && difftime(time(NULL), last_cleanup) > smpp->wait_ack) { + if (do_queue_cleanup(smpp, &pending_submits, smpp->wait_ack_action)) + break; /* reconnect */ + last_cleanup = time(NULL); + } + if (transmitter && difftime(time(NULL), smpp->throttling_err_time) > SMPP_THROTTLING_SLEEP_TIME) { smpp->throttling_err_time = 0; send_messages(smpp, conn, &pending_submits); @@ -1247,6 +1388,7 @@ */ if (transmitter) { Msg *msg; + struct smpp_msg *smpp_msg; List *noresp; Octstr *key; @@ -1257,9 +1399,10 @@ noresp = dict_keys(smpp->sent_msgs); while((key = list_extract_first(noresp)) != NULL) { - msg = dict_remove(smpp->sent_msgs, key); - if (msg != NULL) { - bb_smscconn_send_failed(smpp->conn, msg, reason, NULL); + smpp_msg = dict_remove(smpp->sent_msgs, key); + if (smpp_msg != NULL && smpp_msg->msg) { + bb_smscconn_send_failed(smpp->conn, smpp_msg->msg, reason, NULL); + smpp_msg_destroy(smpp_msg, 0); } octstr_destroy(key); } @@ -1372,6 +1515,7 @@ long smpp_msg_id_type; int autodetect_addr; Octstr *alt_charset; + long connection_timeout, wait_ack, wait_ack_action; my_number = alt_charset = NULL; transceiver_mode = 0; @@ -1479,6 +1623,20 @@ /* check for an alternative charset */ alt_charset = cfg_get(grp, octstr_imm("alt-charset")); + + /* check for connection timeout */ + if (cfg_get_integer(&connection_timeout, grp, octstr_imm("connection-timeout")) == -1) + connection_timeout = SMPP_DEFAULT_CONNECTION_TIMEOUT; + + /* check if wait-ack timeout set */ + if (cfg_get_integer(&wait_ack, grp, octstr_imm("wait-ack")) == -1) + wait_ack = SMPP_DEFAULT_WAITACK; + + if (cfg_get_integer(&wait_ack_action, grp, octstr_imm("wait-ack-expire")) == -1) + wait_ack_action = SMPP_WAITACK_REQUEUE; + + if (wait_ack_action > 0x03 || wait_ack_action < 0) + panic(0, "SMPP: Invalid wait-ack-expire directive in configuration."); smpp = smpp_create(conn, host, port, 
receive_port, system_type, username, password, address_range, @@ -1486,7 +1644,7 @@ dest_addr_npi, enquire_link_interval, max_pending_submits, version, priority, my_number, smpp_msg_id_type, autodetect_addr, alt_charset, - service_type); + service_type, connection_timeout, wait_ack, wait_ack_action); conn->data = smpp; conn->name = octstr_format("SMPP:%S:%d/%d:%S:%S",