Hi,

attached is a patch that make usage of priority queue and priority flag in
smsc_at & smsc_emi. This patch makes it possible to prioritize messages and
to ensure that older messages will be sent first.

Comments/votes please!

-- 
Thanks,
Alex
Index: gw/sms.c
===================================================================
RCS file: /home/cvs/gateway/gw/sms.c,v
retrieving revision 1.17
diff -a -u -p -r1.17 sms.c
--- gw/sms.c	7 Aug 2004 18:12:27 -0000	1.17
+++ gw/sms.c	3 Sep 2004 12:48:09 -0000
@@ -384,3 +384,27 @@ List *sms_split(Msg *orig, Octstr *heade
     return list;
 }
 
+
+int sms_priority_compare(const void *a, const void *b)
+{
+    int ret;
+    Msg *msg1 = (Msg*)a, *msg2 = (Msg*)b;
+    gw_assert(msg_type(msg1) == sms);
+    gw_assert(msg_type(msg2) == sms);
+    
+    if (msg1->sms.priority > msg2->sms.priority)
+        ret = 1;
+    else if (msg1->sms.priority < msg2->sms.priority)
+        ret = -1;
+    else {
+        if (msg1->sms.time > msg2->sms.time)
+            ret = 1;
+        else if (msg1->sms.time < msg2->sms.time)
+            ret = -1;
+        else
+            ret = 0;
+    }
+    
+    return ret;
+}
+
Index: gw/sms.h
===================================================================
RCS file: /home/cvs/gateway/gw/sms.h,v
retrieving revision 1.12
diff -a -u -p -r1.12 sms.h
--- gw/sms.h	28 Aug 2004 18:09:19 -0000	1.12
+++ gw/sms.h	3 Sep 2004 12:48:10 -0000
@@ -191,4 +191,10 @@ List *sms_split(Msg *orig, Octstr *heade
                 Octstr *nonlast_suffix, Octstr *split_chars, int catenate,
                 unsigned long msg_sequence, int max_messages, int max_octets);
 
+/**
+ * Compare priority and time of two sms's.
+ * @return -1 of a < b; 0 a = b; 1 a > b
+ */
+int sms_priority_compare(const void *a, const void *b);
+
 #endif
Index: gw/smsc/smsc_at.c
===================================================================
RCS file: /home/cvs/gateway/gw/smsc/smsc_at.c,v
retrieving revision 1.15
diff -a -u -p -r1.15 smsc_at.c
--- gw/smsc/smsc_at.c	19 Aug 2004 23:06:47 -0000	1.15
+++ gw/smsc/smsc_at.c	3 Sep 2004 12:48:11 -0000
@@ -1108,7 +1108,7 @@ reconnect:
 
     idle_timeout = 0;
     while (!privdata->shutdown) {
-        l = list_len(privdata->outgoing_queue);
+        l = gw_prioqueue_len(privdata->outgoing_queue);
         if (l > 0) {
             at2_send_messages(privdata);
             idle_timeout = time(NULL);
@@ -1155,7 +1155,7 @@ reconnect:
     octstr_destroy(privdata->sms_center);
     octstr_destroy(privdata->name);
     octstr_destroy(privdata->configfile);
-    list_destroy(privdata->outgoing_queue, NULL);
+    gw_prioqueue_destroy(privdata->outgoing_queue, NULL);
     list_destroy(privdata->pending_incoming_messages, octstr_destroy_item);
     gw_free(conn->data);
     conn->data = NULL;
@@ -1187,7 +1187,7 @@ int at2_shutdown_cb(SMSCConn *conn, int 
      */
     if (finish_sending == 0) {
         Msg *msg;
-        while ((msg = list_extract_first(privdata->outgoing_queue)) != NULL) {
+        while ((msg = gw_prioqueue_remove(privdata->outgoing_queue)) != NULL) {
             bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN, NULL);
         }
     }
@@ -1205,7 +1205,7 @@ long at2_queued_cb(SMSCConn *conn)
     if (conn->status == SMSCCONN_DEAD) /* I'm dead, why would you care ? */
 	return -1;
 
-    ret = list_len(privdata->outgoing_queue);
+    ret = gw_prioqueue_len(privdata->outgoing_queue);
 
     /* use internal queue as load, maybe something else later */
 
@@ -1232,7 +1232,7 @@ int at2_add_msg_cb(SMSCConn *conn, Msg *
     Msg *copy;
 
     copy = msg_duplicate(sms);
-    list_produce(privdata->outgoing_queue, copy);
+    gw_prioqueue_produce(privdata->outgoing_queue, copy);
     gwthread_wakeup(privdata->device_thread);
     return 0;
 }
@@ -1244,7 +1244,7 @@ int smsc_at2_create(SMSCConn *conn, CfgG
     Octstr *modem_type_string;
 
     privdata = gw_malloc(sizeof(PrivAT2data));
-    privdata->outgoing_queue = list_create();
+    privdata->outgoing_queue = gw_prioqueue_create(sms_priority_compare);
     privdata->pending_incoming_messages = list_create();
 
     privdata->configfile = cfg_get_configfile(cfg);
@@ -1341,7 +1341,7 @@ error:
     error(0, "AT2[%s]: Failed to create at2 smsc connection",
           octstr_len(privdata->name) ? octstr_get_cstr(privdata->name) : "");
     if (privdata != NULL) {
-        list_destroy(privdata->outgoing_queue, NULL);
+        gw_prioqueue_destroy(privdata->outgoing_queue, NULL);
     }
     gw_free(privdata);
     conn->why_killed = SMSCCONN_KILLED_CANNOT_CONNECT;
@@ -1817,10 +1817,10 @@ void at2_send_messages(PrivAT2data *priv
 
     do {
         if (privdata->modem->enable_mms && 
-			list_len(privdata->outgoing_queue) > 1)
+			gw_prioqueue_len(privdata->outgoing_queue) > 1)
             at2_send_modem_command(privdata, "AT+CMMS=2", 0, 0);
 
-        if ((msg = list_extract_first(privdata->outgoing_queue)))
+        if ((msg = gw_prioqueue_remove(privdata->outgoing_queue)))
             at2_send_one_message(privdata, msg);
     } while (msg);
 }
Index: gw/smsc/smsc_at.h
===================================================================
RCS file: /home/cvs/gateway/gw/smsc/smsc_at.h,v
retrieving revision 1.6
diff -a -u -p -r1.6 smsc_at.h
--- gw/smsc/smsc_at.h	22 Jan 2004 14:08:24 -0000	1.6
+++ gw/smsc/smsc_at.h	3 Sep 2004 12:48:11 -0000
@@ -115,7 +115,7 @@ typedef struct ModemDef {
 } ModemDef;
 
 typedef struct PrivAT2data {
-    List *outgoing_queue;
+    gw_prioqueue_t *outgoing_queue;
     ModemDef *modem;
     long device_thread;
     int	shutdown; /* Internal signal to shut down */
Index: gw/smsc/smsc_emi.c
===================================================================
RCS file: /home/cvs/gateway/gw/smsc/smsc_emi.c,v
retrieving revision 1.12
diff -a -u -p -r1.12 smsc_emi.c
--- gw/smsc/smsc_emi.c	11 Aug 2004 16:41:29 -0000	1.12
+++ gw/smsc/smsc_emi.c	3 Sep 2004 12:48:12 -0000
@@ -93,7 +93,7 @@
 
 typedef struct privdata {
     Octstr	*name;
-    List	*outgoing_queue;
+    gw_prioqueue_t *outgoing_queue;
     long	receiver_thread;
     long	sender_thread;
     int		shutdown;	  /* Internal signal to shut down */
@@ -284,7 +284,7 @@ static Connection *open_send_connection(
 
     while (!privdata->shutdown) {
 
-    while ((msg = list_extract_first(privdata->outgoing_queue))) {
+    while ((msg = gw_prioqueue_remove(privdata->outgoing_queue))) {
         bb_smscconn_send_failed(conn, msg,
                          SMSCCONN_FAILED_TEMPORARILY, NULL);
     }
@@ -909,7 +909,7 @@ static void clear_sent(PrivData *privdat
 	  octstr_get_cstr(privdata->name));
     for (i = 0; i < EMI2_MAX_TRN; i++) {
 	if (privdata->slots[i].sendtime && privdata->slots[i].sendtype == 51)
-	    list_produce(privdata->outgoing_queue, privdata->slots[i].sendmsg);
+	    gw_prioqueue_produce(privdata->outgoing_queue, privdata->slots[i].sendmsg);
 	privdata->slots[i].sendtime = 0;
     }
     privdata->unacked = 0;
@@ -922,19 +922,19 @@ static void clear_sent(PrivData *privdat
  */
 static EMI2Event emi2_wait (SMSCConn *conn, Connection *server, double seconds)
 {
-    if (emi2_can_send(conn) && list_len(PRIVDATA(conn)->outgoing_queue)) {
+    if (emi2_can_send(conn) && gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue)) {
 	return EMI2_SENDREQ;
     }
     
     if (server != NULL) {
 	switch (conn_wait(server, seconds)) {
-	case 1: return list_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT;
+	case 1: return gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT;
 	case 0: return EMI2_SMSCREQ;
 	default: return EMI2_CONNERR;
 	}
     } else {
 	gwthread_sleep(seconds);
-	return list_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT;
+	return gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT;
     }
 }
 
@@ -998,7 +998,7 @@ static int emi2_do_send(SMSCConn *conn, 
     
     /* Send messages if there's room in the sending window */
     while (emi2_can_send(conn) &&
-           (msg = list_extract_first(PRIVDATA(conn)->outgoing_queue)) != NULL) {
+           (msg = gw_prioqueue_remove(PRIVDATA(conn)->outgoing_queue)) != NULL) {
         int nexttrn = emi2_next_trn(conn);
 
         if (conn->throughput)
@@ -1205,7 +1205,7 @@ static void emi2_idleprocessing(SMSCConn
 		    warning(0, "EMI2[%s]: received neither ACK nor NACK for message %d " 
 			    "in %d seconds, resending message", octstr_get_cstr(privdata->name),
 			    i, PRIVDATA(conn)->waitack);
-		    list_produce(PRIVDATA(conn)->outgoing_queue,
+		    gw_prioqueue_produce(PRIVDATA(conn)->outgoing_queue,
 				 PRIVDATA(conn)->slots[i].sendmsg);
                         PRIVDATA(conn)->slots[i].sendtime = 0;
                         PRIVDATA(conn)->unacked--;
@@ -1363,7 +1363,7 @@ static void emi2_sender(void *arg)
 	}
     }
 
-    while((msg = list_extract_first(privdata->outgoing_queue)) != NULL)
+    while((msg = gw_prioqueue_remove(privdata->outgoing_queue)) != NULL)
 	bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN, NULL);
     if (privdata->rport > 0)
 	gwthread_join(privdata->receiver_thread);
@@ -1374,7 +1374,7 @@ static void emi2_sender(void *arg)
     debug("bb.sms", 0, "EMI2[%s]: connection has completed shutdown.",
 	  octstr_get_cstr(privdata->name));
 
-    list_destroy(privdata->outgoing_queue, NULL);
+    gw_prioqueue_destroy(privdata->outgoing_queue, NULL);
     octstr_destroy(privdata->name);
     octstr_destroy(privdata->allow_ip);
     octstr_destroy(privdata->deny_ip);
@@ -1533,7 +1533,7 @@ static int add_msg_cb(SMSCConn *conn, Ms
     Msg *copy;
 
     copy = msg_duplicate(sms);
-    list_produce(privdata->outgoing_queue, copy);
+    gw_prioqueue_produce(privdata->outgoing_queue, copy);
     gwthread_wakeup(privdata->sender_thread);
 
     return 0;
@@ -1556,7 +1556,7 @@ static int shutdown_cb(SMSCConn *conn, i
 
     if (finish_sending == 0) {
 	Msg *msg;
-	while((msg = list_extract_first(privdata->outgoing_queue)) != NULL) {
+	while((msg = gw_prioqueue_remove(privdata->outgoing_queue)) != NULL) {
 	    bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN, NULL);
 	}
     }
@@ -1584,7 +1584,7 @@ static long queued_cb(SMSCConn *conn)
     PrivData *privdata = conn->data;
     long ret;
 
-    ret = (privdata ? list_len(privdata->outgoing_queue) : 0);
+    ret = (privdata ? gw_prioqueue_len(privdata->outgoing_queue) : 0);
 
     /* use internal queue as load, maybe something else later */
 
@@ -1606,7 +1606,7 @@ int smsc_emi2_create(SMSCConn *conn, Cfg
     allow_ip = deny_ip = host = alt_host = NULL; 
 
     privdata = gw_malloc(sizeof(PrivData));
-    privdata->outgoing_queue = list_create();
+    privdata->outgoing_queue = gw_prioqueue_create(sms_priority_compare);
     privdata->listening_socket = -1;
     privdata->can_write = 1;
     privdata->priv_nexttrn = 0;
@@ -1800,7 +1800,7 @@ error:
     error(0, "EMI2[%s]: Failed to create emi2 smsc connection",
 	  octstr_get_cstr(privdata->name));
     if (privdata != NULL) {
-	list_destroy(privdata->outgoing_queue, NULL);
+	gw_prioqueue_destroy(privdata->outgoing_queue, NULL);
     }
     gw_free(privdata);
     octstr_destroy(allow_ip);

Reply via email to