These patches restructure the mechanism for queueing incoming transport
messages.  In addition, the patch to transport_client.c rearranges the
logic a bit in client_recv().

1. A transport_message now carries a pointer to be used in a linked list.
It is initialized to NULL when the message is created.  We no longer use
a separately allocated list node to carry the message.

2. The queue of transport_messages no longer starts with a dummy node.

3. Instead of finding the tail of the queue by traversing the list from
the head, we maintain a separate pointer to the tail node.  Thus the
enqueuing operation occurs in constant time instead of linear time.

4. In client_recv: we now have the dequeueing code in a single place,
instead of duplicating it.

5. In client_recv: I eliminated some conditional compilation that made
no real difference, since both branches of the #ifdef were effectively
identical.

6. In client_recv: I changed both loops from while loops to do-while
loops, since in each case we want to perform at least one iteration.
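
Items 1 through 3 amount to roughly the following.  This is a minimal
sketch, not the patch itself: struct msg, struct msg_queue, enqueue()
and dequeue() are hypothetical stand-ins for transport_message, the two
new pointers in transport_client, and the queueing code in
client_message_handler() and client_recv().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for transport_message; only the link matters here */
struct msg {
	int id;              /* placeholder payload */
	struct msg* next;    /* intrusive link; NULL when not queued */
};

/* Hypothetical stand-in for the head and tail pointers in transport_client */
struct msg_queue {
	struct msg* head;    /* oldest message, or NULL if the queue is empty */
	struct msg* tail;    /* newest message, or NULL if the queue is empty */
};

/* Append in constant time: no traversal, no separately allocated node */
static void enqueue( struct msg_queue* q, struct msg* m ) {
	assert( q != NULL && m != NULL );
	m->next = NULL;
	if( NULL == q->head )
		q->head = q->tail = m;   /* first message: no dummy node needed */
	else {
		q->tail->next = m;
		q->tail = m;
	}
}

/* Remove and return the oldest message, or NULL if the queue is empty */
static struct msg* dequeue( struct msg_queue* q ) {
	assert( q != NULL );
	struct msg* m = q->head;
	if( m != NULL ) {
		q->head = m->next;
		m->next = NULL;
		if( NULL == q->head )
			q->tail = NULL;      /* queue is now empty */
	}
	return m;
}
```

The price of dropping the dummy node is the extra test for an empty
queue in both functions; the gain is one less allocation per message
and no walk down the list on every enqueue.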

----------------

The changes to the message queue follow what I outlined in a previous
post.

I understand Mike's discomfort with conflating the container with the
contained, but I have three counter-arguments.  First, it is inherent
in messages that they be queued.  The message itself might as well
provide a suitable hook.  Second, the transport_message, transport_client,
and transport_session are all so tightly coupled to each other anyway
that there's not much point in maintaining a pretense of independence.
Third, we have the venerable precedent of osrfMessage, which carries a
similar linkage pointer for the convenience of other levels of code.

The other changes to client_recv() were a case of one thing leading to
another.

First, it annoyed me to have the dequeueing logic in two different 
places.  I rearranged the logic so that we don't dequeue anything until
we have made every effort to make sure that the queue isn't empty.
Then we can dequeue in just one spot.
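
In outline, the control flow looks something like the sketch below.
It is only an illustration: struct msg, struct client, recv_one() and
the wait_fn hook are hypothetical names, and the hook stands in for
the two loops around session_wait() so that the example can run on
its own.

```c
#include <assert.h>
#include <stddef.h>

struct msg { struct msg* next; };
struct client { struct msg* head; struct msg* tail; int error; };

/* Hypothetical hook standing in for the loops around session_wait();
   returns nonzero on failure */
typedef int (*wait_fn)( struct client* c, int timeout );

static int wait_ok( struct client* c, int timeout ) {
	(void) c; (void) timeout;
	return 0;
}

static int wait_fail( struct client* c, int timeout ) {
	(void) c; (void) timeout;
	return -1;
}

static struct msg* recv_one( struct client* c, int timeout, wait_fn waiter ) {
	assert( c != NULL && waiter != NULL );
	int error = 0;

	/* wait only if nothing is queued yet */
	if( NULL == c->head )
		error = waiter( c, timeout );

	/* the single dequeue point */
	struct msg* m = NULL;
	if( error )
		c->error = 1;
	else if( c->head != NULL ) {
		m = c->head;
		c->head = m->next;
		m->next = NULL;
		if( NULL == c->head )
			c->tail = NULL;
	}
	return m;
}
```

Every path, whether a message was already waiting, arrived during the
wait, never arrived, or the wait failed, leaves through the same
return statement.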

(An old classmate of mine once remarked that "queue" is the only word
in the English language that is 80% superfluous.)

Next I eliminated the conditional compilation, which in this case didn't
accomplish anything, and which probably wouldn't work very well anyway
if you use shared libraries for this stuff, since a shared library is
compiled only once, whether or not the program loading it is a router.

What was wrapped in the #ifdef was a break statement, subject to an if,
and it looked a little awkward.  I think the intent was to make sure
that we perform at least one iteration even if the timeout is zero.  A
more graceful way to do that is to use a do-while loop instead of a
while loop, so that's what I did.

If the timeout parameter is zero, the results will be the same. Otherwise
there will be a subtle difference.  With the old logic, even when the
time remaining reaches zero, we'll still make one last attempt to read
a message.  With the new logic we won't.  My sense of the matter is that
this difference is too slight to worry about, especially since the same
timeout is applied by the underlying select() call.  If it matters, we
can go back to the old logic with a break statement.
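
To make the difference concrete, here is a small simulation of the two
loop structures.  It is a sketch under stated assumptions, not code
from the patch: fake_now and fake_wait() are hypothetical stand-ins for
time(NULL) and session_wait(), each wait is assumed to consume exactly
one second, and no message ever arrives.  The arithmetic on remaining
mirrors the diff, measuring elapsed time from the original start on
every pass.

```c
#include <assert.h>

/* Simulated clock, in seconds; stands in for time(NULL) */
static long fake_now = 0;

/* Hypothetical stand-in for session_wait(): consumes one simulated
   second, never delivers a message, and always succeeds */
static int fake_wait( long remaining ) {
	(void) remaining;
	fake_now += 1;
	return 0;
}

/* Old structure: while loop with an explicit break when remaining == 0 */
static int attempts_old( int timeout ) {
	int attempts = 0;
	fake_now = 0;
	long start = fake_now;
	long remaining = timeout;
	while( remaining >= 0 ) {      /* the queue stays empty in this model */
		fake_wait( remaining );
		++attempts;
		if( remaining == 0 )
			break;
		remaining -= fake_now - start;
	}
	return attempts;
}

/* New structure: do-while that stops once remaining is no longer positive */
static int attempts_new( int timeout ) {
	int attempts = 0;
	fake_now = 0;
	long start = fake_now;
	long remaining = timeout;
	do {
		fake_wait( remaining );
		++attempts;
		remaining -= fake_now - start;
	} while( remaining > 0 );
	return attempts;
}
```

In this model a timeout of 0 gives one attempt under either structure.
A timeout of 1 gives two attempts under the old structure (the second
made with remaining equal to zero) and one under the new.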

Then I looked at the other loop, where timeout = -1, and changed it to
a do-while as well.  Since we have already determined that the queue is
empty, we don't need to check the queue again at the top of the loop.
More importantly, the code is easier to read if both loops have almost
the same structure.

Scott McKellar
http://home.swbell.net/mck9/ct/

Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license indicated
in the file; or

(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source license
and I have the right under that license to submit that work with
modifications, whether created in whole or in part by me, under the
same open source license (unless I am permitted to submit under a
different license), as indicated in the file; or

(c) The contribution was provided directly to me by some other person
who certified (a), (b) or (c) and I have not modified it; and

(d) In the case of each of (a), (b), or (c), I understand and agree
that this project and the contribution are public and that a record
of the contribution (including all personal information I submit
with it, including my sign-off) is maintained indefinitely and may
be redistributed consistent with this project or the open source
license indicated in the file.
*** ./trunk/include/opensrf/transport_message.h	2008-12-07 08:07:52.000000000 -0600
--- ./trunk-mod/include/opensrf/transport_message.h	2008-12-07 15:55:48.000000000 -0600
***************
*** 34,39 ****
--- 34,40 ----
  	int error_code;
  	int broadcast;
  	char* msg_xml; /* the entire message as XML complete with entity encoding */
+ 	struct transport_message_struct* next;
  };
  typedef struct transport_message_struct transport_message;
  
*** ./trunk/src/libopensrf/transport_message.c	2008-12-07 08:07:55.000000000 -0600
--- ./trunk-mod/src/libopensrf/transport_message.c	2008-12-07 15:55:10.000000000 -0600
***************
*** 45,50 ****
--- 45,51 ----
  	msg->error_code     = 0;
  	msg->broadcast      = 0;
  	msg->msg_xml        = NULL;
+ 	msg->next           = NULL;
  
  	return msg;
  }
***************
*** 72,77 ****
--- 73,79 ----
  	new_msg->error_code     = 0;
  	new_msg->broadcast      = 0;
  	new_msg->msg_xml        = NULL;
+ 	new_msg->next           = NULL;
  
  	xmlKeepBlanksDefault(0);
  	xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 );
***************
*** 160,167 ****
  		new_msg->body = strdup("");
  
  	new_msg->msg_xml = xmlDocToString(msg_doc, 0);
!    xmlFreeDoc(msg_doc);
!    xmlCleanupParser();
  
  	return new_msg;
  }
--- 162,169 ----
  		new_msg->body = strdup("");
  
  	new_msg->msg_xml = xmlDocToString(msg_doc, 0);
! 	xmlFreeDoc(msg_doc);
! 	xmlCleanupParser();
  
  	return new_msg;
  }
*** ./trunk/include/opensrf/transport_client.h	2008-12-07 08:07:52.000000000 -0600
--- ./trunk-mod/include/opensrf/transport_client.h	2008-12-07 15:56:29.000000000 -0600
***************
*** 12,18 ****
  // Our client struct.  We manage a list of messages and a controlling session
  // ---------------------------------------------------------------------------
  struct transport_client_struct {
! 	struct message_list_struct* m_list;
  	transport_session* session;
  	int error;
  };
--- 12,19 ----
  // Our client struct.  We manage a list of messages and a controlling session
  // ---------------------------------------------------------------------------
  struct transport_client_struct {
! 	transport_message* msg_q_head;
! 	transport_message* msg_q_tail;
  	transport_session* session;
  	int error;
  };
*** ./trunk/src/libopensrf/transport_client.c	2008-12-07 08:07:55.000000000 -0600
--- ./trunk-mod/src/libopensrf/transport_client.c	2008-12-07 15:56:07.000000000 -0600
***************
*** 1,21 ****
  #include <opensrf/transport_client.h>
  
- #define MESSAGE_LIST_HEAD 1
- #define MESSAGE_LIST_ITEM 2
- 
- // ---------------------------------------------------------------------------
- // Represents a node in a linked list.  The node holds a pointer to the next
- // node (which is null unless set), a pointer to a transport_message, and
- // and a type variable (which is not really curently necessary).
- // ---------------------------------------------------------------------------
- struct message_list_struct {
- 	struct message_list_struct* next;
- 	transport_message* message;
- 	int type;
- };
- typedef struct message_list_struct transport_message_list;
- typedef struct message_list_struct transport_message_node;
- 
  static void client_message_handler( void* client, transport_message* msg );
  
  //int main( int argc, char** argv );
--- 1,5 ----
***************
*** 69,83 ****
  	/* build and clear the client object */
  	transport_client* client = safe_malloc( sizeof( transport_client) );
  
! 	/* build and clear the message list */
! 	client->m_list = safe_malloc( sizeof( transport_message_list ) );
! 
! 	client->m_list->next = NULL;
! 	client->m_list->message = NULL;
! 	client->m_list->type = MESSAGE_LIST_HEAD;
! 
! 	/* build the session */
  	
  	client->session = init_transport( server, port, unix_path, client, component );
  
  	client->session->message_callback = client_message_handler;
--- 53,63 ----
  	/* build and clear the client object */
  	transport_client* client = safe_malloc( sizeof( transport_client) );
  
! 	/* start with an empty message queue */
! 	client->msg_q_head = NULL;
! 	client->msg_q_tail = NULL;
  	
+ 	/* build the session */
  	client->session = init_transport( server, port, unix_path, client, component );
  
  	client->session->message_callback = client_message_handler;
***************
*** 116,200 ****
  transport_message* client_recv( transport_client* client, int timeout ) {
  	if( client == NULL ) { return NULL; }
  
! 	transport_message_node* node;
! 	transport_message* msg;
  
  
! 	/* see if there are any message in the messages queue */
! 	if( client->m_list->next != NULL ) {
! 		/* pop off the first one... */
! 		node = client->m_list->next;
! 		client->m_list->next = node->next;
! 		msg = node->message;
! 		free( node );
! 		return msg;
! 	}
  
- 	if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
- 
- 		while( client->m_list->next == NULL ) {
- 		//	if( ! session_wait( client->session, -1 ) ) {
  			int x;
! 			if( (x = session_wait( client->session, -1 )) ) {
! 				osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
! 				client->error = 1;
! 				return NULL;
! 			}
! 		}
  
! 	} else { /* wait at most timeout seconds */
! 
! 	
! 		/* if not, loop up to 'timeout' seconds waiting for data to arrive */
! 		time_t start = time(NULL);	
! 		time_t remaining = (time_t) timeout;
! 
! 		int counter = 0;
! 
! 		int wait_ret;
! 		while( client->m_list->next == NULL && remaining >= 0 ) {
! 
! 			if( (wait_ret= session_wait( client->session, remaining)) ) {
! 				client->error = 1;
! 				osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret);
! 				return NULL;
! 			}
! 
! 			++counter;
! 
! #ifdef _ROUTER
! 			// session_wait returns -1 if there is no more data and we're a router
! 			if( remaining == 0 ) { // && wait_ret == -1 ) {
! 				break;
! 			}
! #else
! 			if( remaining == 0 ) // or infinite loop
! 				break;
! #endif
! 
! 			remaining -= (int) (time(NULL) - start);
  		}
- 
  	}
  
! 	/* again, see if there are any messages in the message queue */
! 	if( client->m_list->next != NULL ) {
! 		/* pop off the first one... */
! 		node = client->m_list->next;
! 		client->m_list->next = node->next;
! 		msg = node->message;
! 		free( node );
! 		return msg;
! 
! 	} else {
! 		return NULL;
  	}
  }
  
  // ---------------------------------------------------------------------------
  // This is the message handler required by transport_session.  This handler
! // takes all incoming messages and puts them into the back of a linked list
! // of messages.
  // ---------------------------------------------------------------------------
  static void client_message_handler( void* client, transport_message* msg ){
  
--- 96,158 ----
  transport_message* client_recv( transport_client* client, int timeout ) {
  	if( client == NULL ) { return NULL; }
  
! 	int error = 0;  /* boolean */
  
+ 	if( NULL == client->msg_q_head ) {
  
! 		/* no message available?  try to get one */
! 		if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
  
  			int x;
! 			do {
! 				if( (x = session_wait( client->session, -1 )) ) {
! 					osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
! 					error = 1;
! 					break;
! 				}
! 			} while( client->msg_q_head == NULL );
! 
! 		} else {    /* loop up to 'timeout' seconds waiting for data to arrive  */
! 
! 			/* This loop assumes that a time_t is denominated in seconds -- not */
! 			/* guaranteed by Standard C, but a fair bet for Linux or UNIX       */
! 
! 			time_t start = time(NULL);
! 			time_t remaining = (time_t) timeout;
! 
! 			int wait_ret;
! 			do {
! 				if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
! 					error = 1;
! 					osrfLogDebug(OSRF_LOG_MARK,
! 						"session_wait returned failure code %d: setting error=1\n", wait_ret);
! 					break;
! 				}
  
! 				remaining -= time(NULL) - start;
! 			} while( NULL == client->msg_q_head && remaining > 0 );
  		}
  	}
+ 	
+ 	transport_message* msg = NULL;
  
! 	if( error )
! 		client->error = 1;
! 	else if( client->msg_q_head != NULL ) {
! 		/* got message(s); dequeue the oldest one */
! 		msg = client->msg_q_head;
! 		client->msg_q_head = msg->next;
! 		msg->next = NULL;  /* shouldn't be necessary; nullify for good hygiene */
! 		if( NULL == client->msg_q_head )
! 			client->msg_q_tail = NULL;
  	}
+ 
+ 	return msg;
  }
  
  // ---------------------------------------------------------------------------
  // This is the message handler required by transport_session.  This handler
! // takes an incoming message and adds it to the tail of a message queue.
  // ---------------------------------------------------------------------------
  static void client_message_handler( void* client, transport_message* msg ){
  
***************
*** 203,223 ****
  
  	transport_client* cli = (transport_client*) client;
  
! 	transport_message_node* node = safe_malloc( sizeof( transport_message_node) );
! 	node->next = NULL;
! 	node->type = MESSAGE_LIST_ITEM;
! 	node->message = msg;
! 
! 
! 	/* find the last node and put this onto the end */
! 	transport_message_node* tail = cli->m_list;
! 	transport_message_node* current = tail->next;
! 
! 	while( current != NULL ) {
! 		tail = current;
! 		current = current->next;
  	}
! 	tail->next = node;
  }
  
  
--- 161,174 ----
  
  	transport_client* cli = (transport_client*) client;
  
! 	/* add the new message to the tail of the queue */
! 	if( NULL == cli->msg_q_head )
! 		cli->msg_q_tail = cli->msg_q_head = msg;
! 	else {
! 		cli->msg_q_tail->next = msg;
! 		cli->msg_q_tail = msg;
  	}
! 	msg->next = NULL;
  }
  
  
***************
*** 225,242 ****
  	if(client == NULL) return 0; 
  
  	session_free( client->session );
! 	transport_message_node* current = client->m_list->next;
! 	transport_message_node* next;
  
  	/* deallocate the list of messages */
  	while( current != NULL ) {
  		next = current->next;
! 		message_free( current->message );
! 		free(current);
  		current = next;
  	}
  
- 	free( client->m_list );
  	free( client );
  	return 1;
  }
--- 176,191 ----
  	if(client == NULL) return 0; 
  
  	session_free( client->session );
! 	transport_message* current = client->msg_q_head;
! 	transport_message* next;
  
  	/* deallocate the list of messages */
  	while( current != NULL ) {
  		next = current->next;
! 		message_free( current );
  		current = next;
  	}
  
  	free( client );
  	return 1;
  }
