Hi:
The most common symptom of the problem is that uv_poll_init() fails
which means the problem must
be related to the uv_poll_t Poll_Loop. The relevant code segment is as
follows.
poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));
if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling
Initialization Error %d, %s",
r, uv_err_name(r));
}
The following files illustrate the problem and are attached to this
message. Please advise me
how to attach files to the list directly.
* trace_badf.txt: A stack trace when the error code is EBADF.
* trace_eexist.txt: A stack trace when the error code is EEXIST.
* network_io.c: The IO_Task() thread code. The problem occurs in the
poll_proxy() routine.
* main.c: The main() process/thread code where most initialization occurs.
* transmit.c: The Transmit_Task() thread.
* rx_fsm.c: The heart of the protocol task code.
You are right that only the IO_Task() performs polling related operations.
However, please note that the Transmit_Task() thread uses the underlying
TCP socket descriptor
to send data to the network with Linux send(). (See the TX_DATA()
routine.) Also,
USE_SSL and USE_SCRAM are not defined.
I will send more information if am able to produce the problem at
another point.
Also, I have seen the issue you mentioned before and suspect it may be
relevant.
Please let me know if you need anything else.
Best Regards,
Paul R.
Best Regards,
Paul R.
On 04/28/2021 08:52 PM, Jameson Nash wrote:
I'm assuming from your description that you've ensured all libuv calls
happen from exactly one thread (except uv_async_send). It sounds like
you're describing the issue that
https://github.com/libuv/libuv/commit/c9406ba0e3d67907c1973a71968b89a6bd83c63c
was intended to fix, which was included in v1.41.0
Note that the Poll_Loop.watchers[N] is not always required to be NULL,
so perhaps you've found a case where it is still incorrectly expecting
that? You might want to file an issue with code to reproduce the
problem, or at least an `rr` trace dump of the code that hits problem
(note that `rr` could potentially upload all of your hard drive, so be
careful what you are submitting).
On Wed, Apr 28, 2021 at 11:13 PM pa...@rcom-software.com
<mailto:pa...@rcom-software.com> <pa...@rcom-software.com
<mailto:pa...@rcom-software.com>> wrote:
Hi Folks:
I am experiencing an intermittent problem with uv_poll_init()
after the successful establishment and release
of multiple concurrent TCP connections. I am not sure if this
problem is due to a bug, which may
be corrected in another Libuv release, or if I am doing something
wrong when releasing the poll handle.
Do you have insight about the cause ? The details I consider most
important follow.
The uv_poll_t callback function reads incoming TCP data as follows.
iff(events & UV_READABLE)
{
CONN_DESC *cdesc = (CONN_DESC *) poll_handle->data;
n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);
NOTE: I am using Libuv version 1.41.0 running on Ubuntu Linux
version 15.04.
The problem is that uv_poll_init() fails, normally with the EEXIST
or EBADF error code, and
my investigation indicates the uv_loop_t Poll_Loop.watchers[N]
field is not NULL when it should be,
where N is TCP socket descriptor. It occurs immediately after the
uv_poll_t poll_handle is allocated.
(There is exactly one TCP socket descriptor per poll handle.) The
call to uv_poll_init() is as follows
and the socket descriptor is obtained with uv_fileno().
if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling
Initialization Error %d, %s",
r, uv_err_name(r));
}
It occurs in the IO_Task() thread when there multiple TCP socket
descriptors are in use. The IO_task
releases the poll_handle with the following code sequence when it
is notified that polling should stop
via a Libuv async. message from the Protocol_Task() thread.
if( (r = uv_poll_stop(poll_handle)) )
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Poll Stop
Error %d, %s",
r, uv_err_name(r) );
}
poll_handle->data = (void *) cdesc;
uv_close((uv_handle_t *) poll_handle, async_close_callback);
The actual release occurs in async_close_callback() as follows and
the Protocol_Task() releases the
TCP socket decriptor with a normal Linux close() after it receives
the T_DISMANTLE response message.
VFREE((UCHAR *) handle);
//
// Send a notification message to the Protocol_Task.
//
msg = MSG_ALLOC(0, FALSE);
msg->class = C_NOTIFY;
msg->type = T_DISMANTLE_RSP;
msg->info = 0;
SendProtoMsg(cdesc, msg);
I think the underlying cause is that if there is new TCP
connection that uses a the same TCP socket descriptor
as one released with uv_poll_stop() and uv_close(), the call to
uv_poll_init() occurs before the socket closure
has propogated into the uv_loop_t Poll_Handle.
Best Regards,
Paul Romero
--
You received this message because you are subscribed to the Google
Groups "libuv" group.
To unsubscribe from this group and stop receiving emails from it,
send an email to libuv+unsubscr...@googlegroups.com
<mailto:libuv+unsubscr...@googlegroups.com>.
To view this discussion on the web visit
https://groups.google.com/d/msgid/libuv/c3290a76-ab6b-42ad-8540-33021c6188b9n%40googlegroups.com
<https://groups.google.com/d/msgid/libuv/c3290a76-ab6b-42ad-8540-33021c6188b9n%40googlegroups.com?utm_medium=email&utm_source=footer>.
--
You received this message because you are subscribed to a topic in the
Google Groups "libuv" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/libuv/_4ClQoaVPCg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
libuv+unsubscr...@googlegroups.com
<mailto:libuv+unsubscr...@googlegroups.com>.
To view this discussion on the web visit
https://groups.google.com/d/msgid/libuv/CADnnjUXSsF7QwRnVqFojAm1W_o35CHKyMCPM%2BVw0FM0FjqN1XQ%40mail.gmail.com
<https://groups.google.com/d/msgid/libuv/CADnnjUXSsF7QwRnVqFojAm1W_o35CHKyMCPM%2BVw0FM0FjqN1XQ%40mail.gmail.com?utm_medium=email&utm_source=footer>.
--
Paul Romero
-----------
RCOM Communications Software
EMAIL: pa...@rcom-software.com
PHONE: (510)482-2769
--
You received this message because you are subscribed to the Google Groups
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to libuv+unsubscr...@googlegroups.com.
To view this discussion on the web visit
https://groups.google.com/d/msgid/libuv/608ADA21.3000304%40rcom-software.com.
#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"
//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE
//
// *************************************************************************************
// ************************************ DATA *******************************************
// *************************************************************************************
//
//
// **************** Data Shared by multple Tasks ******************************
//
//
// Protects Connect_Task_Input_Q and IO_Task_Input_Q.
//
uv_mutex_t Async_Mutex;
#ifdef TEST_UV_STABILITY
//
// Protects internal Libuv data from async. callbacks.
//
uv_mutex_t UV_Data_Mutex;
#endif // TEST_UV_STABILITY
//
// Protects protocol data shared by or accessible to multiple tasks.
//
uv_mutex_t Protocol_D_Mutex;
int N_Tasks; // The number of initialized thread tasks.
int N_Sockets; // The current number of connections accepted.
//
// A special purpose per connection Mutex that protects the binary seqment queue which
// is accessible to the Protocol_Task() and DB_Task().
//
uv_mutex_t DataXfer_Mutex;
//
// Connection Descriptor Table.
//------------------------------------
// Using a statically allocated table like this is only appropriate for
// an implementation with a small maximum number of connections.
//
// TBD: In the case of large maximum number of connections this table
// should be replaced by one that holds pointers to dynamically allocated
// connection descriptors.
//
CONN_DESC Conn_Desc_Table[MAX_CONN_DESC];
//
// **************************** main() Process/Task Data *********************
//
//
// Loop for detecting incoming connections,
//
PRIVATE uv_loop_t Connect_Loop;
//
// TRUE when the connection accept callback is executing.
//
PRIVATE BOOL Connect_Accept_Busy = FALSE;
//
// TRUE when the connection plumbing proxy is executing.
//
PRIVATE BOOL Connect_Proxy_Busy = FALSE;
//
// The main process async. message queue.
//
MSG_FIFO Connect_Task_Input_Q;
//
// The main process async. operation channel.
//
uv_async_t Connect_Task_Async_Handle;
//
// **************************** IO_Task() Data *****************************
//
//
// The IO_Task handles epoll() read I/O event with a poll loop of type uv_poll_t.
//
void IO_Task(void *arg);
uv_thread_t IO_Task_Handle;
//
// IO_Task() async. opertion handle.
//
uv_async_t IO_Task_Async_Handle;
//
// IO_Task() asyc. message queue.
//
MSG_FIFO IO_Task_Input_Q;
//
// **************************** DB_Task() Data *************************
//
//
// The DB_Task performs slow database insertions.
//
void DB_Task(void *);
uv_thread_t DB_Task_Handle;
//
// ***************************** Timer_Task() Data **********************
//
//
// The Time_Task() implements protocol timers and handlles ticks every 1/10 of second in the Timer_Loop.
//
void Timer_Task(void *);
uv_thread_t Timer_Task_Handle;
//
// ************************* Trasnsit_Task() Data ******************************
//
//
// The tranmission task handle.
//
void Transmit_Task(void *);
uv_thread_t Send_Task_Handle;
//
// ************************* Protocol_Task() Data *****************************
//
//
// The protocol task handles the arrival of messages on the Service Queue.
//
void Protocol_Task(void *arg);
uv_thread_t Protocol_Task_Handle;
//
// The Service Queue is doubly linked Circular queue containing
// only Connection Descriptors with SDUs on their Task Input Queue.
//
CONN_DESC *Service_Q; // The head of the queue.
//
// Protects the Service_Q and its condition variable.
//
uv_mutex_t Service_Q_Mutex;
//
// Service_Q message queue condition variable.
//
uv_cond_t Service_Q_Cond;
//
// ******************************************************************************************
// ********************************** PROTOTYPES *********************************************
// ******************************************************************************************
//
void send_async_msg(CONN_DESC *, const ASYNC_DATA *);
void async_close_callback(uv_handle_t *);
PRIVATE void make_incoming_connection(uv_stream_t *, int ); // This is a callback routine.
PRIVATE void connect_proxy(uv_async_t * ); // This callback routine executes in the main() process.
PRIVATE void forced_close_callback(uv_handle_t *);
char *host_to_ip(const char *, char [], int );
#ifdef USE_SSL
SSL *Ssl_Accept(CONN_DESC *, int );
void Ssl_ReleaseConnect(CONN_DESC *, uv_mutex_t * );
void Ssl_Init();
#ifdef USE_CRYPTO_LOCK
ROUTINE void Ssl_InitLock(int );
#endif // USE_CRYPTO_LOCK
#endif // USE_SSL
void sig_hup_callback(uv_signal_t *, int );
void sig_term_callback(uv_signal_t *, int );
void sig_kill_callback(uv_signal_t *, int );
MSG *remove_msg(MSG_FIFO *);
CONN_DESC *ALLOC_CONN_DESC(int );
BOOL DELETE_CONN(CONN_DESC *);
void MSG_FREE(MSG *);
UCHAR *VALLOC(int n);
void VFREE(UCHAR *);
void VMEM_INIT();
#ifdef LIBUV_DETACH
int uv_thread_detached_create(uv_thread_t *, void (* )(void * ), void * );
#else // LIBUV_DETACH
int uv_thread_detach(uv_thread_t *);
#endif // LIBUV_DETACH
void fatal_error(int , const char *, ...);
int main()
{
uv_tcp_t listen_handle;
uv_signal_t sig_term_handle, sig_hup_handle, sig_int_handle;
struct sockaddr_in addr;
int r, k;
//
// Open the debug log.
//
openlog("pexd", LOG_PID, LOG_LOCAL1);
syslog(LOG_ALERT, "PEXD DAEMON: VERSION %s STARTING", VERSION);
syslog(LOG_INFO, "DAEMON: Initializing System");
//
// Initalize memory managment.
//
VMEM_INIT();
//
// Initilize global data.
//
N_Tasks = 0;
N_Sockets = 0;
bzero((void *) Conn_Desc_Table, sizeof(Conn_Desc_Table));
for(k = 0; k < MAX_CONN_DESC; k++)
{
Conn_Desc_Table[k].index = k;
Conn_Desc_Table[k].fd = -1;
Conn_Desc_Table[k].decoder.buf_ptr = Conn_Desc_Table[k].decoder.buffer;
}
//
// Intialize the Libuv async. channel Mutex.
//
uv_mutex_init(&Async_Mutex);
#ifdef TEST_UV_STABILITY
//
// Intialize the Libuv internal data Mutex.
//
uv_mutex_init(&UV_Data_Mutex);
#endif // TEST_UV_STABILITY
//
// Initialize the Service_Q, used in the Protocol_Task() and
// its Mutex and condition variable.
//
Service_Q = NULL;
uv_mutex_init(&Service_Q_Mutex);
uv_cond_init(&Service_Q_Cond);
#ifdef USE_SSL
//
// Initialize SSL connection infrastructure.
//
Ssl_Init();
#ifdef USE_CRYPTO_LOCK
//
// Protect internal crypto. operations.
//
Ssl_InitLock(MAIN_TASK);
#endif // USE_CRYPTO_LOCK
#endif // USE_SSL
//
// Initialize the protocol data DataXfer Mutexes.
//
uv_mutex_init(&Protocol_D_Mutex);
uv_mutex_init(&DataXfer_Mutex);
//
// Launch the Timer_Task(), IO_Task(), DB_Task(), Protocol_Task(), and Transmit_Task().
//
// TBD: When a newer version of Libuv is available use uv_thread_detach() instead
// of uv_thread_detached_create(). Note that uv_thread_detached_create() is a custom
// routine I added to the src/unix/thread.c code and not part of the official Libuv
// package.
//
syslog(LOG_INFO, "DAEMON: Spawning Tasks");
#ifdef LIBUV_DETACH
uv_thread_detached_create(&DB_Task_Handle, DB_Task, NULL);
uv_thread_detached_create(&Protocol_Task_Handle, Protocol_Task, NULL);
uv_thread_detached_create(&Send_Task_Handle, Transmit_Task, NULL);
uv_thread_detached_create(&Timer_Task_Handle, Timer_Task, NULL);
uv_thread_detached_create(&IO_Task_Handle, IO_Task, NULL);
#else // LIBUV_DETACH
tid = uv_thread_create(&DB_Task_Handle, DB_Task, NULL);
uv_thread_detach(DB_Task_Handle);
tid = uv_thread_create(&Protocol_Task_Handle, Protocol_Task, NULL);
uv_thread_detach(Protocol_Task_Handle);
tid = uv_thread_create(&Send_Task_Handle, Transmit_Task, NULL);
uv_thread_detach(Send_Task_Handle);
tid = uv_thread_create(&Timer_Task_Handle, Timer_Task, NULL);
uv_thread_detach(Timer_Task_Handle);
tid = uv_thread_create(&IO_Task_Handle, IO_Task, NULL);
uv_thread_detach(IO_Task_Handle);
#endif // LIBUV_DETACH
//
// Initialize the Connect_Loop and its data.
//
Connect_Accept_Busy = FALSE;
Connect_Proxy_Busy = FALSE;
Connect_Task_Input_Q.head = Connect_Task_Input_Q.tail = NULL;
uv_loop_init(&Connect_Loop);
uv_async_init(&Connect_Loop, &Connect_Task_Async_Handle, connect_proxy);
//
// Initialize Linux signal handling in this task.
//
uv_signal_init(&Connect_Loop, &sig_term_handle);
uv_signal_start(&sig_term_handle, sig_term_callback, SIGTERM);
uv_signal_init(&Connect_Loop, &sig_hup_handle);
uv_signal_start(&sig_hup_handle, sig_hup_callback, SIGHUP);
uv_signal_init(&Connect_Loop, &sig_int_handle);
uv_signal_start(&sig_int_handle, sig_kill_callback, SIGKILL);
//
// TBD: This signal should be disabled in real life.
//
uv_signal_init(&Connect_Loop, &sig_int_handle);
uv_signal_start(&sig_int_handle, sig_kill_callback, SIGINT);
//
// Initialize the incoming connection handle.
//
uv_tcp_init(&Connect_Loop, &listen_handle);
//
// Bind to a listen address.
//
uv_ip4_addr(SERVER_ADDRESS, SERVER_PORT, &addr);
//
// host_to_ip(SERVER_NAME, ip_addr_buf, sizeof(ip_addr_buf) );
// uv_ip4_addr(ip_addr_buf, SERVER_PORT, &addr);
//
uv_tcp_bind(&listen_handle, (const struct sockaddr *) &addr, 0);
//
// Wait for all the thread tasks to be initialized.
//
for(;;)
{
ENTER_MUTEX(&Protocol_D_Mutex);
if(N_Tasks == 5)
{
EXIT_MUTEX(&Protocol_D_Mutex);
break;
}
EXIT_MUTEX(&Protocol_D_Mutex);
pthread_yield();
}
syslog(LOG_INFO, "DAEMON: All Spawned Tasks Running");
//
// Configure the listen socket.
//
if( (r = uv_listen((uv_stream_t*) &listen_handle, MAX_CONN_DESC, make_incoming_connection)) )
fatal_error(LOG_ERR, "MAIN_TASK: Can't Configure Listening Socket. Error %d: %s", r, uv_err_name(r));
syslog(LOG_ALERT, "PEXD DAEMON: RUNNING");
//
// Wait for a connection.
//
for(;;)
{
r = uv_run(&Connect_Loop, UV_RUN_DEFAULT);
if(r)
{
// fprintf(stderr, "MAIN: Run Error %d: %s\n", uv_err_name(r));
r = 0;
}
}
return(r);
}
#ifdef LIBUV_DETACH
//
// FUNCTION - uv_thread_detach(): Create a detached thread task and put into execution.
//
// ARGUMENTS
// -----------
// tid: Points to memory for holding the Livuv thread ID.
// entry: The thread task.
// arg: Points to data to be passed to the task when it starts.
//
// RETURN VALUE
// --------------
// Returns 0 if successful and a Libuv error code otherwise.
//
// NOTES
// -------
// This routine is kludge which supplies the same functionality as the uv_thread_detach() API.
// It is necessary because some version of Libuv don't include this functionality.
//
ROUTINE int uv_thread_detached_create(uv_thread_t *tid, void (*entry )(void * ), void *arg )
{
int rval;
rval = uv_thread_create(tid, entry, arg);
if(rval != 0)
rval = pthread_detach(*tid);
return(rval);
}
#endif // LIBUV_DETACH
//
// FUNCTION - make_incoming_connection(): A callback routine that executes when an incoming connection event occurs.
//
// ARGUMENTS
// -----------
// listen_handle: The TCP listen socket handle.
// status: The value is 0 when a connection detected without errors.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -----
// The execution of this routine must be preconfigured with Libuv uv_listen().
//
ROUTINE PRIVATE void make_incoming_connection(uv_stream_t *listen_handle, int status)
{
ASYNC_DATA poll_data;
CONN_DESC *cdesc;
uv_tcp_t *conn_handle;
int r;
BOOL reject = FALSE;
if (status == -1)
fatal_error(LOG_ERR, "MAIN_TASK: Invalid Incoming Connection Status");
// printf("NEW_CONN ATTEMPT: N %d\n", N_Sockets);
// fflush(stdout);
//
// If this routine is called before the work from the last invocation is finished,
// we have no choice but to reject the connection.
//
if(Connect_Accept_Busy)
reject = TRUE;
else
Connect_Accept_Busy = TRUE;
//
// Initialize the connection handle.
//
conn_handle = (uv_tcp_t *) VALLOC(sizeof(uv_tcp_t));
uv_tcp_init(&Connect_Loop, conn_handle);
if((r = uv_accept(listen_handle, (uv_stream_t *) conn_handle)) == 0)
{
int nsock;
//
// A new connection occured.
//
uv_os_fd_t fd;
ENTER_MUTEX(&Protocol_D_Mutex);
nsock = N_Sockets++;
EXIT_MUTEX(&Protocol_D_Mutex);
if(reject || nsock >= MAX_CONN_DESC)
{
syslog(LOG_DEBUG, "MAIN_TASK: CONNECTION REJECTION - Transient, N Sockets = %d", nsock);
ENTER_MUTEX(&Protocol_D_Mutex);
N_Sockets--;
EXIT_MUTEX(&Protocol_D_Mutex);
fprintf(stderr, "xxxxxxxxxxxxx REJECTION DONE %d xxxxxxxxxxxxxxxxxx\n", N_Sockets);
uv_close((uv_handle_t *) conn_handle, forced_close_callback);
return;
}
//
// Fetch the socket descriptor from the connection handle.
//
uv_fileno((const uv_handle_t*) conn_handle, &fd);
//
// Allocate the connection descriptor.
//
cdesc = ALLOC_CONN_DESC(fd);
if( !cdesc )
{
syslog(LOG_DEBUG, "MAIN_TASK: CONNECTION REJECTION - No Connection Descriptors, N = %d", nsock);
ENTER_MUTEX(&Protocol_D_Mutex);
N_Sockets--;
close(fd);
EXIT_MUTEX(&Protocol_D_Mutex);
uv_close((uv_handle_t *) conn_handle, forced_close_callback);
return;
}
#ifdef USE_SSL
if( !(cdesc->ssl_socket = Ssl_Accept(cdesc, fd)) )
{
ENTER_MUTEX(&Protocol_D_Mutex);
N_Sockets--;
close(cdesc->fd);
cdesc->fd = -1;
DELETE_CONN(cdesc);
EXIT_MUTEX(&Protocol_D_Mutex);
uv_close((uv_handle_t *) conn_handle, forced_close_callback);
return;
}
#endif // USE_SSL
//
// Save the connection handle and start polling.
//
cdesc->conn_handle = (uv_tcp_t *) conn_handle;
syslog(LOG_INFO, "MAIN_TASK: NEW CONNECTION ESTABLISHED - CONN %d FD %d", cdesc->index, cdesc->fd);
//
// Set up epoll() plumbing by sending a message to IO_Task();
//
bzero((void *) &poll_data, sizeof(ASYNC_DATA));
poll_data.type = T_POLL_CONFIG_REQ;
poll_data.async_handle = &IO_Task_Async_Handle;
poll_data.async_q = &IO_Task_Input_Q;
poll_data.object_handle = NULL;
//
// The T_POLL_CONFIG_RSP message will be sent to the Protocol_Task() which
// is in S_IDLE state.
//
send_async_msg(cdesc, &poll_data);
}
else
fatal_error(LOG_ERR, "MAIN_TASK: CONNECTION REJECTION - Libuv Accept Failure. Error %d: %s", r, uv_err_name(r));
Connect_Accept_Busy = FALSE;
return;
}
//
// FUNCTION - connect_proxy(): Dismantles and releases a Libuv connection handle on behalf of another task.
// The proxy routine executes in the main() process/task bound to the Connect_Loop.
//
// ARGUMENTS
//-----------
// handle: The async. channel handle. (Connect_Task_Async_Handle)
//
// RETURN VALUE
// -----------
// None
//
// NOTES
// --------
// Execution of the routine is triggered by a call to async_send() in the Protocol_Task().
//
ROUTINE PRIVATE void connect_proxy(uv_async_t *handle)
{
CONN_DESC *cdesc;
MSG *msg;
uv_tcp_t *conn_handle;
uv_handle_t **ppdata;
BOOL done;
//
// Avoid clobbering a previous invocation of this routine.
//
if(Connect_Proxy_Busy)
return;
Connect_Proxy_Busy = TRUE;
//
// Handle all messages from the Protocol_Task()
//
done = FALSE;
while(done != TRUE)
{
ENTER_MUTEX(&Async_Mutex);
msg = remove_msg(&Connect_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);
if(msg)
{
cdesc = (CONN_DESC *) msg->conn_desc;
syslog(LOG_DEBUG, "MAIN_TASK(Proxy): CONN[%d] Type %d\n", cdesc->index, msg->type);
switch(msg->type)
{
case T_CONN_DISMANTLE_REQ:
//
// Release a uv_tcp_t connection handle.
// The protocol task is notified by async_close_callback()
// when the operation is complete.
//
ppdata = (uv_handle_t **) &msg->buffer[0];
conn_handle = (uv_tcp_t *) *ppdata;
#ifdef TEST_UV_STABILITY
ENTER_MUTEX(&UV_Data_Mutex);
switch(cdesc->uv_state)
{
case UVO_CLOSED:
cdesc->uv_task = MAIN_TASK;
break;
default:
fprintf(stderr, "Invalid T_CONN_DISMANTLE_REQ. State = %d\n", cdesc->uv_state);
abort();
}
EXIT_MUTEX(&UV_Data_Mutex);
#endif // TEST_UV_STABILITY
conn_handle->data = (void *) cdesc;
uv_close((uv_handle_t *) conn_handle, async_close_callback);
break;
default:
fatal_error(LOG_EMERG, "MAIN_TASK: CONNECT_PROXY - Invalid Message = %d", msg->type);
}
MSG_FREE(msg);
}
else
done = TRUE;
}
Connect_Proxy_Busy = FALSE;
return;
}
//
// FUNCTION - force_close_callback(): Deallocate a Libuv handle.
//
// ARGUMENTS
//-----------
// handle: The handle.
//
// NOTES
// ------
// Releases a uv_tcp_t handle in the case it was allocated before an incoming connection was rejected.
//
ROUTINE PRIVATE void forced_close_callback(uv_handle_t *handle)
{
VFREE((UCHAR *) handle);
// printf("ASYNC[X]: REJECT COMPLETE\n");
// fflush(stdout);
//
// Setting this variable indicates the handle has been freed.
//
Connect_Accept_Busy = FALSE;
return;
}
#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"
//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE
//
// **********************************************************
// ******************** DATA ***********************************
// **************************************************************
//
//
// This data is used to indicate when the task is initialized.
//
extern int N_Tasks;
extern uv_mutex_t Protocol_D_Mutex;
//
// Protects the IO_Task_Input_Q.
//
extern uv_mutex_t Async_Mutex;
//
// IO_Task() async. opertion handle.
//
extern uv_async_t IO_Task_Async_Handle;
//
// IO_Task() asyc. message queue.
//
extern MSG_FIFO IO_Task_Input_Q;
//
// Loop for handling epoll() read I/O events.
//
PRIVATE uv_loop_t Poll_Loop;
//
// TRUE when the Poll proxy routine is executing.
//
PRIVATE BOOL Poll_Proxy_Busy;
//
// *******************************************************
// *********************** PROTOTYPES *******************
// *****************************************************
//
PRIVATE void poll_callback(uv_poll_t *, int , int );
PRIVATE void poll_proxy(uv_async_t *);
void async_close_callback(uv_handle_t *);
#ifdef USE_SSL
int Ssl_RxData(SSL *, UCHAR *, int );
#endif // USE_SSL
DECODE_RESULT rx_sdu(CONN_DESC *, int);
void SendProtoMsg(CONN_DESC *, MSG *);
MSG *remove_msg(MSG_FIFO *);
MSG *MSG_ALLOC(int, BOOL);
void MSG_FREE(MSG *);
UCHAR *VALLOC(int n);
void VFREE(UCHAR *);
void fatal_error(int , const char *, ...);
//
// Monitor connections for incoming data with epoll()
//
TASK void IO_Task(void *arg)
{
int r;
uv_loop_init(&Poll_Loop);
//
// Initialize communications from the main() process and Protocol_Task() to the IO_Task().
// The main() and Protocol_Task() tasks insert message on the IO_Task_Input_Q just prior
// to invoking uv_async_send().
//
IO_Task_Input_Q.head = IO_Task_Input_Q.tail = NULL;
uv_async_init(&Poll_Loop, &IO_Task_Async_Handle, poll_proxy);
ENTER_MUTEX(&Protocol_D_Mutex);
N_Tasks++;
EXIT_MUTEX(&Protocol_D_Mutex);
syslog(LOG_INFO, "IO_TASK: Started");
for(;;)
{
r = uv_run(&Poll_Loop, UV_RUN_DEFAULT);
if(r)
{
syslog(LOG_ERR, "IO_TASK: Run Error %d", r);
r = 0;
}
}
return;
}
//
// FUNCTION - poll_callback(): A callback routine that executes when incoming data is available.
// and reads the data.
//
// ARGUMENTS
// ---------------
// poll_handle: The Libuv poll handle.
// status: The value is 0 if execution occurs due to incoming data.
// events: The value should always be UV_READABLE, which means incoming
// data is available, if status is 0.
//
// RETURNS
//--------
// None
//
// NOTES
// ------
// This routine is invoked by the Libuv epoll() mechanism.
//
ROUTINE PRIVATE void poll_callback(uv_poll_t *poll_handle, int status, int events)
{
if(status == 0)
{
if(events & UV_READABLE)
{
//
// Since the right connection descriptor is known, all you
// neeed to to is read data into the connection descriptor buffer.
//
int n;
MSG *sdu;
CONN_DESC *cdesc = (CONN_DESC *) poll_handle->data;
#ifdef USE_SSL
n = Ssl_RxData(cdesc->ssl_socket, cdesc->rx_buf, RX_BUF_SIZE);
#else // USE_SSL
n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);
#endif // USE_SSL
if(n > 0)
{
DECODE_RESULT r;
//
// Recognize the SDU in the usual way.
// If the SDU is complete, send the it to the Protocol_Task().
//
r = rx_sdu(cdesc, n);
switch(r)
{
case RX_PARTIAL:
case RX_COMPLETE:
case RX_EMPTY:
break;
case RX_OVERFLOW:
case RX_NOMEMORY:
//
// There is no choice but to send an abort, reinitialize everything,
// and release the connection.
//
syslog(LOG_ERR, "IO_TASK: Irrecoverable SDU Reception Error %d", r);
sdu = MSG_ALLOC(0, FALSE);
sdu->class = C_NOTIFY;
sdu->type = T_FAULT;
sdu->info = (int) r;
SendProtoMsg(cdesc, sdu);
break;
default:
fatal_error(LOG_EMERG, "IO_TASK: Invalid SDU Recognition Code, %d", r);
}
}
else
if(n < 0)
{
#ifdef USE_SSL
MSG *msg;
n = 0;
syslog(LOG_INFO, "IO_TASK: RX DISCONNECT %d - %s !", errno, strerror(errno) );
msg = MSG_ALLOC(0, FALSE);
msg->class = C_NOTIFY;
msg->type = T_DISCONNECT;
msg->info = 0;
SendProtoMsg(cdesc, msg);
#else // USE_SSL
n = 0;
if( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) )
{
MSG *msg;
syslog(LOG_INFO, "IO_TASK: RX DISCONNECT %d - %s !", errno, strerror(errno) );
msg = MSG_ALLOC(0, FALSE);
msg->class = C_NOTIFY;
msg->type = T_DISCONNECT;
msg->info = 0;
SendProtoMsg(cdesc, msg);
}
#endif // USE_SSL
}
}
}
else
syslog(LOG_ERR, "IO_TASK: poll_callback - Bad Status %d", status);
return;
}
//
// FUNCTION - poll_proxy(): The proxy routine executes in the IO_Task() Poll Loop when
// it has unserviced async. messages from other tasks.
//
// ARGUMENTS
// -----------
// handle: A Libuv async. operation handle.
//
// RETURNS
// -------
// Nothing
//
// NOTES
// -----
// The main() and Protocol_Tasks() use the async. handled to send messages to the IO_Task().
// They do so by inserting messages in the IO_Task_Input_Q which is in shared memory.
//
ROUTINE PRIVATE void poll_proxy(uv_async_t *handle)
{
CONN_DESC *cdesc;
MSG *msg, *rsp;
uv_poll_t *poll_handle;
uv_handle_t **ppdata;
int r;
BOOL done;
//
// Do nothing if execution is already in progress.
//
if(Poll_Proxy_Busy)
return;
Poll_Proxy_Busy = TRUE;
//
// Handle messages from other tasks.
//
done = FALSE;
while(done != TRUE)
{
ENTER_MUTEX(&Async_Mutex);
msg = remove_msg(&IO_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);
if(msg)
{
cdesc = (CONN_DESC *) msg->conn_desc;
syslog(LOG_DEBUG, "IO_TASK(Proxy): RX EVENT - CONN[%d] Type %d\n", cdesc->index, msg->type);
switch(msg->type)
{
case T_POLL_CONFIG_REQ:
//
// Main process request to start Libuv/epoll() operation.
//
poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));
if( (r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) != 0)
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s",
r, uv_err_name(r));
}
if( !poll_handle->loop )
{
fprintf(stderr, "BUG: POLL LOOP NOT SET\n");
abort();
}
poll_handle->data = (void *) cdesc;
if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Polling Initiation Error %d, %s",
r, uv_err_name(r));
}
//
// Notify the Protocol_Task(), that polling has started.
//
rsp = MSG_ALLOC(0, FALSE);
rsp->class = C_NOTIFY;
rsp->type = T_POLL_CONFIG_RSP;
rsp->info = TRUE;
rsp->conn_desc = (void *) poll_handle;
SendProtoMsg(cdesc, rsp);
break;
case T_POLL_DISMANTLE_REQ:
//
// Protocol_Task() request to cease Libuv/epoll() operation and
// release the poll handle and its resources.
//
ppdata = (uv_handle_t **) &msg->buffer[0];
poll_handle = (uv_poll_t *) *ppdata;
if( (r = uv_poll_stop(poll_handle)) )
{
fatal_error(LOG_CRIT, "IO_TASK: POLL_PROXY - Poll Stop Error %d, %s",
r, uv_err_name(r) );
}
//
// The callback routine notifies the Protocol_Task() when everything is done.
//
poll_handle->data = (void *) cdesc;
uv_close((uv_handle_t *) poll_handle, async_close_callback);
break;
default:
fatal_error(LOG_EMERG, "IO_TASK: POLL_PROXY - Invalid Message = %d", msg->type);
}
MSG_FREE(msg);
}
else
done = TRUE;
}
Poll_Proxy_Busy = FALSE;
return;
}
#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"
//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE
#define MAX_RETRY 3
//
// ********************************************************************************
// ******************************** DATA ******************************************
// ********************************************************************************
//
extern int N_Sockets;
extern uv_mutex_t Service_Q_Mutex;
extern uv_mutex_t Protocol_D_Mutex;
extern uv_mutex_t DataXfer_Mutex;
//
// The main process async. operation channel.
//
extern uv_async_t Connect_Task_Async_Handle;
//
// The main() process async. message input queue.
//
extern MSG_FIFO Connect_Task_Input_Q;
//
// IO_Task() async. opertion handle.
//
extern uv_async_t IO_Task_Async_Handle;
//
// IO_Task() async. message input queue.
//
extern MSG_FIFO IO_Task_Input_Q;
//
// *************************************************************************************
// ***************************** PROTOTYPES *********************************************
// *************************************************************************************
//
void rx_fsm(CONN_DESC *, MSG *);
void init_rx_fsm(CONN_DESC *cdesc, int);
void SendTxMsg(CONN_DESC *, MSG *);
void SendDbMsg(MSG *);
PRIVATE void xfer_fsm(CONN_DESC *, MSG *);
#ifdef USE_SCRAM
void scram_fsm(CONN_DESC *, MSG *);
void Scram_Finish(CONN_DESC *);
#endif // USE_SCRAM
void send_response(CONN_DESC * , BOOL , RESULT );
void do_abort(CONN_DESC * , MSG *, RESULT );
void start_conn_release(CONN_DESC *);
#ifdef USE_SSL
void Ssl_ReleaseConnect(CONN_DESC * );
#endif // USE_SSL
void send_async_msg(CONN_DESC *, const ASYNC_DATA *);
BOOL DELETE_CONN(CONN_DESC *);
void insert_msg(MSG_FIFO *, MSG *);
void flush_msg(MSG_FIFO *, uv_mutex_t *);
MSG *build_svc_abort(RESULT );
MSG *build_sdu_response(RESULT , RESULT );
BOOL handle_sys_info(CONN_DESC *, MSG *);
BOOL handle_prof_info(CONN_DESC *, MSG *);
BOOL handle_parm_info(CONN_DESC *, MSG *);
BOOL handle_metric_info(CONN_DESC *, MSG *);
BOOL handle_binary_data(CONN_DESC *, MSG_FIFO *, BOOL );
BOOL PRIVATE handle_db_rsp(CONN_DESC *, MSG *);
RESULT parse_data_xfer(MSG *);
RESULT parse_sdu(MSG *);
TIMER_NODE *TIMER_START(CONN_DESC *, TIMER_DATA *);
void TIMER_STOP(TIMER_NODE *);
BOOL TIMER_CANCELED(MSG *);
MSG *MSG_ALLOC(int, BOOL );
void MSG_FREE(MSG * );
void fatal_error(int , const char *, ...);
//
// Handle protocol messages from all sources.
//
int NEvent = 0; // Only for visual convenience.
//
// FUNCTION - rx_fsm(): Implements the state machine described by the PEX MOBILE/SERVER DATA TRANSFER FRAMEWORK
// protocol specification on a per connection basis. It handles, SDU messages, generated by the IO_Task(), and
// a variety of messages generated by other Daemon Tasks including those for Libuv operational management and
// control.
//
// ARGUMENTS
// -----------
// cdesc: The connection descriptor to which invocation applies.
// sdu: A message.
//
// RETURN VALUE
// --------------
// None
//
// NOTES
// -------
// The term SDU denotes a protocol message receieved from the network and are sent by the Protocol_Task().
// Other kinds of messages handled by this routine are generated by the following tasks: Main Task/Process,
// IO_Task(), DB_Task(), and Timer_Task().
//
extern uv_loop_t Poll_Loop; // KLUDGE
ROUTINE void rx_fsm(CONN_DESC *cdesc, MSG *sdu)
{
ASYNC_DATA parm;
TIMER_DATA tdata;
RESULT r;
BOOL done = FALSE;
// printf("***********************************************************************\n");
syslog(LOG_DEBUG, "PROTO_TASK: RX EVENT %d - CONN[%d] State %d Class %d Type %d", NEvent++,
cdesc->index, cdesc->state, sdu->class, sdu->type);
//
// First, parse the SDU
//
switch(sdu->class)
{
case C_SDU:
switch(sdu->type)
{
case T_SVC_READY:
//
// Nothing to do - No fields.
//
r = R_SUCCESS;
break;
case T_DATA_XFER:
//
// Special case.
//
r = parse_data_xfer(sdu);
if(r != R_SUCCESS)
{
syslog(LOG_ERR, "PROTO_TASK: Corrupt SDU - DATA_XFER %d Error %d", sdu->type, r);
do_abort(cdesc, sdu, r);
MSG_FREE(sdu);
return;
}
break;
default:
//
// For now just send an abort and reinitialize everything
// if the SDU is corrupt.
//
r = parse_sdu(sdu);
if(r != R_SUCCESS)
{
syslog(LOG_ERR, "PROTO_TASK: Corrupt SDU - Type %d Error %d", sdu->type, r);
do_abort(cdesc, sdu, r);
MSG_FREE(sdu);
return;
}
}
break;
case C_TIMER:
//
// Ignore timers that have been cancelled with TIMER_STOP().
//
if(TIMER_CANCELED(sdu))
{
if(cdesc->active_timer)
{
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
}
MSG_FREE(sdu);
return;
}
break;
case C_NOTIFY:
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Event Class %d", sdu->class);
}
ENTER_MUTEX(&Service_Q_Mutex);
if(cdesc->fd < 0)
fatal_error(LOG_EMERG, "PROTO_TASK: No FD Allocated");
EXIT_MUTEX(&Service_Q_Mutex);
switch(cdesc->state)
{
case S_IDLE:
//
// Wait for the IO_Task() to inform you the incoming connection is complete.
// The save the uv_poll_t poll handle used by the IO_Task() for last connection
// dismantlement.
//
switch(sdu->type)
{
case T_POLL_CONFIG_RSP:
#ifdef HANDLE_EE
if(sdu->info == TRUE)
{
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - POLLING CONFIGURATION SUCCESS", cdesc->index);
// printf("ASYNC[%d]: CONNECTION CONFIGUATION COMPLETE\n", cdesc->index);
// fflush(stdout);
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
fprintf(stderr, "KLUDGE B0: Conn %d Fd %d [Fd] %p\n", cdesc->index, cdesc->fd, Poll_Loop.watchers[cdesc->fd]);
cdesc->poll_handle = (uv_poll_t *) sdu->conn_desc;
}
else
{
printf("EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n");
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - POLLING CONFIGURATION FAILURE", cdesc->index);
//
// Shutdown the connection as in the case of
// a positive response to a T_POLL_DISMANTLE_REQ.
// However, there will be no response from the DB_Task() after
// reception of the T_DISMATLE_REQ from the main() Task.
//
cdesc->state = S_DISMANTLE;
// printf("ASYNC[%d]: POLLING TERMINATED\n", cdesc->index);
// fflush(stdout);
// printf("ASYNC[%d]: START CONNECTION TERMINATION\n", cdesc->index);
// fflush(stdout);
cdesc->release_state = RS_CONN_REJECT;
//
// Polling is terminated - Now release the connection.
// This is done in the main() process Connect_Loop by the connect_proxy().
// It responds with a T_DISMANTLE_RSP message when everything is done.
//
bzero((void *) &parm, sizeof(ASYNC_DATA));
parm.type = T_CONN_DISMANTLE_REQ;
parm.async_handle = &Connect_Task_Async_Handle;
parm.async_q = &Connect_Task_Input_Q;
parm.object_handle = (uv_handle_t *) cdesc->conn_handle;
send_async_msg(cdesc, &parm);
}
#endif // HANDLE_EE
break;
case T_SHUTDOWN:
ENTER_MUTEX(&Service_Q_Mutex);
if(N_Sockets <= 0)
{
syslog(LOG_ALERT, "PEXD DAEMON: Service Terminated");
exit(0);
}
EXIT_MUTEX(&Service_Q_Mutex);
break;
default:
syslog(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
}
break;
case S_DISMANTLE:
//
// Connection dismantlement is in progress.
//
// printf("CONN %d: Release State %d\n", cdesc->index, cdesc->release_state);
switch(cdesc->release_state)
{
case RS_POLL_FREE:
switch(sdu->type)
{
case T_DISMANTLE_RSP:
// printf("ASYNC[%d]: POLLING TERMINATED\n", cdesc->index);
// fflush(stdout);
// printf("ASYNC[%d]: START CONNECTION TERMINATION\n", cdesc->index);
// fflush(stdout);
fprintf(stderr, "KLUDGE B1: Conn %d Fd %d [Fd] %p\n", cdesc->index, cdesc->fd, Poll_Loop.watchers[cdesc->fd]);
cdesc->release_state = RS_CONN_FREE;
//
// Polling is terminated - Now release the connection.
// This is done in the main() process Connect_Loop by the connect_proxy().
// It responds with a T_DISMANTLE_RSP message when everything is done.
//
bzero((void *) &parm, sizeof(ASYNC_DATA));
parm.type = T_CONN_DISMANTLE_REQ;
parm.async_handle = &Connect_Task_Async_Handle;
parm.async_q = &Connect_Task_Input_Q;
parm.object_handle = (uv_handle_t *) cdesc->conn_handle;
send_async_msg(cdesc, &parm);
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_SVC_ABORT:
case T_DISCONNECT:
case T_FAULT:
//
// Ignore these messages as connection release is already in progress.
//
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
}
break;
case RS_CONN_FREE:
switch(sdu->type)
{
case T_DISMANTLE_RSP:
// printf("ASYNC[%d]: CONNECTION TERMINATED\n", cdesc->index);
// fflush(stdout);
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - CONNECTION RELEASE COMPLETE", cdesc->index);
//
// Notify the Database Task and wait for a response.
// The final release of resources will occur after
// the Database Task responds.
//
if(cdesc->service_down != TRUE)
{
MSG *msg;
fprintf(stderr, "KLUDGE B2: Conn %d Fd %d [Fd] %p\n", cdesc->index, cdesc->fd, Poll_Loop.watchers[cdesc->fd]);
cdesc->service_down = TRUE;
msg = MSG_ALLOC(0, FALSE);
msg->class = C_DB;
msg->type = T_DB_SVC_DOWN;
msg->conn_desc = (void *) cdesc;
msg->info = 0;
SendDbMsg(msg);
break;
}
case T_DB_RESULT:
//
// Final release of resources.
//
if(cdesc->service_down)
cdesc->service_down = FALSE;
else
fatal_error(LOG_EMERG, "PROTO_TASK: Premature Service Down Response");
fprintf(stderr, "KLUDGE B3: Conn %d Fd %d [Fd] %p\n", cdesc->index, cdesc->fd, Poll_Loop.watchers[cdesc->fd]);
ENTER_MUTEX(&Protocol_D_Mutex);
ENTER_MUTEX(&Service_Q_Mutex);
cdesc->state = S_IDLE;
cdesc->release_state = RS_IDLE;
done = cdesc->shutdown;
//
// The connection and socket have been released.
// Free the connection descriptor and remove it
// from the Service Queue.
//
if(N_Sockets > 0)
N_Sockets--;
DELETE_CONN(cdesc);
#ifdef USE_SSL
Ssl_ReleaseConnect(cdesc);
#endif // USE_SSL
flush_msg(&cdesc->task_input_q, &DataXfer_Mutex);
EXIT_MUTEX(&Service_Q_Mutex);
//
// TBD: LIVUB may have already closed the fd.
//
close(cdesc->fd);
cdesc->fd = -1;
if(done && N_Sockets <= 0)
{
syslog(LOG_ALERT, "PEXD DAEMON: Service Terminated");
exit(0);
}
EXIT_MUTEX(&Protocol_D_Mutex);
// printf("BYE !\n");
// fflush(stdout);
fprintf(stderr, "KLUDGE B4: Conn %d Fd %d [Fd] %p\n", cdesc->index, cdesc->fd, Poll_Loop.watchers[cdesc->fd]);
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_SVC_ABORT:
case T_DISCONNECT:
case T_FAULT:
//
// Ignore these messages as connection release is already in progress.
//
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
}
break;
#ifdef HANDLE_EE
case RS_CONN_REJECT:
switch(sdu->type)
{
case T_DISMANTLE_RSP:
// printf("ASYNC[%d]: CONNECTION TERMINATED\n", cdesc->index);
// fflush(stdout);
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - CONNECTION RELEASE EARLY", cdesc->index);
//
//
// Final release of resources.
//
if(cdesc->service_down)
cdesc->service_down = FALSE;
ENTER_MUTEX(&Protocol_D_Mutex);
ENTER_MUTEX(&Service_Q_Mutex);
cdesc->state = S_IDLE;
cdesc->release_state = RS_IDLE;
done = cdesc->shutdown;
//
// The connection and socket have been released.
// Free the connection descriptor and remove it
// from the Service Queue.
//
if(N_Sockets > 0)
N_Sockets--;
DELETE_CONN(cdesc);
#ifdef USE_SSL
Ssl_ReleaseConnect(cdesc);
#endif // USE_SSL
flush_msg(&cdesc->task_input_q, &DataXfer_Mutex);
EXIT_MUTEX(&Service_Q_Mutex);
//
// TBD: LIVUB may have already closed the fd.
//
close(cdesc->fd);
cdesc->fd = -1;
if(done && N_Sockets <= 0)
{
syslog(LOG_ALERT, "PEXD DAEMON: Service Terminated");
exit(0);
}
EXIT_MUTEX(&Protocol_D_Mutex);
printf("BYE EEXIST !\n");
fflush(stdout);
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_SVC_ABORT:
case T_DISCONNECT:
case T_FAULT:
//
// Ignore these messages as connection release is already in progress.
//
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
}
break;
#endif // HANDLE_EE
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Release State %d", cdesc->release_state);
}
break;
case S_STANDBY:
syslog(LOG_DEBUG, "PROTO_TASK: CONN[%d] - Release State %d", cdesc->index, cdesc->release_state);
switch(sdu->type)
{
case T_SVC_READY:
//
// Acknowledge the SDU with positive SDU_RESPONSE.
//
#ifdef USE_SCRAM
cdesc->state = S_AUTHENTICATE;
#else // USE_SCRAM
cdesc->state = S_ACTIVE;
#endif // USE_SCRAM
send_response(cdesc, TRUE, 0);
break;
case T_SVC_ABORT:
//
// Terminate service and reintialize if the timer is not running
// and cancel the RELEASE_TIMER if it is running.
// Otherwise, ignore this message as it is a message in flight.
//
if( !cdesc->active_timer )
{
start_conn_release(cdesc);
}
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_DISCONNECT:
case T_FAULT:
start_conn_release(cdesc);
break;
case T_RELEASE_TIMER:
//
// Terminate service and release the TCP connection.
//
// printf("Final Timeout\n");
// fflush(stdout);
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
start_conn_release(cdesc);
break;
default:
//
// Protocol Error: Send an abort and reinitialize if the timer
// is not running and o start the T_RELEASE timer in this case.
// Otherwise, ignore this message as it is a message in flight.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->active_timer)
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
#ifdef USE_SCRAM
case S_AUTHENTICATE:
switch(sdu->type)
{
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
Scram_Finish(cdesc);
start_conn_release(cdesc);
break;
default:
scram_fsm(cdesc, sdu);
sdu = NULL;
}
break;
#endif // USE_SCRAM
case S_ACTIVE:
switch(sdu->type)
{
case T_SYS_INFO:
//
// Acknowledge the SDU with a positive SDU_RESPONSE
// if the system ID information is valid. Otherwise,
// send a negative SDU_RESPONSE and return to STANDBY state.
// Start the T_RELEASE timer if the response is negative.
//
// The first step is start a DB operation and wait for the result.
//
if(handle_sys_info(cdesc, sdu) != TRUE)
{
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_FAILURE);
}
sdu = NULL;
break;
case T_DB_RESULT:
// printf("SYNC[%d] - SYS_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
if(handle_db_rsp(cdesc, sdu))
{
cdesc->state = S_VERIFIED;
send_response(cdesc, TRUE, 0);
}
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
// if no DB operation is in progress.
//
if(cdesc->db_op)
cdesc->defer_sdu_op = sdu->type;
else
{
start_conn_release(cdesc);
}
break;
default:
//
// Protocol Error: Send an abort and reinitialize
// if no DB operation is in progress.
// You need to start the T_RELEASE timer in this case.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->db_op)
cdesc->defer_sdu_op = T_UNKNOWN;
else
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
case S_VERIFIED:
switch(sdu->type)
{
case T_PROF_INFO:
//
//
// Acknowledge the SDU with positive SDU_RESPONSE
// if the subject profile information is valid. Otherwise,
// send a negative SDU_RESPONSE and return to STANDBY state.
// Start the T_RELEASE timer if the response is negative.
//
//
// The first step is start a DB operation and wait for the result.
//
if(handle_prof_info(cdesc, sdu) != TRUE)
{
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_FAILURE);
}
sdu = NULL;
break;
case T_DB_RESULT:
// printf("SYNC[%d] - PROF_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
if(handle_db_rsp(cdesc, sdu))
{
cdesc->state = S_PROFILED;
send_response(cdesc, TRUE, 0);
}
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
// if no DB operation is in progress.
//
if(cdesc->db_op)
cdesc->defer_sdu_op = sdu->type;
else
{
start_conn_release(cdesc);
}
break;
default:
//
// Protocol Error: Send an abort and reinitialize
// if no DB operation is in progress.
// You need to start the T_RELEASE timer in this case.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->db_op)
cdesc->defer_sdu_op = T_UNKNOWN;
else
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
case S_PROFILED:
switch(sdu->type)
{
case T_PARM_INFO:
//
//
// Acknowledge the SDU with positive SDU_RESPONSE
// if the session parameter information is valid. Otherwise,
// send a negative SDU_RESPONSE and return to STANDBY state.
// Start the T_RELEASE timer if the response is negative.
//
// The first step is start a DB operation and wait for the result.
//
if(handle_parm_info(cdesc, sdu) != TRUE)
{
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_FAILURE);
}
sdu = NULL;
break;
case T_DB_RESULT:
// printf("SYNC[%d] - PARM_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
if(handle_db_rsp(cdesc, sdu))
{
cdesc->state = S_CONFIGURED;
send_response(cdesc, TRUE, 0);
}
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
// if no DB operation is in progress.
//
if(cdesc->db_op)
cdesc->defer_sdu_op = sdu->type;
else
{
start_conn_release(cdesc);
}
break;
default:
//
// Protocol Error: Send an abort and reinitialize
// if no DB operation is in progress.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->db_op)
cdesc->defer_sdu_op = T_UNKNOWN;
else
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
case S_CONFIGURED:
switch(sdu->type)
{
case T_METRIC_INFO:
//
//
// Acknowledge the SDU with positive SDU_RESPONSE
// if the session metric data is valid. Otherwise,
// send a negative SDU_RESPONSE and return to STANDBY state.
// Start the T_RELEASE timer if the response is negative.
//
// The first step is start a DB operation and wait for the result.
//
if(handle_metric_info(cdesc, sdu) != TRUE)
{
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_FAILURE);
}
sdu = NULL;
break;
case T_DB_RESULT:
// printf("SYNC[%d] - METRIC_INFO: Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
if(handle_db_rsp(cdesc, sdu))
{
cdesc->state = S_READY;
send_response(cdesc, TRUE, 0);
}
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
// if no DB operation is in progress.
//
if(cdesc->db_op)
cdesc->defer_sdu_op = sdu->type;
else
{
start_conn_release(cdesc);
}
break;
default:
//
// Protocol Error: Send an abort and reinitialize
// if no DB operation is in progress.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->db_op)
cdesc->defer_sdu_op = T_UNKNOWN;
else
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
case S_READY:
//
// We can either begin or finish dataset transfer in this state.
//
// If the T_SDU timer is running cancel it.
//
// IMPORTANT: Set sdu to NULL if you queue it or free it in this state.
//
if(cdesc->active_timer)
{
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
}
switch(sdu->type)
{
case T_SVC_DONE:
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - SESSION COMPLETE", cdesc->index);
//
// All transactions succceeded - Send a positive SDU_RESPONSE and start the SDU timer.
//
cdesc->state = S_STANDBY;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
if(cdesc->final_dataset)
cdesc->final_dataset = FALSE;
cdesc->xfer_state = X_IDLE;
cdesc->n_retry = 0;
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, TRUE, 0);
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
//
start_conn_release(cdesc);
break;
case T_DATA_XFER:
//
// Start dataset transfer.
//
cdesc->state = S_TRANSFER;
xfer_fsm(cdesc, sdu);
sdu = NULL;
break;
case T_SDU_TIMER:
//
// Mobile didn't respond to a retry SDU_RESPONSE
// sent to the Mobile. You can ask for retranmission again or
// send an abort. Manage N_Retry in both cases.
// You should only request retransmission if N_Retry < MAX_RETRY
//
// (We didn't see a new DATA_XFER.)
//
if(cdesc->n_retry++ < MAX_RETRY)
{
cdesc->state = S_READY;
cdesc->xfer_state = X_IDLE;
tdata.task = PROTOCOL_TASK;
tdata.type = T_SDU_TIMER;
tdata.period = SDU_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_RETRY);
}
else
do_abort(cdesc, sdu, R_SEQUENCE);
break;
default:
//
// Protocol Error: Send an abort and reinitialize.
// You need to start the T_RELEASE timer in this case.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
case S_TRANSFER:
//
// Cancel the inactivity timer if it is running.
//
//
// IMPORTANT: Set sdu to NULL if you queue it or free it in this state.
//
switch(sdu->type)
{
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
//
// Terminate service and reintialize
// if no DB operation is in progress.
//
if(cdesc->db_op)
cdesc->defer_sdu_op = sdu->type;
else
{
start_conn_release(cdesc);
}
break;
case T_DATA_XFER:
if(cdesc->active_timer)
{
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
}
xfer_fsm(cdesc, sdu);
sdu = NULL;
break;
case T_DB_RESULT:
// printf("SYNC[%d] - XFER_DATA(Final Result %d\n", cdesc->index, sdu->info);
// fflush(stdout);
cdesc->db_op = FALSE;
switch(cdesc->defer_sdu_op)
{
case T_RAW_SDU:
// printf("SYNC[%d] - XFER_DONE\n", cdesc->index);
// fflush(stdout);
cdesc->state = S_READY;
if(sdu->info == 1)
send_response(cdesc, TRUE, 0);
else
send_response(cdesc, FALSE, sdu->info);
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
start_conn_release(cdesc);
break;
case T_INACTIVITY_TIMER:
do_abort(cdesc, sdu, R_INACTIVITY);
break;
case T_UNKNOWN:
do_abort(cdesc, sdu, R_PROTOCOL);
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Deferred Operation %d",
cdesc->defer_sdu_op);
}
break;
case T_INACTIVITY_TIMER:
//
// The Mobile appears to be dead. There
// is no choice but to send and abort and
// and start the T_RELEASE timer if no DB
// operation is in progress.
//
// Otherwise, the DB operation is hung.
//
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
if(cdesc->db_op)
cdesc->defer_sdu_op = T_INACTIVITY_TIMER;
else
do_abort(cdesc, sdu, R_INACTIVITY);
break;
default:
//
// Protocol Error: Send an abort and reinitialize.
// You need to start the T_RELEASE timer in this case.
//
syslog(LOG_ERR, "PROTO_TASK: Invalid SDU Type %d", sdu->type);
if(cdesc->db_op)
cdesc->defer_sdu_op = T_UNKNOWN;
else
do_abort(cdesc, sdu, R_PROTOCOL);
}
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Transaction State %d", cdesc->state);
}
if(sdu)
MSG_FREE(sdu);
// printf("FINISHED: Final State %d\n\n", cdesc->state);
return;
}
typedef enum data_xfer_action {
A_NEXT = 0,
A_FINAL = 1,
A_REJECT = 2,
A_RETRY = 3,
A_ABORT = 4,
A_IGNORE = 5,
A_UNDEFINED = 6
} DATA_XFER_ACTION;
//
// FUNCTION - xfer_fsm(): Handles DATA_XFER SDU segments.
//
// ARGUMENTS
// -----------
// cdesc: A connection decriptor.
// sdu: A DATA_XFER SDU or segment of one.
//
// RETURN VALUE
// --------------
// None.
//
// NOTES
// -------
// None
//
ROUTINE PRIVATE void xfer_fsm(CONN_DESC *cdesc, MSG *sdu)
{
DATA_XFER_SDU *body = (DATA_XFER_SDU *) sdu->body;
TIMER_DATA tdata;
DATA_XFER_ACTION action;
syslog(LOG_DEBUG, "PROTO_TASK: RX SEG - State %d Type %d Segment %d",
cdesc->xfer_state, sdu->type, body->segment);
if(sdu->type != T_DATA_XFER)
{
//
// This is a protocol error because only DATA_XFER SDUs are valid
// in this state machine. Send and abort and transition to STANDBY state.
// You need to start the T_RELEASE timer in this case.
//
do_abort(cdesc, sdu, R_PROTOCOL);
return;
}
action = A_UNDEFINED;
switch(cdesc->xfer_state)
{
case X_IDLE:
if(body->segment & SEG_START)
{
// printf("N_SAMPLES = %d\n", body->nsample);
cdesc->expect_sn = 0;
cdesc->n_retry = 0;
if(body->data_type == FVC_INSPIRATORY)
cdesc->final_dataset = TRUE;
else
cdesc->final_dataset = FALSE;
if(body->segment & SEG_FINAL)
{
//
// This is a unitary segment. Acknowledge it if it valid.
//
action = A_FINAL;
}
else
{
//
// Just wait for another segment.
//
action = A_NEXT;
cdesc->xfer_state = X_START;
}
}
else
if(cdesc->n_retry > 0)
{
//
// This must be a message in flight. Discard it, and
// wait for the 1st DATA_XFER segment.
//
action = A_IGNORE;
}
else
{
//
// This is a protocol error because only initial DATA_XFER SDU segments are
// valid in this state.
//
action = A_ABORT;
}
break;
case X_START:
if(body->segment & SEG_MID)
{
if(body->sn == cdesc->expect_sn)
{
//
// Just wait for another segment.
//
action = A_NEXT;
cdesc->xfer_state = X_INTERMEDIATE;
}
else
if(cdesc->n_retry++ < MAX_RETRY)
{
//
// Invalid sequence number - Request retransmission.
//
action = A_RETRY;
cdesc->xfer_state = X_IDLE;
}
else
{
//
// Invalid sequence number - Reject the DATA_XFER SDU.
//
action = A_REJECT;
cdesc->xfer_state = X_IDLE;
}
}
else
if(body->segment & SEG_FINAL)
{
if(body->sn == cdesc->expect_sn)
{
//
// The DATA_XFER SDU is complete. Acknowedge it if it is valid.
//
action = A_FINAL;
cdesc->xfer_state = X_IDLE;
}
else
if(cdesc->n_retry++ < MAX_RETRY)
{
//
// Invalid sequence number - Request retransmission.
//
action = A_RETRY;
cdesc->xfer_state = X_IDLE;
}
else
{
//
// Invalid sequence number - Reject the DATA_XFER SDU.
//
action = A_REJECT;
cdesc->xfer_state = X_IDLE;
}
}
else
{
//
// This is a protocol error because only intermediate and final
// segments are valid in this state.
//
action = A_ABORT;
}
break;
case X_INTERMEDIATE:
if(body->segment & SEG_MID)
{
//
// If the sequence number is valie, just wait for another segment.
//
if(body->sn == cdesc->expect_sn)
action = A_NEXT;
else
if(cdesc->n_retry++ < MAX_RETRY)
{
//
// Invalid sequence number - Request retransmission.
//
action = A_RETRY;
cdesc->xfer_state = X_IDLE;
}
else
{
//
// Invalid sequence number - Reject the DATA_XFER SDU.
//
action = A_REJECT;
cdesc->xfer_state = X_IDLE;
}
}
else
if(body->segment & SEG_FINAL)
{
if(body->sn == cdesc->expect_sn)
{
//
// The DATA_XFER SDU is complete. Acknowledge it if it is valid.
//
action = A_FINAL;
cdesc->xfer_state = X_IDLE;
}
else
if(cdesc->n_retry++ < MAX_RETRY)
{
//
// Invalid sequence number - Request retransmission.
//
action = A_RETRY;
cdesc->xfer_state = X_IDLE;
}
else
{
//
// Invalid sequence number - Reject the DATA_XFER SDU.
//
action = A_REJECT;
cdesc->xfer_state = X_IDLE;
}
}
else
{
//
// This is a protocol error because only intermediate and final
// segments are valid in this state.
//
action = A_ABORT;
}
break;
case X_FINAL:
//
// This state is conceptual for the Server and you should never
// enter long enough to receive anything if things are done correctly.
//
cdesc->state = S_READY;
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Data Transfer State %d", cdesc->xfer_state);
}
switch(action)
{
case A_NEXT:
//
// The segment is valid and non-terminal. Start the intactivity timer
// and wait for another segment.
//
cdesc->expect_sn++;
ENTER_MUTEX(&DataXfer_Mutex);
insert_msg(&cdesc->bin_seg_q, sdu);
EXIT_MUTEX(&DataXfer_Mutex);
sdu = NULL;
tdata.task = PROTOCOL_TASK;
tdata.type = T_INACTIVITY_TIMER;
tdata.period = INACTIVITY_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
break;
case A_FINAL:
syslog(LOG_INFO, "PROTO_TASK: CONN[%d] - DATASET COMPLETE", cdesc->index);
//
// The segment is valid and terminal or unitary. Acknowledge it
// and wait for another message from the Mobile.
//
// Do something with the binary data and send a postive SDU_REPONSE.
// The first step is to wait for the DB operation to compleet.
//
cdesc->expect_sn = 0;
cdesc->n_retry = 0;
ENTER_MUTEX(&DataXfer_Mutex);
insert_msg(&cdesc->bin_seg_q, sdu);
EXIT_MUTEX(&DataXfer_Mutex);
sdu = NULL;
if(handle_binary_data(cdesc, &cdesc->bin_seg_q, cdesc->final_dataset) != TRUE)
{
cdesc->state = S_READY;
send_response(cdesc, FALSE, R_DB_FAILURE);
}
break;
case A_REJECT:
//
// The segment is invalid due to a bad sequence number and
// the retry threshold has been reached. Send a negative response.
//
cdesc->state = S_READY;
cdesc->expect_sn = 0;
cdesc->n_retry = 0;
flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);
send_response(cdesc, FALSE, R_SEQUENCE);
break;
case A_RETRY:
//
// The segment is invalid due to a bad sequence number and the retry
// threshold has not been reached. Request retransmission of
// the entre DATA_XFER SDU.
//
cdesc->state = S_READY;
cdesc->expect_sn = 0;
flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);
tdata.task = PROTOCOL_TASK;
tdata.type = T_SDU_TIMER;
tdata.period = SDU_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
send_response(cdesc, FALSE, R_RETRY);
break;
case A_ABORT:
//
// The segment has some kind of protocol error.
//
cdesc->final_dataset = FALSE;
do_abort(cdesc, sdu, R_PROTOCOL);
break;
case A_IGNORE:
//
// Ignore the message and wait for an initial segment.
//
cdesc->state = S_READY;
cdesc->expect_sn = 0;
cdesc->final_dataset = FALSE;
tdata.task = PROTOCOL_TASK;
tdata.type = T_SDU_TIMER;
tdata.period = SDU_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
break;
case A_UNDEFINED:
fatal_error(LOG_EMERG, "PROTO_TASK: No DATA_XFER Action");
}
if(sdu)
MSG_FREE(sdu);
return;
}
//
// FUNCTION - handle_db_rsp(): Handle a T_DB_RESPONSE SDU from the Database Task.
//
// ARGUMENTS
// -----------
// cdesc: A connection descriptor.
// sdu: The T_DB_RESPONSE message.
//
// RETURN VALUE
// --------------
// Returns TRUE if successful and FALSE otherwise.
//
// NOTES
// -------
// None
//
ROUTINE PRIVATE BOOL handle_db_rsp(CONN_DESC *cdesc, MSG *sdu)
{
BOOL rval = FALSE;
cdesc->db_op = FALSE;
if(sdu->info == 1)
{
//
// DB operation succeeded.
//
switch(cdesc->defer_sdu_op)
{
case T_RAW_SDU:
//
// No interim message received.
//
rval = TRUE;
break;
case T_SHUTDOWN:
cdesc->shutdown = TRUE;
case T_FAULT:
case T_DISCONNECT:
case T_SVC_ABORT:
start_conn_release(cdesc);
break;
case T_UNKNOWN:
do_abort(cdesc, sdu, R_PROTOCOL);
break;
default:
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Deferred Operation %d", cdesc->defer_sdu_op);
}
}
else
do_abort(cdesc, sdu, R_DB_FAILURE);
cdesc->defer_sdu_op = T_RAW_SDU;
return(rval);
}
//
// Send and SDU_RESPONSE message to the peer.
//
ROUTINE void send_response(CONN_DESC *cdesc, BOOL positive, RESULT code)
{
MSG *msg;
RESULT result;
if(positive)
{
result = R_SUCCESS;
}
else
result = R_FAILURE;
msg = build_sdu_response(result, code);
syslog(LOG_DEBUG, "PROTO_TASK: TX EVENT %d - CONN[%d] State %d Class %d Type %d", NEvent++,
cdesc->index, cdesc->state, msg->class, msg->type);
SendTxMsg(cdesc, msg);
return;
}
//
// Send an SVC_ABORT message to the peer.
//
ROUTINE void do_abort(CONN_DESC *cdesc, MSG *sdu, RESULT code)
{
MSG *msg;
TIMER_DATA tdata;
cdesc->state = S_STANDBY;
cdesc->xfer_state = X_IDLE;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
cdesc->db_op = FALSE;
cdesc->defer_sdu_op = T_RAW_SDU;
if(cdesc->active_timer)
{
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
}
#ifdef NOT_DEFINED
if(sdu->class == C_TIMER)
fatal_error(LOG_EMERG, "PROTO_TASK: Invalid Timer Type %d", sdu->type);
#endif // NOT_DEFINED
cdesc->expect_sn = 0;
cdesc->n_retry = 0;
flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);
msg = build_svc_abort(code);
syslog(LOG_DEBUG, "PROTO_TASK: TX EVENT %d - CONN [%d] State %d Class %d Type %d", NEvent++,
cdesc->index, cdesc->state, msg->class, msg->type);
tdata.task = PROTOCOL_TASK;
tdata.type = T_RELEASE_TIMER;
tdata.period = RELEASE_TIMER_PERIOD;
cdesc->active_timer = TIMER_START(cdesc, &tdata);
SendTxMsg(cdesc, msg);
return;
}
//
// Begin termination of a socket connection and all the associated infrastructure.
//
ROUTINE PRIVATE void start_conn_release(CONN_DESC *cdesc)
{
ASYNC_DATA parm;
//
// Do protocol book keeping.
//
cdesc->state = S_DISMANTLE;
cdesc->xfer_state = X_IDLE;
#ifdef USE_SCRAM
cdesc->scram_state = AS_UNAUTHENTICATED;
#endif // USE_SCRAM
cdesc->release_state = RS_POLL_FREE;
cdesc->db_op = FALSE;
cdesc->defer_sdu_op = T_RAW_SDU;
if(cdesc->active_timer)
{
TIMER_STOP(cdesc->active_timer);
cdesc->active_timer = NULL;
}
cdesc->expect_sn = 0;
cdesc->n_retry = 0;
flush_msg(&cdesc->bin_seg_q, &DataXfer_Mutex);
//
// printf("ASYNC[%d]: START POLL TERMINATION\n", cdesc->index);
// fflush(stdout);
//
// Terminate polling and release the poll handle. This is done by sending
// message to the IO_Task(). It responds with a T_DISMATLE_RSP when everything is done.
//
bzero((void *) &parm, sizeof(ASYNC_DATA));
parm.type = T_POLL_DISMANTLE_REQ;
parm.async_handle = &IO_Task_Async_Handle;
parm.async_q = &IO_Task_Input_Q;
parm.object_handle = (uv_handle_t *) cdesc->poll_handle;
send_async_msg(cdesc, &parm);
return;
}
ROUTINE void init_rx_fsm(CONN_DESC *cdesc, int fd)
{
int index = cdesc->index;
bzero(cdesc, sizeof(CONN_DESC));
ENTER_MUTEX(&Service_Q_Mutex);
cdesc->index = index;
cdesc->state = S_IDLE;
cdesc->xfer_state = X_IDLE;
cdesc->release_state = RS_IDLE;
cdesc->fd = fd;
cdesc->decoder.buf_ptr = cdesc->decoder.buffer;
EXIT_MUTEX(&Service_Q_Mutex);
return;
}
This is the diagnostic resulting from uv_poll_init() when the return value is
EBADF.
IO_TASK: POLL_PROXY - Polling Initialization Error -9, EBADF
The stack trace is as follows.
Program terminated with signal SIGABRT, Aborted.
#0 0x00007fae27551267 in __GI_raise (sig=sig@entry=6) at
../sysdeps/unix/sysv/linux/raise.c:55
55 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 0x00007fae27551267 in __GI_raise (sig=sig@entry=6) at
../sysdeps/unix/sysv/linux/raise.c:55
#1 0x00007fae27552eca in __GI_abort () at abort.c:89
#2 0x000000000040d70f in fatal_error (log_prio=2,
fmt=0x425028 "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s")
at misc.c:135
#3 0x0000000000405ba3 in poll_proxy (handle=0x642f80 <IO_Task_Async_Handle>)
at network_io.c:287
#4 0x00000000004117bb in uv__async_io (loop=0xc27800 <Poll_Loop>, w=<optimized
out>, events=<optimized out>)
at src/unix/async.c:163
#5 0x000000000041d545 in uv__io_poll (loop=loop@entry=0xc27800 <Poll_Loop>,
timeout=<optimized out>)
at src/unix/linux-core.c:462
#6 0x0000000000411f38 in uv_run (loop=0xc27800 <Poll_Loop>,
mode=UV_RUN_DEFAULT) at src/unix/core.c:385
#7 0x0000000000405891 in IO_Task (arg=0x0) at network_io.c:100
#8 0x00007fae278ed6aa in start_thread (arg=0x7fae226c2700) at
pthread_create.c:333
#9 0x00007fae27622eed in clone () at
../sysdeps/unix/sysv/linux/x86_64/clone.S:109
In this case fd = 24 and Poll_Loop.watchers[24] is NULL.
This is the diagnostic resulting from uv_poll_init() when the return value is
EEXIST.
IO_TASK: POLL_PROXY - Polling Initialization Error -17, EEXIST
The stack trace is as follows.
Program terminated with signal SIGABRT, Aborted.
#0 0x00007fb53a39b267 in __GI_raise (sig=sig@entry=6) at
../sysdeps/unix/sysv/linux/raise.c:55
55 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 0x00007fb53a39b267 in __GI_raise (sig=sig@entry=6) at
../sysdeps/unix/sysv/linux/raise.c:55
#1 0x00007fb53a39ceca in __GI_abort () at abort.c:89
#2 0x000000000040d70f in fatal_error (log_prio=2,
fmt=0x425028 "IO_TASK: POLL_PROXY - Polling Initialization Error %d, %s")
at misc.c:135
#3 0x0000000000405ba3 in poll_proxy (handle=0x642f80 <IO_Task_Async_Handle>)
at network_io.c:287
#4 0x00000000004117bb in uv__async_io (loop=0xc27800 <Poll_Loop>, w=<optimized
out>, events=<optimized out>)
at src/unix/async.c:163
#5 0x000000000041d545 in uv__io_poll (loop=loop@entry=0xc27800 <Poll_Loop>,
timeout=<optimized out>)
at src/unix/linux-core.c:462
#6 0x0000000000411f38 in uv_run (loop=0xc27800 <Poll_Loop>,
mode=UV_RUN_DEFAULT) at src/unix/core.c:385
#7 0x0000000000405891 in IO_Task (arg=0x0) at network_io.c:100
#8 0x00007fb53a7376aa in start_thread (arg=0x7fb535d0d700) at
pthread_create.c:333
#9 0x00007fb53a46ceed in clone () at
../sysdeps/unix/sysv/linux/x86_64/clone.S:109
In this case fd = 30 and Poll_Loop.watchers[30] is NULL.
//
// Asynchronous Task for sending SDUs to the mobile and related routines.
//
#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "scram.h"
#include "framework.h"
//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE
//
// **************************************************************************************
// ************************************ DATA *******************************************
// **************************************************************************************
//
//
// This data is used to indicate when the task is initialized.
//
extern int N_Tasks;
extern uv_mutex_t Protocol_D_Mutex;
// The Transmit_Task() transmits messages on the Send_Q across the network via TCP.
// The Send_Q is FIFO holding all network bound SDU messages,
//
PRIVATE MSG_FIFO Send_Q;
//
// Protects the Send_Q and its condition variable.
//
PRIVATE uv_mutex_t Send_Q_Mutex;
//
// Send task message input condition variable.
//
PRIVATE uv_cond_t Send_Q_Cond;
//
// *****************************************************************************************
// *********************************** PROTOTYPES *******************************************
// *****************************************************************************************
//
void Transmit_Task(void *);
void SendTxMsg(CONN_DESC *, MSG *);
PRIVATE MSG *WaitTxMsg();
#ifdef USE_SSL
PRIVATE BOOL TX_DATA(CONN_DESC *, SSL *, const UCHAR *, int );
int Ssl_TxData(SSL *, const UCHAR *, int );
#else // USE_SSL
PRIVATE BOOL TX_DATA(CONN_DESC *, int , const UCHAR *, int );
#endif // USE_SSL
void SendProtoMsg(CONN_DESC *, MSG *);
void insert_msg(MSG_FIFO *, MSG *);
MSG *remove_msg(MSG_FIFO *);
MSG *MSG_ALLOC(int, BOOL );
void MSG_FREE(MSG *);
//
// This task transmits protocol messages across the network.
//
TASK void Transmit_Task(void *arg)
{
MSG *msg;
//
// Intialize the Transmit_Task() Mutex and condition variable.
//
Send_Q.head = Send_Q.tail = NULL;
uv_mutex_init(&Send_Q_Mutex);
uv_cond_init(&Send_Q_Cond);
ENTER_MUTEX(&Protocol_D_Mutex);
N_Tasks++;
EXIT_MUTEX(&Protocol_D_Mutex);
syslog(LOG_INFO, "TRANSMIT_TASK: Started");
//
// Transmit all messages on the send queue.
//
for(;;)
{
//
// Wait for a message.
//
msg = WaitTxMsg();
//
// The tranmission can only fail due to socket malfunction.
// In that case a T_DICONNECT message is automatically sent
// to the Server Protocol Task.
//
#ifdef USE_SSL
TX_DATA((CONN_DESC *) msg->conn_desc, msg->ssl_socket, msg->buffer, msg->data_size);
#else // USE_SSL
TX_DATA((CONN_DESC *) msg->conn_desc, (int) msg->info, msg->buffer, msg->data_size);
#endif // USE_SSL
MSG_FREE(msg);
}
return;
}
//
// FUNCTION - SendTxMSg(): Transmits an SDU to a mobile system.
//
// ARGUMENTS
// -----------
// cdesc: The connection descriptor of the network connection over which the message is sent.
// sdu: The SDU
//
// RETURN VALUE
// --------------
// None.
//
// NOTES
// -------
// This routine does not actually transmit the message. Instead the message is
// conveyed to the Transmit_Task() which actually performs transmission.
//
ROUTINE void SendTxMsg(CONN_DESC *cdesc, MSG *sdu)
{
#ifdef USE_SSL
sdu->ssl_socket = cdesc->ssl_socket;
#else // USE_SSL
sdu->info = cdesc->fd;
#endif // USE_SSL
sdu->conn_desc = (CONN_DESC *) cdesc;
ENTER_MUTEX(&Send_Q_Mutex);
//
// Send a message to the task whose input queue is q.
//
insert_msg(&Send_Q, sdu);
uv_cond_signal(&Send_Q_Cond);
EXIT_MUTEX(&Send_Q_Mutex);
return;
}
//
// Wait for a message to arrive from another task.
//
ROUTINE PRIVATE MSG *WaitTxMsg()
{
MSG *sdu;
ENTER_MUTEX(&Send_Q_Mutex);
//
// Wait for a message from a task that inserts messages in q.
//
while( !(sdu = remove_msg(&Send_Q)) )
{
uv_cond_wait(&Send_Q_Cond, &Send_Q_Mutex);
}
EXIT_MUTEX(&Send_Q_Mutex);
return(sdu);
}
#ifdef USE_SSL
//
// Transmit a message across the network.
//
ROUTINE PRIVATE BOOL TX_DATA(CONN_DESC *cdesc, SSL *socket, const UCHAR *buf, int size)
{
const UCHAR *pbuf = buf;
int n;
BOOL result = TRUE;
while(size > 0)
{
if( (n = Ssl_TxData(socket, pbuf, size)) > 0 )
{
size -= n;
pbuf += n;
}
else
if(n < 0)
{
MSG *msg;
result = FALSE;
syslog(LOG_ERR, "TRANSMIT_TASK: TX DISCONNECT %d - %s !\n", errno, strerror(errno) );
msg = MSG_ALLOC(0, FALSE);
msg->class = C_NOTIFY;
msg->type = T_DISCONNECT;
msg->info = 0;
SendProtoMsg(cdesc, msg);
break;
}
}
return(result);
}
#else // USE_SSL
//
// Transmit a message across the network.
//
ROUTINE PRIVATE BOOL TX_DATA(CONN_DESC *cdesc, int fd, const UCHAR *buf, int size)
{
const UCHAR *pbuf = buf;
int n;
BOOL result = TRUE;
while(size > 0)
{
if( (n = send(fd, pbuf, size, MSG_DONTWAIT)) > 0 )
{
size -= n;
pbuf += n;
}
else
if(n < 0)
{
if( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) )
{
MSG *msg;
result = FALSE;
syslog(LOG_ERR, "TRANSMIT_TASK: TX DISCONNECT %d - %s !\n", errno, strerror(errno) );
msg = MSG_ALLOC(0, FALSE);
msg->class = C_NOTIFY;
msg->type = T_DISCONNECT;
msg->info = 0;
SendProtoMsg(cdesc, msg);
break;
}
}
}
return(result);
}
#endif // USE_SSL