Hi:

I have reattached the following files, with some confusing debugging code removed, and
a dump produced with them follows.

* main.c
* network_io.c
* rx_fsm.c
* async.c

Again, this is the relevant code segment.

    case T_POLL_CONFIG_REQ:
        //
        // Main process request to start Libuv/epoll() operation.
        //
        poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));

        if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
          {
            fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s",
                 r, uv_err_name(r));
          }

This is the crash diagnostic.

IO_TASK: POLL_PROXY -  Polling Initialization Error -17, EEXIST
Aborted (core dumped)

And this is the gdb backtrace, which is typical of the problem.

Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Core was generated by `./pexd'.
Program terminated with signal SIGABRT, Aborted.
#0 0x00007ff1da90b267 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
55    ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  0x00007ff1da90b267 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1  0x00007ff1da90ceca in __GI_abort () at abort.c:89
#2  0x000000000040d379 in fatal_error (log_prio=2,
    fmt=0x424ce0 "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s") at misc.c:135
#3  0x0000000000405b21 in poll_proxy (handle=0x641f80 <IO_Task_Async_Handle>) at network_io.c:287
#4  0x000000000041142b in uv__async_io (loop=0xc26800 <Poll_Loop>, w=<optimized out>, events=<optimized out>)
    at src/unix/async.c:163
#5  0x000000000041d1b5 in uv__io_poll (loop=loop@entry=0xc26800 <Poll_Loop>, timeout=<optimized out>)
    at src/unix/linux-core.c:462
#6  0x0000000000411ba8 in uv_run (loop=0xc26800 <Poll_Loop>, mode=UV_RUN_DEFAULT) at src/unix/core.c:385
#7  0x000000000040580f in IO_Task (arg=0x0) at network_io.c:100
#8  0x00007ff1daca76aa in start_thread (arg=0x7ff1d5a7c700) at pthread_create.c:333
#9  0x00007ff1da9dceed in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

At this point cdesc->fd is 22 and Poll_Loop.watchers[22] = 0x17ef458, which appears to be a valid address.
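
A minimal sketch of why a descriptor number such as 22 can show up again for a new connection: POSIX hands out the lowest-numbered free descriptor, so a number released by close() is immediately available to the next open(). On Linux, socket() and accept() allocate numbers the same way. The helper name fd_number_is_reused() is hypothetical and not part of pexd or Libuv, and /dev/null merely stands in for a socket.

```c
#include <fcntl.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of pexd or Libuv): open a descriptor,
 * close it, open another, and report whether the kernel handed back the
 * same number. Returns 1 if reused, 0 if not, -1 if open() failed.
 */
int fd_number_is_reused(void)
{
    int first, second;

    first = open("/dev/null", O_RDONLY);
    if (first < 0)
        return -1;
    close(first);               /* the number 'first' is free again */

    second = open("/dev/null", O_RDONLY);
    if (second < 0)
        return -1;
    close(second);

    /* POSIX allocates the lowest-numbered free descriptor. */
    return first == second;
}
```

In a single-threaded program the two numbers match. Any table indexed by descriptor number therefore has to be cleared before the number can be handed to a new connection.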

On 04/29/2021 09:09 AM, Paul Romero wrote:
Hi:

The most common symptom of the problem is that uv_poll_init() fails, which means the problem must be related to the uv_loop_t Poll_Loop. The relevant code segment is as follows.

   poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));

   if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
     {
       fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s",
                   r, uv_err_name(r));
     }

The following files illustrate the problem and are attached to this message. Please advise me
how to attach files to the list directly.

* trace_badf.txt: A stack trace when the error code is EBADF.
* trace_eexist.txt: A stack trace when the error code is EEXIST.
* network_io.c: The IO_Task() thread code. The problem occurs in the poll_proxy() routine.
* main.c: The main() process/thread code where most initialization occurs.
* transmit.c: The Transmit_Task() thread.
* rx_fsm.c: The heart of the protocol task code.
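
For reference, here is a small sketch of what the two error codes mean at the Linux epoll_ctl() level. Libuv reports errors on Linux as negated errno values, so the -17 in the diagnostic is EEXIST. Libuv may raise EEXIST from its own bookkeeping (the Poll_Loop.watchers[] table) rather than from epoll, so this shows only the kernel-level meaning of the codes, not Libuv's internals. The function names are hypothetical and a pipe stands in for the TCP socket.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns 0 if EPOLL_CTL_ADD succeeds, otherwise the resulting errno. */
static int try_epoll_add(int epfd, int fd)
{
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };

    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0 ? 0 : errno;
}

/*
 * Hypothetical demonstration routine.
 * out[0]: result of the first add of a descriptor.
 * out[1]: result of adding the same descriptor a second time.
 * out[2]: result of adding the descriptor after it has been closed.
 * Returns 0 if the demonstration could be set up and -1 otherwise.
 */
int epoll_add_error_codes(int out[3])
{
    int pfd[2];
    int epfd = epoll_create1(0);

    if (epfd < 0)
        return -1;
    if (pipe(pfd) != 0)
      {
        close(epfd);
        return -1;
      }

    out[0] = try_epoll_add(epfd, pfd[0]);   /* succeeds */
    out[1] = try_epoll_add(epfd, pfd[0]);   /* already registered: EEXIST */
    close(pfd[0]);
    out[2] = try_epoll_add(epfd, pfd[0]);   /* no longer open: EBADF */

    close(pfd[1]);
    close(epfd);
    return 0;
}
```

At this level, EEXIST means the descriptor is still registered and EBADF means it is not open at the moment of the call.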


You are right that only the IO_Task() performs polling related operations. However, please note
that the Transmit_Task() thread uses the underlying TCP socket descriptor to send data to the
network with Linux send(). (See the TX_DATA() routine.) Also, USE_SSL and USE_SCRAM are not defined.

I will send more information if I am able to reproduce the problem at another point. Also, I have seen the issue you mentioned before and suspect it may be relevant.
Please let me know if you need anything else.
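
One possible debugging aid for narrowing this down, offered only as a sketch: a per-descriptor state table that records whether a descriptor is idle, being polled, or waiting for its close callback. Every name here (FD_TRACK_MAX, Fd_Track, the fd_track_*() routines) is hypothetical and appears in neither pexd nor Libuv. The table is not thread-safe, so it assumes all calls are made from the IO_Task() thread.

```c
#define FD_TRACK_MAX 1024   /* hypothetical upper bound on descriptor numbers */

typedef enum { FD_IDLE = 0, FD_POLLING, FD_CLOSING } FD_TRACK_STATE;

/* One entry per descriptor number; static storage starts as FD_IDLE. */
static FD_TRACK_STATE Fd_Track[FD_TRACK_MAX];

/* Moves fd from state 'from' to state 'to'; returns -1 if it is not in 'from'. */
static int fd_track_move(int fd, FD_TRACK_STATE from, FD_TRACK_STATE to)
{
    if (fd < 0 || fd >= FD_TRACK_MAX || Fd_Track[fd] != from)
        return -1;

    Fd_Track[fd] = to;
    return 0;
}

/* Intended to be called just before uv_poll_init(). */
int fd_track_poll_init(int fd)
{
    return fd_track_move(fd, FD_IDLE, FD_POLLING);
}

/* Intended to be called when uv_close() is issued on the poll handle. */
int fd_track_close_req(int fd)
{
    return fd_track_move(fd, FD_POLLING, FD_CLOSING);
}

/* Intended to be called from the close callback. */
int fd_track_close_done(int fd)
{
    return fd_track_move(fd, FD_CLOSING, FD_IDLE);
}
```

A -1 from fd_track_poll_init() would show that a new connection received a descriptor number whose previous poll handle had not yet finished closing.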


Best Regards,

Paul R.

On 04/28/2021 08:52 PM, Jameson Nash wrote:
I'm assuming from your description that you've ensured all libuv calls happen from exactly one thread (except uv_async_send). It sounds like you're describing the issue that https://github.com/libuv/libuv/commit/c9406ba0e3d67907c1973a71968b89a6bd83c63c was intended to fix, which was included in v1.41.0.

Note that Poll_Loop.watchers[N] is not always required to be NULL, so perhaps you've found a case where it is still incorrectly expecting that? You might want to file an issue with code to reproduce the problem, or at least an `rr` trace dump of the code that hits the problem (note that `rr` could potentially upload all of your hard drive, so be careful what you are submitting).

On Wed, Apr 28, 2021 at 11:13 PM pa...@rcom-software.com wrote:

    Hi Folks:

    I am experiencing an intermittent problem with uv_poll_init() after the
    successful establishment and release of multiple concurrent TCP
    connections. I am not sure whether this problem is due to a bug, which may
    be corrected in another Libuv release, or whether I am doing something
    wrong when releasing the poll handle. Do you have any insight into the
    cause? The details I consider most important follow.

    The uv_poll_t callback function reads incoming TCP data as follows.

        if(events & UV_READABLE)
          {
            CONN_DESC *cdesc = (CONN_DESC *) poll_handle->data;
            n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);

    NOTE: I am using Libuv version 1.41.0 running on Ubuntu Linux
    version 15.04.

    The problem is that uv_poll_init() fails, normally with the EEXIST or
    EBADF error code, and my investigation indicates the uv_loop_t
    Poll_Loop.watchers[N] field is not NULL when it should be, where N is the
    TCP socket descriptor. It occurs immediately after the uv_poll_t
    poll_handle is allocated. (There is exactly one TCP socket descriptor per
    poll handle.) The call to uv_poll_init() is as follows, and the socket
    descriptor is obtained with uv_fileno().

        if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
           {
            fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s",
                 r, uv_err_name(r));
           }

    It occurs in the IO_Task() thread when multiple TCP socket descriptors
    are in use. The IO_Task() releases the poll_handle with the following
    code sequence when it is notified, via a Libuv async. message from the
    Protocol_Task() thread, that polling should stop.

        if( (r = uv_poll_stop(poll_handle)) )
          {
            fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Poll Stop Error %d, %s",
                 r, uv_err_name(r) );
          }

        poll_handle->data = (void *) cdesc;
        uv_close((uv_handle_t *) poll_handle, async_close_callback);

    The actual release occurs in async_close_callback() as follows, and the
    Protocol_Task() releases the TCP socket descriptor with a normal Linux
    close() after it receives the T_DISMANTLE_RSP message.

        VFREE((UCHAR *) handle);
        //
        // Send a notification message to the Protocol_Task.
        //
        msg = MSG_ALLOC(0, FALSE);
        msg->class = C_NOTIFY;
        msg->type = T_DISMANTLE_RSP;
        msg->info = 0;

        SendProtoMsg(cdesc, msg);

    I think the underlying cause is that if a new TCP connection uses the
    same TCP socket descriptor as one released with uv_poll_stop() and
    uv_close(), the call to uv_poll_init() occurs before the socket closure
    has propagated into the uv_loop_t Poll_Loop.

    Best Regards,

    Paul Romero
    --
    You received this message because you are subscribed to the
    Google Groups "libuv" group.
    To unsubscribe from this group and stop receiving emails from it,
    send an email to libuv+unsubscr...@googlegroups.com.
    To view this discussion on the web visit
    https://groups.google.com/d/msgid/libuv/c3290a76-ab6b-42ad-8540-33021c6188b9n%40googlegroups.com.

--


Paul Romero
-----------
RCOM Communications Software
EMAIL:pa...@rcom-software.com
PHONE: (510)482-2769





#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"

#include <sys/types.h>
#include <sys/socket.h>

//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE



//
// ***********************************************************************************************
// *************************************** DATA **************************************************
// ***********************************************************************************************
//

//
// Protects Connect_Task_Input_Q and IO_Task_Input_Q.
//
extern uv_mutex_t Async_Mutex;

//
// ***********************************************************************************************
// ************************************* PROTOTYPES  *********************************************
// ***********************************************************************************************
//

void send_async_msg(CONN_DESC *, const ASYNC_DATA *);
void async_close_callback(uv_handle_t *);

void SendProtoMsg(CONN_DESC *, MSG *);

void insert_msg(MSG_FIFO *, MSG *);

MSG *MSG_ALLOC(int, BOOL);
void VFREE(UCHAR *);


//
// FUNCTION - send_async_msg(): Sends an async. message to the specified target loop task.
//
// ARGUMENTS
// -----------
// cdesc: The connection descriptor corresponding to the connection from which message is sent.
// parm: Specifies the task to which the message will be sent, and the relevant task attributes.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -------
// Care must be taken to specify the following attributes correctly.
//
// * type: The message type.
// * async_handle: The Libuv async. handle used to convey the message.
// * async_q: The message input queue of the task.
// * object_handle: If this field is not NULL, it contains the Libuv handle necessary
//   for performing the operation conveyed by the message type.
//
// The message is handled by a proxy routine which must already have been registered via
// an earlier call to uv_async_init().
//
ROUTINE void send_async_msg(CONN_DESC *cdesc, const ASYNC_DATA *parm)
{
	MSG *msg;
	uv_handle_t **ppdata;

	msg = MSG_ALLOC( sizeof(uv_handle_t *), FALSE );

	msg->class = C_ASYNC;
	msg->type = parm->type;
	msg->conn_desc = (void *) cdesc;
	//
	// Store the Libuv handle if it is specified.
	//
	if(parm->object_handle)
	   {
		ppdata = (uv_handle_t **) &msg->buffer[0];
		*ppdata = (uv_handle_t *) parm->object_handle;

		//
		// Debugging sanity check: the stored handle must never be NULL here.
		//
		if(*ppdata == NULL)
		   {
			fprintf(stderr, "BUGGG 2\n");
			abort();
		   }
	   }

ENTER_MUTEX(&Async_Mutex);
	insert_msg(parm->async_q, msg);
EXIT_MUTEX(&Async_Mutex);

	uv_async_send(parm->async_handle);

	return;
}

//
// FUNCTION - async_close_callback(): Frees a Libuv handle and notifies the task
// that initiated an async. operation that it is complete.
//
// ARGUMENTS
// -----------
// handle: The Libuv handle.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -------
// This is a Libuv specific callback routine whose execution is specified by a call to uv_close().
//
extern uv_loop_t	Poll_Loop;	// KLUDGE
ROUTINE void async_close_callback(uv_handle_t *handle)
{
	MSG *msg;

	CONN_DESC *cdesc = (CONN_DESC *) handle->data;

	VFREE((UCHAR *) handle);
	//
	// Send a notification message to the Protocol_Task.
	//
	msg = MSG_ALLOC(0, FALSE);
	msg->class = C_NOTIFY;
	msg->type = T_DISMANTLE_RSP;
	msg->info = 0;

	SendProtoMsg(cdesc, msg);

	return;
}

#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"

//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE


//
// *************************************************************************************
// ************************************ DATA *******************************************
// *************************************************************************************
//

//
// **************** Data Shared by multiple Tasks ******************************
//


//
// Protects Connect_Task_Input_Q and IO_Task_Input_Q.
//
uv_mutex_t Async_Mutex;

//
// Protects protocol data shared by or accessible to  multiple tasks.
//
uv_mutex_t			Protocol_D_Mutex;

int N_Tasks;			// The number of initialized thread tasks.
int N_Sockets;			// The current number of connections accepted.

//
// A special purpose per connection Mutex that protects the binary segment queue which
// is accessible to the Protocol_Task() and DB_Task().
//
uv_mutex_t			DataXfer_Mutex;

//
// Connection Descriptor Table.
//------------------------------------
// Using a statically allocated table like this is only appropriate for
// an implementation with a small maximum number of connections.
//
// TBD: In the case of a large maximum number of connections this table
// should be replaced by one that holds pointers to dynamically allocated
// connection descriptors.
//
CONN_DESC Conn_Desc_Table[MAX_CONN_DESC];

//
// **************************** main() Process/Task Data *********************
//

//
// Loop for detecting incoming connections,
//
PRIVATE uv_loop_t	Connect_Loop;
//
// TRUE when the connection accept callback is executing.
//
PRIVATE BOOL		Connect_Accept_Busy = FALSE;
//
// TRUE when the connection plumbing proxy is executing.
//
PRIVATE BOOL		Connect_Proxy_Busy = FALSE;
//
// The main process async. message queue.
//
MSG_FIFO		Connect_Task_Input_Q;
//
// The main process async. operation channel.
//
uv_async_t		Connect_Task_Async_Handle;

//
// **************************** IO_Task() Data *****************************
//

//
// The IO_Task handles epoll() read I/O events with uv_poll_t handles in its poll loop.
//
void IO_Task(void *arg);
uv_thread_t			IO_Task_Handle;
//
// IO_Task() async. operation handle.
//
uv_async_t			IO_Task_Async_Handle;
//
// IO_Task() async. message queue.
//
MSG_FIFO			IO_Task_Input_Q;

//
// **************************** DB_Task() Data *************************
//

//
// The DB_Task performs slow database insertions.
//
void DB_Task(void *);
uv_thread_t			DB_Task_Handle;

//
// ***************************** Timer_Task() Data **********************
//

//
// The Timer_Task() implements protocol timers and handles ticks every 1/10 of a second in the Timer_Loop.
//
void Timer_Task(void *);
uv_thread_t			Timer_Task_Handle;

//
//  *************************  Transmit_Task() Data ******************************
//

//
// The transmission task handle.
//
void Transmit_Task(void *);
uv_thread_t			Send_Task_Handle;


//
// ************************* Protocol_Task() Data *****************************
//

//
// The protocol task handles the arrival of messages on the Service Queue.
//
void Protocol_Task(void *arg);
uv_thread_t			Protocol_Task_Handle;
//
// The Service Queue is a doubly linked circular queue containing
// only Connection Descriptors with SDUs on their Task Input Queue.
//
CONN_DESC *Service_Q;	// The head of the queue.
//
// Protects the Service_Q and its condition variable.
//
uv_mutex_t			Service_Q_Mutex;
//
// Service_Q message queue condition variable.
//
uv_cond_t			Service_Q_Cond;


//
// ******************************************************************************************
// ********************************** PROTOTYPES *********************************************
// ******************************************************************************************
//

void send_async_msg(CONN_DESC *, const ASYNC_DATA *);
void async_close_callback(uv_handle_t *);

PRIVATE void make_incoming_connection(uv_stream_t *, int );	// This is a callback routine.
PRIVATE void connect_proxy(uv_async_t * );		// This callback routine executes in the main() process.
PRIVATE void forced_close_callback(uv_handle_t *);

char *host_to_ip(const char *, char [], int );

#ifdef USE_SSL

SSL *Ssl_Accept(CONN_DESC *, int );
void Ssl_ReleaseConnect(CONN_DESC *, uv_mutex_t * );
void Ssl_Init();

#ifdef USE_CRYPTO_LOCK
ROUTINE void Ssl_InitLock(int );
#endif // USE_CRYPTO_LOCK

#endif // USE_SSL

void sig_hup_callback(uv_signal_t *, int );
void sig_term_callback(uv_signal_t *, int );
void sig_kill_callback(uv_signal_t *, int );

MSG *remove_msg(MSG_FIFO *);

CONN_DESC *ALLOC_CONN_DESC(int );
BOOL DELETE_CONN(CONN_DESC *);

void MSG_FREE(MSG *);

UCHAR *VALLOC(int n);
void VFREE(UCHAR *);
void VMEM_INIT();

#ifdef LIBUV_DETACH
int uv_thread_detached_create(uv_thread_t *, void (* )(void * ), void * );
#else // LIBUV_DETACH
int uv_thread_detach(uv_thread_t *);
#endif // LIBUV_DETACH

void fatal_error(int , const char *, ...);

int main()
{
	uv_tcp_t listen_handle;
	uv_signal_t sig_term_handle, sig_hup_handle, sig_int_handle;
	struct sockaddr_in addr;
	int r, k;

	//
	// Open the debug log.
	//
	openlog("pexd", LOG_PID, LOG_LOCAL1);
	syslog(LOG_ALERT, "PEXD DAEMON: VERSION %s STARTING", VERSION);
	syslog(LOG_INFO, "DAEMON: Initializing System");
	//
	// Initialize memory management.
	//
	VMEM_INIT();
	//
	// Initialize global data.
	//
	N_Tasks = 0;
	N_Sockets = 0;
	bzero((void *) Conn_Desc_Table, sizeof(Conn_Desc_Table));
	for(k = 0; k < MAX_CONN_DESC; k++)
	   {
		Conn_Desc_Table[k].index = k;
		Conn_Desc_Table[k].fd = -1;
		Conn_Desc_Table[k].decoder.buf_ptr = Conn_Desc_Table[k].decoder.buffer;
	   }

	//
	// Initialize the Libuv async. channel Mutex.
	//
	uv_mutex_init(&Async_Mutex);
	//
	// Initialize the Service_Q, used in the Protocol_Task() and
	// its Mutex and condition variable.
	//
	Service_Q = NULL;
	uv_mutex_init(&Service_Q_Mutex);
	uv_cond_init(&Service_Q_Cond);
#ifdef USE_SSL
	//
	// Initialize SSL connection infrastructure.
	//
	Ssl_Init();

#ifdef USE_CRYPTO_LOCK
	//
	// Protect internal crypto. operations.
	//
	Ssl_InitLock(MAIN_TASK);
#endif // USE_CRYPTO_LOCK

#endif // USE_SSL
	//
	// Initialize the protocol data DataXfer Mutexes.
	//
	uv_mutex_init(&Protocol_D_Mutex);
	uv_mutex_init(&DataXfer_Mutex);
	//
	// Launch the Timer_Task(), IO_Task(), DB_Task(), Protocol_Task(), and Transmit_Task().
	//
	// TBD: When a newer version of Libuv is available use uv_thread_detach() instead
	// of uv_thread_detached_create(). Note that uv_thread_detached_create() is a custom 
	// routine I added to the src/unix/thread.c code and not part of the official Libuv
	// package.
	//
	syslog(LOG_INFO, "DAEMON: Spawning Tasks");

#ifdef LIBUV_DETACH
	uv_thread_detached_create(&DB_Task_Handle, DB_Task, NULL);
	uv_thread_detached_create(&Protocol_Task_Handle, Protocol_Task, NULL);
	uv_thread_detached_create(&Send_Task_Handle, Transmit_Task, NULL);
	uv_thread_detached_create(&Timer_Task_Handle, Timer_Task, NULL);
	uv_thread_detached_create(&IO_Task_Handle, IO_Task, NULL);
#else // LIBUV_DETACH
	//
	// The undeclared variable 'tid' is dropped and uv_thread_detach() is
	// passed a pointer, matching the prototype above.
	//
	uv_thread_create(&DB_Task_Handle, DB_Task, NULL);
	uv_thread_detach(&DB_Task_Handle);
	uv_thread_create(&Protocol_Task_Handle, Protocol_Task, NULL);
	uv_thread_detach(&Protocol_Task_Handle);
	uv_thread_create(&Send_Task_Handle, Transmit_Task, NULL);
	uv_thread_detach(&Send_Task_Handle);
	uv_thread_create(&Timer_Task_Handle, Timer_Task, NULL);
	uv_thread_detach(&Timer_Task_Handle);
	uv_thread_create(&IO_Task_Handle, IO_Task, NULL);
	uv_thread_detach(&IO_Task_Handle);
#endif // LIBUV_DETACH
	//
	// Initialize the Connect_Loop and its data.
	//
	Connect_Accept_Busy = FALSE;
	Connect_Proxy_Busy = FALSE;

	Connect_Task_Input_Q.head = Connect_Task_Input_Q.tail = NULL;

	uv_loop_init(&Connect_Loop);
	uv_async_init(&Connect_Loop, &Connect_Task_Async_Handle, connect_proxy);
	//
	// Initialize Linux signal handling in this task.
	//
	uv_signal_init(&Connect_Loop, &sig_term_handle);
    	uv_signal_start(&sig_term_handle, sig_term_callback, SIGTERM);

	uv_signal_init(&Connect_Loop, &sig_hup_handle);
    	uv_signal_start(&sig_hup_handle, sig_hup_callback, SIGHUP);

	//
	// NOTE: SIGKILL cannot be caught, so no handler is registered for it.
	// Registering it here also initialized sig_int_handle twice.
	//
	// TBD: This signal should be disabled in real life.
	//
	uv_signal_init(&Connect_Loop, &sig_int_handle);
	uv_signal_start(&sig_int_handle, sig_kill_callback, SIGINT);
	//
	// Initialize the incoming connection handle.
	//
	uv_tcp_init(&Connect_Loop, &listen_handle);
	//
	// Bind to a listen address.
	//
	uv_ip4_addr(SERVER_ADDRESS, SERVER_PORT, &addr);
	//
	// host_to_ip(SERVER_NAME, ip_addr_buf, sizeof(ip_addr_buf) );
	// uv_ip4_addr(ip_addr_buf, SERVER_PORT, &addr);
	//
	uv_tcp_bind(&listen_handle, (const struct sockaddr *) &addr, 0);

	//
	// Wait for all the thread tasks to be initialized.
	//
	for(;;)
	  {

ENTER_MUTEX(&Protocol_D_Mutex);
		if(N_Tasks == 5)
		   {
EXIT_MUTEX(&Protocol_D_Mutex);
			break;
		   }
EXIT_MUTEX(&Protocol_D_Mutex);

	 	pthread_yield();
	  }

	syslog(LOG_INFO, "DAEMON: All Spawned Tasks Running");

	//
	// Configure the listen socket.
	//
	if( (r = uv_listen((uv_stream_t*) &listen_handle, MAX_CONN_DESC, make_incoming_connection)) )
        	fatal_error(LOG_ERR, "MAIN_TASK: Can't Configure Listening Socket. Error %d: %s", r, uv_err_name(r));

	syslog(LOG_ALERT, "PEXD DAEMON: RUNNING");
	//
	// Wait for a connection.
	//
	for(;;)
	  {

    		r = uv_run(&Connect_Loop, UV_RUN_DEFAULT);
		if(r)
		  {
			// fprintf(stderr, "MAIN: Run Error %d: %s\n", uv_err_name(r));
			r = 0;
		  }
	  }

	return(r);

}

#ifdef LIBUV_DETACH

//
// FUNCTION - uv_thread_detached_create(): Creates a detached thread task and puts it into execution.
//
// ARGUMENTS
// -----------
// tid: Points to memory for holding the Libuv thread ID.
// entry: The thread task.
// arg: Points to data to be passed to the task when it starts.
//
// RETURN VALUE
// --------------
// Returns 0 if successful and a Libuv error code otherwise.
//
// NOTES
// -------
// This routine is a kludge which supplies the same functionality as the uv_thread_detach() API.
// It is necessary because some versions of Libuv don't include this functionality.
//
ROUTINE int uv_thread_detached_create(uv_thread_t *tid, void (*entry )(void * ), void *arg )
{
	int rval;

	rval = uv_thread_create(tid, entry, arg);
	//
	// Detach the thread only if it was created successfully.
	//
	if(rval == 0)
		rval = pthread_detach(*tid);

	return(rval);
}

#endif // LIBUV_DETACH

//
// FUNCTION - make_incoming_connection(): A callback routine that executes when an incoming connection event occurs.
//
// ARGUMENTS
// -----------
// listen_handle: The TCP listen socket handle.
// status: The value is 0 when a connection is detected without errors.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -----
// The execution of this routine must be preconfigured with Libuv uv_listen().
//
ROUTINE PRIVATE void make_incoming_connection(uv_stream_t *listen_handle, int status)
{
	ASYNC_DATA poll_data;
	CONN_DESC *cdesc;
	uv_tcp_t *conn_handle;
	int r;

	BOOL reject = FALSE;


	//
	// Libuv reports failure with a negative error code.
	//
	if (status < 0)
		fatal_error(LOG_ERR, "MAIN_TASK: Invalid Incoming Connection Status");

// printf("NEW_CONN ATTEMPT: N %d\n", N_Sockets);
// fflush(stdout);

	//
	// If this routine is called before the work from the last invocation is finished,
	// we have no choice but to reject the connection.
	//
	if(Connect_Accept_Busy)
		reject = TRUE;
	else
		Connect_Accept_Busy = TRUE;
	//
	// Initialize the connection handle.
	//
    	conn_handle = (uv_tcp_t *) VALLOC(sizeof(uv_tcp_t));
	uv_tcp_init(&Connect_Loop, conn_handle);

	if((r = uv_accept(listen_handle, (uv_stream_t *) conn_handle)) == 0)
	   {
		int nsock;
		//
		// A new connection occurred.
		//
        	uv_os_fd_t fd;

ENTER_MUTEX(&Protocol_D_Mutex);
		nsock = N_Sockets++;
EXIT_MUTEX(&Protocol_D_Mutex);

		if(reject || nsock >= MAX_CONN_DESC)
		  {
			syslog(LOG_DEBUG, "MAIN_TASK: CONNECTION REJECTION - Transient, N Sockets = %d", nsock);

ENTER_MUTEX(&Protocol_D_Mutex);
			N_Sockets--;
EXIT_MUTEX(&Protocol_D_Mutex);
fprintf(stderr, "xxxxxxxxxxxxx REJECTION DONE %d xxxxxxxxxxxxxxxxxx\n", N_Sockets);
			uv_close((uv_handle_t *) conn_handle, forced_close_callback);
			return;
		  }
		//
		// Fetch the socket descriptor from the connection handle.
		//
		uv_fileno((const uv_handle_t*) conn_handle, &fd);
		//
		// Allocate the connection descriptor.
		//
		cdesc = ALLOC_CONN_DESC(fd);
		if( !cdesc )
		  {
			syslog(LOG_DEBUG, "MAIN_TASK: CONNECTION REJECTION - No Connection Descriptors, N =  %d", nsock);

ENTER_MUTEX(&Protocol_D_Mutex);
			N_Sockets--;

			close(fd);
EXIT_MUTEX(&Protocol_D_Mutex);
			uv_close((uv_handle_t *) conn_handle, forced_close_callback);
			return;
		  }

#ifdef USE_SSL
		if( !(cdesc->ssl_socket = Ssl_Accept(cdesc, fd)) )
		  {
ENTER_MUTEX(&Protocol_D_Mutex);
			N_Sockets--;

			close(cdesc->fd);
			cdesc->fd = -1;

			DELETE_CONN(cdesc);
EXIT_MUTEX(&Protocol_D_Mutex);
			uv_close((uv_handle_t *) conn_handle, forced_close_callback);
			return;
		  }
#endif // USE_SSL

		//
		// Save the connection handle and start polling.
		//
		cdesc->conn_handle = (uv_tcp_t *) conn_handle;

		syslog(LOG_INFO, "MAIN_TASK: NEW CONNECTION ESTABLISHED - CONN %d FD %d", cdesc->index, cdesc->fd);
		//
		// Set up epoll() plumbing by sending a message to IO_Task();
		//
		bzero((void *) &poll_data, sizeof(ASYNC_DATA));
		poll_data.type = T_POLL_CONFIG_REQ;
		poll_data.async_handle = &IO_Task_Async_Handle;
		poll_data.async_q = &IO_Task_Input_Q;
		poll_data.object_handle = NULL;

fprintf(stderr, "xxxxxxxxxxxxxxxx DOING CONNECT xxxxxxxxxxxxxxxxxxxxxx\n");
		//
		// The T_POLL_CONFIG_RSP message will be sent to the Protocol_Task() which
		// is in S_IDLE state.
		//
		send_async_msg(cdesc, &poll_data);
	   }
	else
		fatal_error(LOG_ERR, "MAIN_TASK: CONNECTION REJECTION - Libuv Accept Failure. Error %d: %s", r, uv_err_name(r));

	Connect_Accept_Busy = FALSE;

	return;
}


//
// FUNCTION - connect_proxy(): Dismantles and releases a Libuv connection handle on behalf of another task.
// The proxy routine executes in the main() process/task bound to the Connect_Loop.
//
// ARGUMENTS
//-----------
// handle: The async. channel handle. (Connect_Task_Async_Handle)
//
// RETURN VALUE
// -----------
// None
//
// NOTES
// --------
// Execution of the routine is triggered by a call to uv_async_send() in the Protocol_Task().
//
ROUTINE PRIVATE void connect_proxy(uv_async_t *handle)
{
	CONN_DESC *cdesc;
	MSG *msg;
	uv_tcp_t *conn_handle;
	uv_handle_t **ppdata;
	BOOL done;

	//
	// Avoid clobbering a previous invocation of this routine.
	//
	if(Connect_Proxy_Busy)
		return;

	Connect_Proxy_Busy = TRUE;

	//
	// Handle all messages from the Protocol_Task()
	//
	done = FALSE;
	while(done != TRUE)
	  {
ENTER_MUTEX(&Async_Mutex);
		msg = remove_msg(&Connect_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);

		if(msg)
		  {
			cdesc = (CONN_DESC *) msg->conn_desc;
			
			syslog(LOG_DEBUG, "MAIN_TASK(Proxy): CONN[%d] Type %d\n", cdesc->index, msg->type);

			switch(msg->type)
			{
			case T_CONN_DISMANTLE_REQ:
				//
				// Release a uv_tcp_t connection handle.
				// The protocol task is notified by async_close_callback()
				// when the operation is complete.
				//
				ppdata = (uv_handle_t **) &msg->buffer[0];
				conn_handle = (uv_tcp_t *) *ppdata;

				conn_handle->data = (void *) cdesc;
				uv_close((uv_handle_t *) conn_handle, async_close_callback);
				break;
			default:
				fatal_error(LOG_EMERG, "MAIN_TASK: CONNECT_PROXY - Invalid Message = %d", msg->type);
			}

			MSG_FREE(msg);
		  }
		else
			done = TRUE;
	  }

	Connect_Proxy_Busy = FALSE;

	return;
}


//
// FUNCTION - forced_close_callback(): Deallocate a Libuv handle.
//
// ARGUMENTS
//-----------
// handle: The handle.
//
// NOTES
// ------
// Releases a uv_tcp_t handle in the case it was allocated before an incoming connection was rejected.
//
ROUTINE PRIVATE void forced_close_callback(uv_handle_t *handle)
{

	VFREE((UCHAR *) handle);

// printf("ASYNC[X]: REJECT COMPLETE\n");
// fflush(stdout);
	//
	// Setting this variable indicates the handle has been freed.
	//
	Connect_Accept_Busy = FALSE;

	return;
}

#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"
//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE

//
// **********************************************************
// ******************** DATA ***********************************
// **************************************************************
//

//
// This data is used to indicate when the task is initialized.
//
extern int			 N_Tasks;
extern uv_mutex_t		Protocol_D_Mutex;

//
// Protects the IO_Task_Input_Q.
//
extern uv_mutex_t		Async_Mutex;

//
// IO_Task() async. operation handle.
//
extern uv_async_t		IO_Task_Async_Handle;
//
// IO_Task() async. message queue.
//
extern MSG_FIFO			IO_Task_Input_Q;

//
// Loop for handling epoll() read I/O events.
//
PRIVATE uv_loop_t	Poll_Loop;
//
// TRUE when the Poll proxy routine is executing.
//
PRIVATE BOOL		Poll_Proxy_Busy;

//
// *******************************************************
// *********************** PROTOTYPES *******************
// *****************************************************
//
PRIVATE void poll_callback(uv_poll_t *, int , int );
PRIVATE void poll_proxy(uv_async_t *);
void async_close_callback(uv_handle_t *);

#ifdef USE_SSL
int Ssl_RxData(SSL *, UCHAR *, int );
#endif // USE_SSL

DECODE_RESULT rx_sdu(CONN_DESC *, int);

void SendProtoMsg(CONN_DESC *, MSG *);

MSG *remove_msg(MSG_FIFO *);

MSG *MSG_ALLOC(int, BOOL);
void MSG_FREE(MSG *);

UCHAR *VALLOC(int n);
void VFREE(UCHAR *);

void fatal_error(int , const char *, ...);

//
// Monitor connections for incoming data with epoll()
//
TASK void IO_Task(void *arg)
{
	int r;

	uv_loop_init(&Poll_Loop);
	//
	// Initialize communications from the main() process and Protocol_Task() to the IO_Task().
	// The main() and Protocol_Task() tasks insert messages on the IO_Task_Input_Q just prior
	// to invoking uv_async_send().
	//
	IO_Task_Input_Q.head = IO_Task_Input_Q.tail = NULL;
	uv_async_init(&Poll_Loop, &IO_Task_Async_Handle, poll_proxy);

ENTER_MUTEX(&Protocol_D_Mutex);
	N_Tasks++;
EXIT_MUTEX(&Protocol_D_Mutex);

	syslog(LOG_INFO, "IO_TASK: Started");

	for(;;)
	  {
		r = uv_run(&Poll_Loop, UV_RUN_DEFAULT);
		if(r)
		  {
			syslog(LOG_ERR,  "IO_TASK: Run Error %d", r);
			r = 0;
		  }
	  }

	return;
}

//
// FUNCTION - poll_callback(): A callback routine that executes when incoming data is available
// and reads the data.
//
// ARGUMENTS
// ---------------
// poll_handle: The Libuv poll handle.
// status: The value is 0 if execution occurs due to incoming data.
// events: The value should always be UV_READABLE, which means incoming
// data is available, if status is 0.
//
// RETURNS
//--------
// None
//
// NOTES
// ------
// This routine is invoked by the Libuv epoll() mechanism.
//
ROUTINE PRIVATE void poll_callback(uv_poll_t *poll_handle, int status, int events)
{
	if(status == 0)
	  {
		if(events & UV_READABLE)
		   {
			//
			// Since the right connection descriptor is known, all you
			// need to do is read data into the connection descriptor buffer.
			//
			int n;
			MSG *sdu;

			CONN_DESC *cdesc = (CONN_DESC *)  poll_handle->data;

#ifdef USE_SSL
			n = Ssl_RxData(cdesc->ssl_socket, cdesc->rx_buf, RX_BUF_SIZE);
#else // USE_SSL
			n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);
#endif // USE_SSL

			if(n > 0)
			   {
				DECODE_RESULT r;
				//
				// Recognize the SDU in the usual way.
				// If the SDU is complete, send it to the Protocol_Task().
				//
				r = rx_sdu(cdesc, n);
				switch(r)
				{
				case RX_PARTIAL:
				case RX_COMPLETE:
				case RX_EMPTY:
					break;
				case RX_OVERFLOW:
				case RX_NOMEMORY:
					//
					// There is no choice but to send an abort, reinitialize everything,
					// and release the connection.
					//
					syslog(LOG_ERR, "IO_TASK: Irrecoverable SDU Reception Error %d", r);

					sdu = MSG_ALLOC(0, FALSE);
					sdu->class = C_NOTIFY;
					sdu->type = T_FAULT;
					sdu->info = (int) r;

					SendProtoMsg(cdesc, sdu);
					break;
				default:
					fatal_error(LOG_EMERG, "IO_TASK: Invalid SDU Recognition Code, %d", r);
				}

			   }
			else
			if(n < 0)
			  {
#ifdef USE_SSL
				MSG *msg;

				n = 0;
				syslog(LOG_INFO, "IO_TASK: RX DISCONNECT %d - %s !", errno, strerror(errno) );

				msg = MSG_ALLOC(0, FALSE);
				msg->class = C_NOTIFY;
				msg->type = T_DISCONNECT;
				msg->info = 0;
				SendProtoMsg(cdesc, msg);
#else // USE_SSL
				n = 0;
				if( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) )
				  {
					MSG *msg;

					syslog(LOG_INFO, "IO_TASK: RX DISCONNECT %d - %s !", errno, strerror(errno) );

					msg = MSG_ALLOC(0, FALSE);
					msg->class = C_NOTIFY;
					msg->type = T_DISCONNECT;
					msg->info = 0;
					SendProtoMsg(cdesc, msg);
				  }
#endif // USE_SSL
			  }
		   }
	  }
	else
		syslog(LOG_ERR, "IO_TASK: poll_callback - Bad Status %d", status);


	return;
}


//
// FUNCTION - poll_proxy(): The proxy routine executes in the IO_Task() Poll Loop when
// it has unserviced async. messages from other tasks.
//
// ARGUMENTS
// -----------
// handle: A Libuv async. operation handle.
//
// RETURNS
// -------
// Nothing
//
// NOTES
// -----
// The main() and Protocol_Task() use the async. handle to send messages to the IO_Task().
// They do so by inserting messages in the IO_Task_Input_Q which is in shared memory.
//
ROUTINE PRIVATE void poll_proxy(uv_async_t *handle)
{
	CONN_DESC *cdesc;
	MSG *msg, *rsp;
	uv_poll_t *poll_handle;
	uv_handle_t **ppdata;
	int r;
	BOOL done;

	//
	// Do nothing if execution is already in progress.
	//
	if(Poll_Proxy_Busy)
		return;

	Poll_Proxy_Busy = TRUE;

	//
	// Handle messages from other tasks.
	//
	done = FALSE;
	while(done != TRUE)
	  {

ENTER_MUTEX(&Async_Mutex);
		msg = remove_msg(&IO_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);

		if(msg)
		  {
			cdesc = (CONN_DESC *) msg->conn_desc;

			syslog(LOG_DEBUG, "IO_TASK(Proxy): RX EVENT - CONN[%d] Type %d\n", cdesc->index, msg->type);

			switch(msg->type)
			{
			case T_POLL_CONFIG_REQ:
				//
				// Main process request to start Libuv/epoll() operation.
				//
				poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));

				if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
				  {

					fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY -  Polling Initialization Error %d, %s",
					 	r, uv_err_name(r));
				  }


				if( !poll_handle->loop )
				  {
					fprintf(stderr, "BUG: POLL LOOP NOT SET\n");
					abort();
				  }

				poll_handle->data = (void *) cdesc;

				if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
				  {
					fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY -  Polling Initiation Error %d, %s",
						 r, uv_err_name(r));
				  }
				//
				// Notify the Protocol_Task(), that polling has started.
				//
				rsp = MSG_ALLOC(0, FALSE);

				rsp->class = C_NOTIFY;
				rsp->type = T_POLL_CONFIG_RSP;
				rsp->info = 0;
				rsp->conn_desc = (void *) poll_handle;

				SendProtoMsg(cdesc, rsp);

				break;
			case T_POLL_DISMANTLE_REQ:
				//
				// Protocol_Task() request to cease Libuv/epoll() operation and
				// release the poll handle and its resources.
				//
				ppdata = (uv_handle_t **) &msg->buffer[0];
				poll_handle = (uv_poll_t *) *ppdata;

				if( (r = uv_poll_stop(poll_handle)) )
				  {
					fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Poll Stop Error %d, %s",
						 r, uv_err_name(r) );
				  }
				//
				// The callback routine notifies the Protocol_Task() when everything is done.
				//
				poll_handle->data = (void *) cdesc;
				uv_close((uv_handle_t *) poll_handle, async_close_callback);

				break;
			default:
				fatal_error(LOG_EMERG, "IO_TASK: POLL_PROXY - Invalid Message = %d", msg->type);

			}

			MSG_FREE(msg);
		  }
		else
			done = TRUE;
	  }

	Poll_Proxy_Busy = FALSE;

	return;
}
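
For reference, the -17 in the crash diagnostic is EEXIST. One place that error arises on Linux is at the epoll level: adding a descriptor that is already a member of an epoll set fails with EEXIST, while adding it again after an EPOLL_CTL_DEL succeeds. Whether uv_poll_init() reports EEXIST for exactly this reason in the Libuv version used here is an assumption, not something the dump confirms. The sketch below is a standalone illustration and is not part of network_io.c; readd_errno() is a hypothetical helper name.

```c
#include <assert.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

//
// Hypothetical helper (not part of pexd or Libuv).
// Registers the read end of a pipe with a fresh epoll set, optionally
// removes it, and then registers it a second time.
//
// Returns the errno of the second registration, 0 if the second
// registration succeeds, or -1 if the setup itself fails.
//
int readd_errno(int remove_first)
{
	struct epoll_event ev;
	int fds[2];
	int epfd;
	int result = -1;

	if(pipe(fds) != 0)
		return -1;

	epfd = epoll_create1(0);
	if(epfd >= 0)
	  {
		ev.events = EPOLLIN;
		ev.data.fd = fds[0];

		// The first registration is expected to succeed.
		if(epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) == 0)
		  {
			if(remove_first && epoll_ctl(epfd, EPOLL_CTL_DEL, fds[0], &ev) != 0)
				result = -1;
			else
			if(epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) != 0)
				result = errno;		// Save errno before close() can change it.
			else
				result = 0;
		  }
		close(epfd);
	  }

	close(fds[0]);
	close(fds[1]);

	return result;
}
```

On Linux, readd_errno(0) should return EEXIST and readd_errno(1) should return 0. If the same mechanism is behind the uv_poll_init() failure, it would be worth checking whether a poll handle for the same fd is still registered in Poll_Loop when the T_POLL_CONFIG_REQ arrives.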

#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"


//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE

#define MAX_RETRY	3


//
// ********************************************************************************
// ******************************** DATA ******************************************
// ********************************************************************************
//
extern int		N_Sockets;

extern uv_mutex_t	Service_Q_Mutex;
extern uv_mutex_t	Protocol_D_Mutex;
extern uv_mutex_t	DataXfer_Mutex;

//
// The main process async. operation channel.
//
extern uv_async_t	Connect_Task_Async_Handle;
//
// The main() process async. message input queue.
//
extern MSG_FIFO		Connect_Task_Input_Q;
//
// IO_Task() async. operation handle.
//
extern uv_async_t	IO_Task_Async_Handle;
//
// IO_Task() async. message input queue.
//
extern MSG_FIFO		IO_Task_Input_Q;

//
// *************************************************************************************
// ***************************** PROTOTYPES *********************************************
// *************************************************************************************
//
void rx_fsm(CONN_DESC *, MSG *);
void init_rx_fsm(CONN_DESC *cdesc, int);

void SendTxMsg(CONN_DESC *, MSG *);
void SendDbMsg(MSG *);

PRIVATE void xfer_fsm(CONN_DESC *, MSG *);

#ifdef USE_SCRAM
void scram_fsm(CONN_DESC *, MSG *);
void Scram_Finish(CONN_DESC *);
#endif // USE_SCRAM

void send_response(CONN_DESC * , BOOL , RESULT );
void do_abort(CONN_DESC * , MSG *, RESULT );

void start_conn_release(CONN_DESC *);

#ifdef USE_SSL
void Ssl_ReleaseConnect(CONN_DESC * );
#endif // USE_SSL

void send_async_msg(CONN_DESC *, const ASYNC_DATA *);

BOOL DELETE_CONN(CONN_DESC *);

void insert_msg(MSG_FIFO *, MSG *);
void flush_msg(MSG_FIFO *, uv_mutex_t *);

MSG *build_svc_abort(RESULT );
MSG *build_sdu_response(RESULT , RESULT );

BOOL handle_sys_info(CONN_DESC *, MSG *);
BOOL handle_prof_info(CONN_DESC *, MSG *);
BOOL handle_parm_info(CONN_DESC *, MSG *);
BOOL handle_metric_info(CONN_DESC *, MSG *);
BOOL handle_binary_data(CONN_DESC *, MSG_FIFO *, BOOL );

PRIVATE BOOL handle_db_rsp(CONN_DESC *, MSG *);

RESULT parse_data_xfer(MSG *);
RESULT parse_sdu(MSG *);

TIMER_NODE *TIMER_START(CONN_DESC *, TIMER_DATA *);
void TIMER_STOP(TIMER_NODE *);
BOOL TIMER_CANCELED(MSG *);

MSG *MSG_ALLOC(int, BOOL );
void MSG_FREE(MSG * );

void fatal_error(int , const char *, ...);

//
// Handle protocol messages from all sources.
//
int NEvent = 0;		// Only for visual convenience.
//
// FUNCTION - rx_fsm(): Implements the state machine described by the PEX MOBILE/SERVER DATA TRANSFER FRAMEWORK
// protocol specification on a per connection basis. It handles, SDU messages, generated by the IO_Task(), and
// a variety of messages  generated by other Daemon Tasks including those for Libuv operational management and
// control.
//
// ARGUMENTS
// -----------
// cdesc: The connection descriptor to which invocation applies.
// sdu: A message.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -------
// The term SDU denotes a protocol message received from the network and is sent by the Protocol_Task().
// Other kinds of messages handled by this routine are generated by the following tasks: Main Task/Process,
// IO_Task(), DB_Task(), and Timer_Task().
//
extern uv_loop_t Poll_Loop;	// KLUDGE
ROUTINE void rx_fsm(CONN_DESC *cdesc, MSG *sdu)
{
	ASYNC_DATA parm;
	TIMER_DATA tdata;
	RESULT r;
	BOOL done = FALSE;

	// printf("***********************************************************************\n");

	syslog(LOG_DEBUG, "PROTO_TASK: RX EVENT %d - CONN[%d] State %d Class %d Type %d", NEvent++,
		  cdesc->index, cdesc->state, sdu->class, sdu->type);

	//
	// First, parse the SDU
	//
	switch(sdu->class)
	{
	case C_SDU:
		switch(sdu->type)
		{
		case T_SVC_READY:
			//
			// Nothing to do - No fields.
			//
			r = R_SUCCESS;
			break;
		case T_DATA_XFER:
			//
			// Special case.
			//
			r = parse_data_xfer(sdu);
			if(r != R_SUCCESS)
			  {
				syslog(LOG_ERR, "PROTO_TASK: Corrupt SDU - DATA_XFER %d Error %d", sdu->type, r);

				do_abort(cdesc, sdu, r);
				MSG_FREE(sdu);
				return;
			  }
			break;
		default:
			//
			// For now just send an abort and reinitialize everything
			// if the SDU is corrupt.
			//
			r = parse_sdu(sdu);
			if(r != R_SUCCESS)
			  {
				syslog(LOG_ERR, "PROTO_TASK: Corrupt SDU - Type %d Error %d", sdu->type, r);

				do_abort(cdesc, sdu, r);
				MSG_FREE(sdu);
				return;
			  }
		}

		break;
	case C_TIMER:
		//
		// Ignore timers that have been cancelled with TIMER_STOP().
		//
		if(TIMER_CANCELED(sdu))
		  {
			if(cdesc->active_timer)
			  {
				TIMER_STOP(cdesc->active_timer);
				cdesc->active_timer = NULL;
			  }
			MSG_FREE(sdu);
			return;
		  }
		break;
	case C_NOTIFY:
		break;
	default:
		fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Event Class %d", sdu->class);
	}


ENTER_MUTEX(&Service_Q_Mutex);
	if(cdesc->fd < 0)
		fatal_error(LOG_EMERG, "PROTO_TASK: No FD Allocated");
EXIT_MUTEX(&Service_Q_Mutex);

	switch(cdesc->state)
	{
	case S_IDLE:
		//
		// Wait for the IO_Task() to inform you the incoming connection is complete.
		// Then save the uv_poll_t poll handle used by the IO_Task() for later connection
		// dismantlement.
		//
		switch(sdu->type)
		{
		case T_POLL_CONFIG_RSP:
//
// printf("ASYNC[%d]: CONNECTION CONFIGUATION COMPLETE\n", cdesc->index);
// fflush(stdout);
//
			cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
			cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
			cdesc->poll_handle = (uv_poll_t *) sdu->conn_desc;

			break;
		case T_SHUTDOWN:
ENTER_MUTEX(&Service_Q_Mutex);
			if(N_Sockets <= 0)
			  {
				syslog(LOG_ALERT, "PEXD DAEMON: Service Terminated");
				exit(0);
			  }
EXIT_MUTEX(&Service_Q_Mutex);
			break;
		default:
			syslog(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
		}

		break;

	case S_DISMANTLE:
		//
		// Connection dismantlement is in progress.
		//
// printf("CONN %d: Release State %d\n", cdesc->index, cdesc->release_state);

		switch(cdesc->release_state)
		{
		case RS_POLL_FREE:

			switch(sdu->type)
			{
			case T_DISMANTLE_RSP:
// printf("ASYNC[%d]: POLLING TERMINATED\n", cdesc->index);
// fflush(stdout);
// printf("ASYNC[%d]: START CONNECTION TERMINATION\n", cdesc->index);
// fflush(stdout);
				cdesc->release_state = RS_CONN_FREE;
				//
				// Polling is terminated - Now release the connection.
				// This is done in the main() process Connect_Loop by the connect_proxy().
				// It responds with a T_DISMANTLE_RSP message when everything is done.
				//
				bzero((void *) &parm, sizeof(ASYNC_DATA));
				parm.type = T_CONN_DISMANTLE_REQ;
				parm.async_handle = &Connect_Task_Async_Handle;
				parm.async_q = &Connect_Task_Input_Q;
				parm.object_handle = (uv_handle_t *) cdesc->conn_handle;

				send_async_msg(cdesc, &parm);

				break;
			case T_SHUTDOWN:
				cdesc->shutdown = TRUE;
			case T_SVC_ABORT:
			case T_DISCONNECT:
			case T_FAULT:
				//
				// Ignore these messages as connection release is already in progress.
				//
				break;
			default:
				fatal_error(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
			}

			break;

		case RS_CONN_FREE:

			switch(sdu->type)
			{
			case T_DISMANTLE_RSP:
// printf("ASYNC[%d]: CONNECTION TERMINATED\n", cdesc->index);
// fflush(stdout);
				syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - CONNECTION RELEASE COMPLETE", cdesc->index);
				//
				// Notify the Database Task and wait for a response.
				// The final release of resources will occur after
				// the Database Task responds.
				//
				if(cdesc->service_down != TRUE)
				   {
					MSG *msg;

					cdesc->service_down = TRUE;

					msg = MSG_ALLOC(0, FALSE);
					msg->class = C_DB;
					msg->type = T_DB_SVC_DOWN;
					msg->conn_desc = (void *) cdesc;
					msg->info = 0;
					SendDbMsg(msg);

					break;
				   }
			case T_DB_RESULT:
				//
				// Final release of resources.
				//
				if(cdesc->service_down)
					cdesc->service_down = FALSE;
				else
					fatal_error(LOG_EMERG, "PROTO_TASK: Premature Service Down Response");
				
ENTER_MUTEX(&Protocol_D_Mutex);
ENTER_MUTEX(&Service_Q_Mutex);
				cdesc->state = S_IDLE;
				cdesc->release_state = RS_IDLE;
				done = cdesc->shutdown;
				//
				// The connection and socket have been released.
				// Free the connection descriptor and remove it
				// from the Service Queue.
				//
				if(N_Sockets > 0)
					N_Sockets--;

				DELETE_CONN(cdesc);
#ifdef USE_SSL
				Ssl_ReleaseConnect(cdesc);
#endif // USE_SSL
				flush_msg(&cdesc->task_input_q, &DataXfer_Mutex);
EXIT_MUTEX(&Service_Q_Mutex);
				//
				// TBD: Libuv may have already closed the fd.
				//
				close(cdesc->fd);
				cdesc->fd = -1;
				if(done && N_Sockets <= 0)
				  {
					syslog(LOG_ALERT, "PEXD DAEMON: Service Terminated");
					exit(0);
				  }
EXIT_MUTEX(&Protocol_D_Mutex);

// printf("BYE !\n");
// fflush(stdout);
				break;

			case T_SHUTDOWN:
				cdesc->shutdown = TRUE;
			case T_SVC_ABORT:
			case T_DISCONNECT:
			case T_FAULT:
				//
				// Ignore these messages as connection release is already in progress.
				//
				break;
			default:
				fatal_error(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
			}

			break;

		default:
			fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Release State %d", cdesc->release_state);
		}

		break;

	case S_STANDBY:

		syslog(LOG_DEBUG, "PROTO_TASK: CONN[%d] - Release State %d", cdesc->index, cdesc->release_state);

		switch(sdu->type)
		{
		case T_SVC_READY:
			//
			// Acknowledge the SDU with positive SDU_RESPONSE.
			//
#ifdef USE_SCRAM
			cdesc->state = S_AUTHENTICATE;
#else // USE_SCRAM
			cdesc->state = S_ACTIVE;
#endif // USE_SCRAM
			send_response(cdesc, TRUE, 0);
			break;
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize if the timer is not running
			// and cancel the RELEASE_TIMER if it is running.
			// Otherwise, ignore this message as it is a message in flight.
			//
			if( !cdesc->active_timer )
			  {
				start_conn_release(cdesc);
			  }
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_DISCONNECT:
		case T_FAULT:
			start_conn_release(cdesc);
			break;
		case T_RELEASE_TIMER:
			//
			// Terminate service and release the TCP connection.
			//
// printf("Final Timeout\n");
// fflush(stdout);
			TIMER_STOP(cdesc->active_timer);
			cdesc->active_timer = NULL;
			start_conn_release(cdesc);
			break;

		default:
			//
			// Protocol Error: Send an abort and reinitialize if the timer
			// is not running, and start the T_RELEASE timer in this case.
			// Otherwise, ignore this message as it is a message in flight.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if( !cdesc->active_timer )
				do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;

#ifdef USE_SCRAM
	case S_AUTHENTICATE:
		switch(sdu->type)
		{
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			Scram_Finish(cdesc);
			start_conn_release(cdesc);
			break;
		default:
			scram_fsm(cdesc, sdu);
			sdu = NULL;
		}

		break;
#endif // USE_SCRAM

	case S_ACTIVE:
		switch(sdu->type)
		{
		case T_SYS_INFO:
			//
			// Acknowledge the SDU with a positive SDU_RESPONSE
			// if the system ID information is valid. Otherwise,
			// send a negative SDU_RESPONSE and return to STANDBY state.
			// Start the T_RELEASE timer if the response is negative.
			//
			// The first step is start a DB operation and wait for the result.
			//
			if(handle_sys_info(cdesc, sdu) != TRUE)
			  {
				cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
				cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

				tdata.task = PROTOCOL_TASK;
				tdata.type = T_RELEASE_TIMER;
				tdata.period = RELEASE_TIMER_PERIOD;
				cdesc->active_timer = TIMER_START(cdesc, &tdata);

				send_response(cdesc, FALSE, R_FAILURE);
			  }
			sdu = NULL;
			break;
		case T_DB_RESULT:
// printf("SYNC[%d] - SYS_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
			if(handle_db_rsp(cdesc, sdu))
			  {
				cdesc->state = S_VERIFIED;
				send_response(cdesc, TRUE, 0);
			  }
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			// if no DB operation is in progress.
			//
			if(cdesc->db_op)
				cdesc->defer_sdu_op = sdu->type;
			else
			  {
				start_conn_release(cdesc);
			  }
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize
			// if no DB operation is in progress.
			// You need to start the T_RELEASE timer in this case.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_UNKNOWN;
			else
				do_abort(cdesc, sdu, R_PROTOCOL);
		}


		break;
	case S_VERIFIED:
		switch(sdu->type)
		{
		case T_PROF_INFO:
			//
			//
			// Acknowledge the SDU with positive SDU_RESPONSE
			// if the subject profile information is valid. Otherwise,
			// send a negative SDU_RESPONSE and return to STANDBY state.
			// Start the T_RELEASE timer if the response is negative.
			//
			//
			// The first step is start a DB operation and wait for the result.
			//
			if(handle_prof_info(cdesc, sdu) != TRUE)
			  {
				cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
				cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

				tdata.task = PROTOCOL_TASK;
				tdata.type = T_RELEASE_TIMER;
				tdata.period = RELEASE_TIMER_PERIOD;
				cdesc->active_timer = TIMER_START(cdesc, &tdata);
									 
				send_response(cdesc, FALSE, R_FAILURE);
			  }
			sdu = NULL;
			break;
		case T_DB_RESULT:
// printf("SYNC[%d] - PROF_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
			if(handle_db_rsp(cdesc, sdu))
			  {
				cdesc->state = S_PROFILED;
				send_response(cdesc, TRUE, 0);
			  }
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			// if no DB operation is in progress.
			//
			if(cdesc->db_op)
				cdesc->defer_sdu_op = sdu->type;
			else
			  {
				start_conn_release(cdesc);
			  }
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize
			// if no DB operation is in progress.
			// You need to start the T_RELEASE timer in this case.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_UNKNOWN;
			else
				do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;
	case S_PROFILED:
		switch(sdu->type)
		{
		case T_PARM_INFO:
			//
			//
			// Acknowledge the SDU with positive SDU_RESPONSE
			// if the session parameter information is valid. Otherwise,
			// send a negative SDU_RESPONSE and return to STANDBY state.
			// Start the T_RELEASE timer if the response is negative.
			//
			// The first step is start a DB operation and wait for the result.
			//
			if(handle_parm_info(cdesc, sdu) != TRUE)
			  {
				cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
				cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

				tdata.task = PROTOCOL_TASK;
				tdata.type = T_RELEASE_TIMER;
				tdata.period = RELEASE_TIMER_PERIOD;
				cdesc->active_timer = TIMER_START(cdesc, &tdata);

				send_response(cdesc, FALSE, R_FAILURE);
			  }
			sdu = NULL;
			break;
		case T_DB_RESULT:
// printf("SYNC[%d] - PARM_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
			if(handle_db_rsp(cdesc, sdu))
			  {
				cdesc->state = S_CONFIGURED;
				send_response(cdesc, TRUE, 0);
			  }
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			// if no DB operation is in progress.
			//
			if(cdesc->db_op)
				cdesc->defer_sdu_op = sdu->type;
			else
			  {
				start_conn_release(cdesc);
			  }
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize
			// if no DB operation is in progress.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_UNKNOWN;
			else
				do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;
	case S_CONFIGURED:
		switch(sdu->type)
		{
		case T_METRIC_INFO:
			//
			//
			// Acknowledge the SDU with positive SDU_RESPONSE
			// if the session metric data is valid. Otherwise,
			// send a negative SDU_RESPONSE and return to STANDBY state.
			// Start the T_RELEASE timer if the response is negative.
			//
			// The first step is start a DB operation and wait for the result.
			//
			if(handle_metric_info(cdesc, sdu) != TRUE)
			  {
				cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
				cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

				tdata.task = PROTOCOL_TASK;
				tdata.type = T_RELEASE_TIMER;
				tdata.period = RELEASE_TIMER_PERIOD;
				cdesc->active_timer = TIMER_START(cdesc, &tdata);

				send_response(cdesc, FALSE, R_FAILURE);
			  }
			sdu = NULL;
			break;
		case T_DB_RESULT:
// printf("SYNC[%d] - METRIC_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
			if(handle_db_rsp(cdesc, sdu))
			  {
				cdesc->state = S_READY;
				send_response(cdesc, TRUE, 0);
			  }
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			// if no DB operation is in progress.
			//
			if(cdesc->db_op)
				cdesc->defer_sdu_op = sdu->type;
			else
			  {
				start_conn_release(cdesc);
			  }
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize
			// if no DB operation is in progress.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_UNKNOWN;
			else
				do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;
	case S_READY:
		//
		// We can either begin or finish dataset transfer in this state.
		//
		// If the T_SDU timer is running cancel it.
		//
		// IMPORTANT: Set sdu to NULL if you queue it or free it in this state.
		//
		if(cdesc->active_timer)
		   {
			TIMER_STOP(cdesc->active_timer);
			cdesc->active_timer = NULL;
		   }

		switch(sdu->type)
		{
		case T_SVC_DONE:
			syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - SESSION COMPLETE", cdesc->index);
			//
			// All transactions succeeded - Send a positive SDU_RESPONSE and start the T_RELEASE timer.
			//
			cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
			cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

			if(cdesc->final_dataset)
				cdesc->final_dataset = FALSE;

			cdesc->xfer_state = X_IDLE;
			cdesc->n_retry = 0;

			tdata.task = PROTOCOL_TASK;
			tdata.type = T_RELEASE_TIMER;
			tdata.period = RELEASE_TIMER_PERIOD;
			cdesc->active_timer = TIMER_START(cdesc, &tdata);

			if(cdesc->poll_handle == NULL)
			  {
				fprintf(stderr, "BUGGG 4\n");
				abort();
			  }

			send_response(cdesc, TRUE, 0);
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			//
			start_conn_release(cdesc);
			break;
		case T_DATA_XFER:
			//
			// Start dataset transfer.
			//
			cdesc->state = S_TRANSFER;
			xfer_fsm(cdesc, sdu);
			sdu = NULL;
			break;
		case T_SDU_TIMER:
			//
			// Mobile didn't respond to a retry SDU_RESPONSE
			// sent to the Mobile. You can ask for retransmission again or
			// send an abort.  Manage N_Retry in both cases.
			// You should only request retransmission if N_Retry < MAX_RETRY
			//
			// (We didn't see a new DATA_XFER.)
			//
			if(cdesc->n_retry++ < MAX_RETRY)
			  {
				cdesc->state = S_READY;
				cdesc->xfer_state = X_IDLE;

				tdata.task = PROTOCOL_TASK;
				tdata.type = T_SDU_TIMER;
				tdata.period = SDU_TIMER_PERIOD;
				cdesc->active_timer = TIMER_START(cdesc, &tdata);

				send_response(cdesc, FALSE, R_RETRY);
			  }
			else
				do_abort(cdesc, sdu, R_SEQUENCE);
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize.
			// You need to start the T_RELEASE timer in this case.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;
	case S_TRANSFER:
		//
		// Cancel the inactivity timer if it is running.
		//
		//
		// IMPORTANT: Set sdu to NULL if you queue it or free it in this state.
		//
		switch(sdu->type)
		{
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			//
			// Terminate service and reinitialize
			// if no DB operation is in progress.
			//
			if(cdesc->db_op)
				cdesc->defer_sdu_op = sdu->type;
			else
			  {
				start_conn_release(cdesc);
			  }
			break;
		case T_DATA_XFER:
			if(cdesc->active_timer)
			  {
				TIMER_STOP(cdesc->active_timer);
				cdesc->active_timer = NULL;
			  }
			xfer_fsm(cdesc, sdu);
			sdu = NULL;
			break;
		case T_DB_RESULT:
// printf("SYNC[%d] - XFER_DATA(Final Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
			cdesc->db_op = FALSE;
			switch(cdesc->defer_sdu_op)
			{
			case T_RAW_SDU:
// printf("SYNC[%d] - XFER_DONE\n", cdesc->index);
// fflush(stdout);
				cdesc->state = S_READY;
				if(sdu->info == 1)
					send_response(cdesc, TRUE, 0);
				else
					send_response(cdesc, FALSE, sdu->info);
				if(cdesc->poll_handle == NULL)
				  {
					fprintf(stderr, "BUGGG 3\n");
					abort();
				  }
				break;
			case T_SHUTDOWN:
				cdesc->shutdown = TRUE;
			case T_FAULT:
			case T_DISCONNECT:
			case T_SVC_ABORT:
				start_conn_release(cdesc);
				break;
			case T_INACTIVITY_TIMER:
				do_abort(cdesc, sdu, R_INACTIVITY);
				break;
			case T_UNKNOWN:
				do_abort(cdesc, sdu, R_PROTOCOL);
				break;
			default:
				fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Deferred Operation %d",
					 cdesc->defer_sdu_op);

			}
			break;

		case T_INACTIVITY_TIMER:
			//
			// The Mobile appears to be dead. There
			// is no choice but to send an abort and
			// start the T_RELEASE timer if no DB
			// operation is in progress.
			//
			// Otherwise, the DB operation is hung.
			//
			TIMER_STOP(cdesc->active_timer);
			cdesc->active_timer = NULL;

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_INACTIVITY_TIMER;
			else
				do_abort(cdesc, sdu, R_INACTIVITY);
			break;
		default:
			//
			// Protocol Error: Send an abort and reinitialize.
			// You need to start the T_RELEASE timer in this case.
			//
			syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);

			if(cdesc->db_op)
				cdesc->defer_sdu_op = T_UNKNOWN;
			else
				do_abort(cdesc, sdu, R_PROTOCOL);
		}

		break;
	default:
		fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Transaction State %d", cdesc->state);
	
	}

	if(sdu)
		MSG_FREE(sdu);

// printf("FINISHED: Final State %d\n\n", cdesc->state);

	return;
}


typedef enum data_xfer_action {
A_NEXT		=	0,
A_FINAL		=	1,
A_REJECT	=	2,
A_RETRY		=	3,
A_ABORT		=	4,
A_IGNORE	=	5,
A_UNDEFINED	=	6
} DATA_XFER_ACTION;


//
// FUNCTION - xfer_fsm(): Handles DATA_XFER SDU segments.
//
// ARGUMENTS
// -----------
// cdesc: A connection descriptor.
// sdu: A DATA_XFER SDU or segment of one.
//
// RETURN VALUE
// --------------
// None.
//
// NOTES
// -------
// None
//
ROUTINE PRIVATE void xfer_fsm(CONN_DESC *cdesc, MSG *sdu)
{
	DATA_XFER_SDU *body = (DATA_XFER_SDU *) sdu->body;
	TIMER_DATA tdata;
	DATA_XFER_ACTION action;

	syslog(LOG_DEBUG, "PROTO_TASK: RX SEG - State %d Type %d Segment %d",
			 cdesc->xfer_state, sdu->type, body->segment);

	if(sdu->type != T_DATA_XFER)
	  {
		//
		// This is a protocol error because only DATA_XFER SDUs are valid
		// in this state machine. Send an abort and transition to STANDBY state.
		// You need to start the T_RELEASE timer in this case.
		//
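		do_abort(cdesc, sdu, R_PROTOCOL);
		//
		// The caller sets its sdu pointer to NULL after invoking xfer_fsm(),
		// so free the message here, as rx_fsm() does after do_abort().
		//
		MSG_FREE(sdu);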
		return;
	  }

	action = A_UNDEFINED;

	switch(cdesc->xfer_state)
	{
	case X_IDLE:

		if(body->segment & SEG_START)
		  {
// printf("N_SAMPLES = %d\n", body->nsample);

			cdesc->expect_sn = 0;
			cdesc->n_retry = 0;


			if(body->data_type == FVC_INSPIRATORY)
				cdesc->final_dataset = TRUE;
			else
				cdesc->final_dataset = FALSE;

			if(body->segment & SEG_FINAL)
			  {
				//
				// This is a unitary segment. Acknowledge it if it is valid.
				//
				action = A_FINAL;
			  }
			else
			  {
				//
				// Just wait for another segment.
				//
				action = A_NEXT;
				cdesc->xfer_state = X_START;
			  }
		  }
		else
		if(cdesc->n_retry > 0)
		  {
			//
			// This must be a message in flight. Discard it, and
			// wait for the 1st DATA_XFER segment.
			//
			action = A_IGNORE;
		  }
		else
		  {
			//
			// This is a protocol error because only initial DATA_XFER SDU segments are
			// valid in this state.
			//
			action = A_ABORT;
		  }
		break;
	case X_START:
		if(body->segment & SEG_MID)
		  {
			if(body->sn == cdesc->expect_sn)
			  {
				//
				// Just wait for another segment.
				//
				action = A_NEXT;
				cdesc->xfer_state = X_INTERMEDIATE;
			  }
			else
			if(cdesc->n_retry++ < MAX_RETRY)
			  {
				//
				// Invalid sequence number - Request retransmission.
				//
				action = A_RETRY;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			  {
				//
				// Invalid sequence number - Reject the DATA_XFER SDU.
				//
				action = A_REJECT;
				cdesc->xfer_state = X_IDLE;
			  }
		  }
		else
		if(body->segment & SEG_FINAL)
		  {
			if(body->sn == cdesc->expect_sn)
			  {
				//
				// The DATA_XFER SDU is complete. Acknowledge it if it is valid.
				//
				action = A_FINAL;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			if(cdesc->n_retry++ < MAX_RETRY)
			  {
				//
				// Invalid sequence number - Request retransmission.
				//
				action = A_RETRY;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			  {
				//
				// Invalid sequence number - Reject the DATA_XFER SDU.
				//
				action = A_REJECT;
				cdesc->xfer_state = X_IDLE;
			  }
			
		  }
		else
		  {
			//
			// This is a protocol error because only intermediate and final
			// segments are valid in this state.
			//
			action = A_ABORT;
		  }
		break;
	case X_INTERMEDIATE:
		if(body->segment & SEG_MID)
		  {
			//
			// If the sequence number is valid, just wait for another segment.
			//
			if(body->sn == cdesc->expect_sn)
				action = A_NEXT;
			else
			if(cdesc->n_retry++ < MAX_RETRY)
			  {
				//
				// Invalid sequence number - Request retransmission.
				//
				action = A_RETRY;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			  {
				//
				// Invalid sequence number - Reject the DATA_XFER SDU.
				//
				action = A_REJECT;
				cdesc->xfer_state = X_IDLE;
			  }
		  }
		else
		if(body->segment & SEG_FINAL)
		  {
			if(body->sn == cdesc->expect_sn)
			  {
				//
				// The DATA_XFER SDU is complete. Acknowledge it if it is valid.
				//
				action = A_FINAL;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			if(cdesc->n_retry++ < MAX_RETRY)
			  {
				//
				// Invalid sequence number - Request retransmission.
				//
				action = A_RETRY;
				cdesc->xfer_state = X_IDLE;
			  }
			else
			  {
				//
				// Invalid sequence number - Reject the DATA_XFER SDU.
				//
				action = A_REJECT;
				cdesc->xfer_state = X_IDLE;
			  }
		  }
		else
		  {
			//
			// This is a protocol error because only intermediate and final
			// segments are valid in this state.
			//
			action = A_ABORT;
		  }
		break;
	case X_FINAL:
		//
		// This state is conceptual for the Server and you should never
		// enter it long enough to receive anything if things are done correctly.
		//
		cdesc->state = S_READY;
		break;
	default:
		fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Data Transfer State %d", cdesc->xfer_state);
	}

	switch(action)
	{
	case A_NEXT:
		//
		// The segment is valid and non-terminal. Start the inactivity timer
		// and wait for another segment.
		//
		cdesc->expect_sn++;

ENTER_MUTEX(&DataXfer_Mutex);
		insert_msg(&cdesc->bin_seg_q, sdu);
EXIT_MUTEX(&DataXfer_Mutex);
		sdu = NULL;

		tdata.task = PROTOCOL_TASK;
		tdata.type = T_INACTIVITY_TIMER;
		tdata.period = INACTIVITY_TIMER_PERIOD;
		cdesc->active_timer = TIMER_START(cdesc, &tdata);
		break;
	case A_FINAL:
		syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - DATASET COMPLETE", cdesc->index);
		//
		// The segment is valid and terminal or unitary. Acknowledge it
		// and wait for another message from the Mobile.
		//
		// Do something with the binary data and send a positive SDU_RESPONSE.
		// The first step is to wait for the DB operation to complete.
		//
		cdesc->expect_sn = 0;
		cdesc->n_retry = 0;

ENTER_MUTEX(&DataXfer_Mutex);
		insert_msg(&cdesc->bin_seg_q, sdu);
EXIT_MUTEX(&DataXfer_Mutex);
		sdu = NULL;
		if(handle_binary_data(cdesc, &cdesc->bin_seg_q, cdesc->final_dataset) != TRUE)
		  {
			cdesc->state = S_READY;
			send_response(cdesc, FALSE, R_DB_FAILURE);
		  }
		break;
	case A_REJECT:
		//
		// The segment is invalid due to a bad sequence number and
		// the retry threshold has been reached. Send a negative response.
		//
		cdesc->state = S_READY;
		cdesc->expect_sn = 0;
		cdesc->n_retry = 0;

		flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);

		send_response(cdesc, FALSE, R_SEQUENCE);
		break;
	case A_RETRY:
		//
		// The segment is invalid due to a bad sequence number and the retry
		// threshold has not been reached. Request retransmission of
		// the entire DATA_XFER SDU.
		//
		cdesc->state = S_READY;
		cdesc->expect_sn = 0;
		flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);

		tdata.task = PROTOCOL_TASK;
		tdata.type = T_SDU_TIMER;
		tdata.period = SDU_TIMER_PERIOD;
		cdesc->active_timer = TIMER_START(cdesc, &tdata);

		send_response(cdesc, FALSE, R_RETRY);
		break;
	case A_ABORT:
		//
		// The segment has some kind of protocol error.
		//
		cdesc->final_dataset = FALSE;
		do_abort(cdesc, sdu, R_PROTOCOL);
		break;
	case A_IGNORE:
		//
		// Ignore the message and wait for an initial segment.
		//
		cdesc->state = S_READY;
		cdesc->expect_sn = 0;
		cdesc->final_dataset = FALSE;

		tdata.task = PROTOCOL_TASK;
		tdata.type = T_SDU_TIMER;
		tdata.period = SDU_TIMER_PERIOD;
		cdesc->active_timer = TIMER_START(cdesc, &tdata);
		break;
	case A_UNDEFINED:
		fatal_error(LOG_EMERG, "PROTO_TASK: No DATA_XFER Action");
	}

	if(sdu)
		MSG_FREE(sdu);

	return;
}

//
// FUNCTION - handle_db_rsp(): Handle a T_DB_RESPONSE SDU from the Database Task.
//
// ARGUMENTS
// -----------
// cdesc: A connection descriptor.
// sdu: The T_DB_RESPONSE message.
//
// RETURN VALUE
// --------------
// Returns TRUE if successful and FALSE otherwise.
//
// NOTES
// -------
// None
//
ROUTINE PRIVATE BOOL handle_db_rsp(CONN_DESC *cdesc, MSG *sdu)
{
	BOOL rval = FALSE;

	cdesc->db_op = FALSE;
	if(sdu->info == 1)
	  {
		//
		// DB operation succeeded.
		//
		switch(cdesc->defer_sdu_op)
		{
		case T_RAW_SDU:
			//
			// No interim message received.
			//
			rval = TRUE;
			break;
		case T_SHUTDOWN:
			cdesc->shutdown = TRUE;
			// Fall through: a shutdown also releases the connection.
		case T_FAULT:
		case T_DISCONNECT:
		case T_SVC_ABORT:
			start_conn_release(cdesc);
			break;
		case T_UNKNOWN:
			do_abort(cdesc, sdu, R_PROTOCOL);
			break;
		default:
			fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Deferred Operation %d", cdesc->defer_sdu_op);

		}
	  }
	else
		do_abort(cdesc, sdu, R_DB_FAILURE);

	cdesc->defer_sdu_op = T_RAW_SDU;

	return(rval);
}

//
// Send an SDU_RESPONSE message to the peer.
//
ROUTINE void send_response(CONN_DESC *cdesc, BOOL positive, RESULT code)
{
	MSG *msg;
	RESULT result;

	if(positive)
	  {
		result = R_SUCCESS;
	  }
	else
		result = R_FAILURE;

	msg = build_sdu_response(result, code);

	syslog(LOG_DEBUG, "PROTO_TASK: TX EVENT %d - CONN[%d] State %d Class %d Type %d", NEvent++,
		 cdesc->index, cdesc->state, msg->class, msg->type);

	SendTxMsg(cdesc, msg);

	return;
}


//
// Send an SVC_ABORT message to the peer.
//
ROUTINE void do_abort(CONN_DESC *cdesc, MSG *sdu, RESULT code)
{
	MSG *msg;
	TIMER_DATA tdata;

	cdesc->state = S_STANDBY;
	cdesc->xfer_state = X_IDLE;
#ifdef USE_SCRAM
	cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM

	cdesc->db_op = FALSE;
	cdesc->defer_sdu_op = T_RAW_SDU;

	if(cdesc->active_timer)
	  {
		TIMER_STOP(cdesc->active_timer);
		cdesc->active_timer = NULL;
	  }

#ifdef NOT_DEFINED
	if(sdu->class == C_TIMER)
		fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Timer Type %d", sdu->type);
#endif // NOT_DEFINED

	cdesc->expect_sn = 0;
	cdesc->n_retry = 0;
	flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);

	msg = build_svc_abort(code);

	syslog(LOG_DEBUG, "PROTO_TASK: TX EVENT %d - CONN [%d] State %d Class %d Type %d", NEvent++,
		  cdesc->index, cdesc->state, msg->class, msg->type);

	tdata.task = PROTOCOL_TASK;
	tdata.type = T_RELEASE_TIMER;
	tdata.period = RELEASE_TIMER_PERIOD;
	cdesc->active_timer = TIMER_START(cdesc, &tdata);

	SendTxMsg(cdesc, msg);

	return;
}


//
// Begin termination of a socket connection and all the associated infrastructure.
//
ROUTINE PRIVATE void start_conn_release(CONN_DESC *cdesc)
{
	ASYNC_DATA parm;
	//
	// Do protocol bookkeeping.
	//
	cdesc->state = S_DISMANTLE;
	cdesc->xfer_state = X_IDLE;
#ifdef USE_SCRAM
	cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
	cdesc->release_state = RS_POLL_FREE;

	cdesc->db_op = FALSE;
	cdesc->defer_sdu_op = T_RAW_SDU;

	if(cdesc->active_timer)
	  {
		TIMER_STOP(cdesc->active_timer);
		cdesc->active_timer = NULL;
	  }

	cdesc->expect_sn = 0;
	cdesc->n_retry = 0;
	flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);
//
// printf("ASYNC[%d]: START POLL TERMINATION\n", cdesc->index);
// fflush(stdout);

	//
	// Terminate polling and release the poll handle. This is done by sending
	// a message to the IO_Task(). It responds with a T_DISMANTLE_RSP when
	// everything is done.
	//
	bzero((void *) &parm, sizeof(ASYNC_DATA));
	parm.type = T_POLL_DISMANTLE_REQ;
	parm.async_handle = &IO_Task_Async_Handle;
	parm.async_q = &IO_Task_Input_Q;
	parm.object_handle = (uv_handle_t *) cdesc->poll_handle;

if(parm.object_handle == NULL)
{
	fprintf(stderr, "BUGGG 1\n");
	abort();
}

	send_async_msg(cdesc, &parm);

	return;
}

ROUTINE void init_rx_fsm(CONN_DESC *cdesc, int fd)
{
	int index = cdesc->index;

	bzero(cdesc, sizeof(CONN_DESC));

ENTER_MUTEX(&Service_Q_Mutex);

	cdesc->index = index;
	cdesc->state = S_IDLE;
	cdesc->xfer_state = X_IDLE;
	cdesc->release_state = RS_IDLE;
	cdesc->fd = fd;
	cdesc->decoder.buf_ptr = cdesc->decoder.buffer;

EXIT_MUTEX(&Service_Q_Mutex);

	return;
}
