If a process is forked after the log agent has been initialized, the child process uses the same MDS address as the parent process. If the child process then initializes another log client, logd still considers it a log client of the log agent in the parent process. The parent process is not aware of this, so a mailbox is introduced for interaction between the processes in a group. --- src/log/agent/lga_agent.cc | 2 + src/log/agent/lga_util.cc | 138 +++++++++++++++++++++++++++++++++++-- src/log/agent/lga_util.h | 17 +++++ 3 files changed, 150 insertions(+), 7 deletions(-)
diff --git a/src/log/agent/lga_agent.cc b/src/log/agent/lga_agent.cc index e84ea3a28..a0f720ff3 100644 --- a/src/log/agent/lga_agent.cc +++ b/src/log/agent/lga_agent.cc @@ -137,6 +137,7 @@ ScopeData::~ScopeData() { //------------------------------------------------------------------------------ LogAgent::LogAgent() { client_list_.clear(); + lga_proc_mng_start(); // There is high risk of calling one @LogClient method // in the body of other @LogClient methods, such case would cause deadlock // even they are in the same thread context. @@ -167,6 +168,7 @@ LogAgent::~LogAgent() { TRACE_ENTER(); ScopeLock scopeLock(mutex_); + lga_proc_mng_stop(); stop_recovery2_thread(); lga_shutdown(); m_NCS_SEL_OBJ_DESTROY(&init_clm_status_sel_); diff --git a/src/log/agent/lga_util.cc b/src/log/agent/lga_util.cc index 30920b7aa..fc4b9175e 100644 --- a/src/log/agent/lga_util.cc +++ b/src/log/agent/lga_util.cc @@ -32,6 +32,112 @@ // Variables used during startup/shutdown only static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; static unsigned int client_counter = 0; +static bool is_proc_mng_started = false; + +// Mailbox between forked processes +static int proc_socks[2] = {0}; +static int* child_proc_sock_p = &proc_socks[0]; +static int* mng_proc_sock_p = &proc_socks[1]; +// Id of thread managing the forked processes +static pthread_t proc_mng_thread_id = 0; + +/** + * Thread manages forked processed + */ +void* proc_mng_thread(void*) { + TRACE_ENTER(); + while (1) { + // Wait for timeout or a signal to terminate + int ret = osaf_poll_one_fd(*mng_proc_sock_p, -1); + if (ret <= 0) { + LOG_NO("%s: poll failed %s", __func__, strerror(errno)); + break; + } else { + lga_proc_msg_t msg; + if (read(*mng_proc_sock_p, &msg, sizeof(msg)) > 0) { + switch (msg.type) { + case LGA_PROC_TERMINATE_MSG: + TRACE("Received terminate event. 
Terminating!"); + pthread_exit(nullptr); + TRACE_LEAVE(); + return nullptr; + case LGA_PROC_INCREASE_USER_MSG: + client_counter++; + TRACE("Increase client_counter to %u", client_counter); + break; + case LGA_PROC_DECREASE_USER_MSG: + client_counter--; + TRACE("Descrease client_counter to %u", client_counter); + break; + default: + TRACE("Unknown type %d", msg.type); + break; + } + } else { + TRACE("Failed to receive message"); + } + } + } + pthread_exit(nullptr); + TRACE_LEAVE(); + return nullptr; +} + +/** + * Start management for forked processes + */ +void lga_proc_mng_start(void) { + TRACE_ENTER(); + if (!is_proc_mng_started) { + is_proc_mng_started = true; + // Create socket for process interaction + int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, proc_socks); + if (rc == -1) { + LOG_NO("%s: socketpair failed %s", __func__, strerror(errno)); + is_proc_mng_started = false; + return; + } + // Start thread to handle message from forked processes + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + rc = pthread_create(&proc_mng_thread_id, &attr, proc_mng_thread, nullptr); + if (rc != 0) { + LOG_NO("%s: pthread_create failed %s", __func__, strerror(rc)); + close(proc_socks[0]); + close(proc_socks[1]); + pthread_attr_destroy(&attr); + is_proc_mng_started = false; + } else { + pthread_attr_destroy(&attr); + } + } + TRACE_LEAVE(); +} + +/** + * Stop management for forked processes + */ +void lga_proc_mng_stop(void) { + TRACE_ENTER(); + if (is_proc_mng_started) { + // Send the terminating message to the management thread + lga_proc_msg_t term_msg; + term_msg.type = LGA_PROC_TERMINATE_MSG; + lga_proc_send_to_mng(term_msg); + // Wait for management thread + int rc = pthread_join(proc_mng_thread_id, nullptr); + if (rc != 0) { + LOG_NO("%s: Could not join the management thread %s", + __func__, strerror(rc)); + } + // Close interaction sockets + close(proc_socks[0]); + close(proc_socks[1]); + is_proc_mng_started = 
false; + } + TRACE_LEAVE(); +} /** * @return unsigned int @@ -144,10 +250,12 @@ done: * The function help to trace number of clients */ void lga_increase_user_counter(void) { + TRACE_ENTER(); ScopeLock lock(init_lock); - - ++client_counter; - TRACE_2("client_counter: %u", client_counter); + lga_proc_msg_t msg; + msg.type = LGA_PROC_INCREASE_USER_MSG; + lga_proc_send_to_mng(msg); + TRACE_LEAVE(); } /** @@ -155,11 +263,12 @@ void lga_increase_user_counter(void) { * The function help to trace number of clients */ void lga_decrease_user_counter(void) { - TRACE_ENTER2("client_counter: %u", client_counter); + TRACE_ENTER(); ScopeLock lock(init_lock); - - if (client_counter > 0) - --client_counter; + lga_proc_msg_t msg; + msg.type = LGA_PROC_DECREASE_USER_MSG; + lga_proc_send_to_mng(msg); + TRACE_LEAVE(); } /** @@ -171,6 +280,21 @@ unsigned int lga_get_number_of_user(void){ return client_counter; } +/** + * Send the message to the process constructed this agent + */ +uint32_t lga_proc_send_to_mng(const lga_proc_msg_t& msg) { + int rc = 0; + + rc = write(*child_proc_sock_p, &msg, sizeof(msg)); + if (rc == -1) { + LOG_WA("Log agent: failed to send message to management thread"); + return NCSCC_RC_FAILURE; + } + + return NCSCC_RC_SUCCESS; +} + /** * Check if the name is valid or not. 
*/ diff --git a/src/log/agent/lga_util.h b/src/log/agent/lga_util.h index 4858e758a..a6a341f5e 100644 --- a/src/log/agent/lga_util.h +++ b/src/log/agent/lga_util.h @@ -22,6 +22,23 @@ #include <saAis.h> #include <saLog.h> +/* Message type */ +enum lga_proc_msg_type_t { + LGA_PROC_INCREASE_USER_MSG, + LGA_PROC_DECREASE_USER_MSG, + LGA_PROC_TERMINATE_MSG, + LGA_MSG_MAX +}; + +/* Message used for forked processes */ +typedef struct lga_proc_msg_t_ { + lga_proc_msg_type_t type; /* message type */ +} lga_proc_msg_t; + +void lga_proc_mng_start(void); +void lga_proc_mng_stop(void); +uint32_t lga_proc_send_to_mng(const lga_proc_msg_t& msg); + unsigned int lga_startup(); unsigned int lga_shutdown(); void lga_increase_user_counter(void); -- 2.25.1 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel