Re-factor the inter-node poll loop by replacing the 100 entry fixed size poll
vector with a epoll event processing. This allows dtm to scale to clusters
larger than 100 nodes.
---
src/dtm/dtmnd/dtm.h | 7 +-
src/dtm/dtmnd/dtm_cb.h | 14 +-
src/dtm/dtmnd/dtm_inter_trans.c | 20 +-
src/dtm/dtmnd/dtm_inter_trans.h | 10 +-
src/dtm/dtmnd/dtm_intra.c | 3 +-
src/dtm/dtmnd/dtm_main.c | 12 +-
src/dtm/dtmnd/dtm_node.c | 529 +++++++++++++++------------------------
src/dtm/dtmnd/dtm_node.h | 22 +-
src/dtm/dtmnd/dtm_node_db.c | 94 ++-----
src/dtm/dtmnd/dtm_node_sockets.c | 356 +++++++++++---------------
src/dtm/dtmnd/dtm_read_config.c | 14 +-
11 files changed, 435 insertions(+), 646 deletions(-)
diff --git a/src/dtm/dtmnd/dtm.h b/src/dtm/dtmnd/dtm.h
index 76db40b61..4dd103db6 100644
--- a/src/dtm/dtmnd/dtm.h
+++ b/src/dtm/dtmnd/dtm.h
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -21,6 +22,7 @@
/* From /base/common/inc */
#include "base/logtrace.h"
+#include <stdbool.h>
#include <poll.h>
#include "base/usrbuf.h"
#include "base/ncsencdec_pub.h"
@@ -30,7 +32,7 @@
#include "dtm_cb.h"
extern DTM_INTERNODE_CB *dtms_gl_cb;
-extern uint8_t initial_discovery_phase;
+extern bool initial_discovery_phase;
#define BMCAST_MSG_LEN ( sizeof(uint16_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint16_t)
@@ -106,10 +108,9 @@ extern void node_discovery_process(void *arg);
extern uint32_t dtm_cb_init(DTM_INTERNODE_CB *dtms_cb);
extern DTM_NODE_DB *dtm_node_get_by_id(uint32_t nodeid);
extern DTM_NODE_DB *dtm_node_getnext_by_id(uint32_t node_id);
-extern DTM_NODE_DB *dtm_node_get_by_comm_socket(uint32_t comm_socket);
extern uint32_t dtm_node_add(DTM_NODE_DB *node, int i);
extern uint32_t dtm_node_delete(DTM_NODE_DB *nnode, int i);
-extern DTM_NODE_DB *dtm_node_new(DTM_NODE_DB *new_node);
+extern DTM_NODE_DB *dtm_node_new(const DTM_NODE_DB *new_node);
extern int dtm_read_config(DTM_INTERNODE_CB *config, char *dtm_config_file);
uint32_t dtm_service_discovery_init(DTM_INTERNODE_CB *dtms_cb);
diff --git a/src/dtm/dtmnd/dtm_cb.h b/src/dtm/dtmnd/dtm_cb.h
index c5b5890d9..cb18f0fad 100644
--- a/src/dtm/dtmnd/dtm_cb.h
+++ b/src/dtm/dtmnd/dtm_cb.h
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -18,6 +19,8 @@
#ifndef DTM_DTMND_DTM_CB_H_
#define DTM_DTMND_DTM_CB_H_
+#include <stdbool.h>
+
#define MAX_PORT_LENGTH 256
typedef enum dtm_ip_addr_type {
@@ -42,9 +45,8 @@ typedef struct node_list {
DTM_IP_ADDR_TYPE i_addr_family; /* Indicates V4 or V6 */
int comm_socket;
NCS_PATRICIA_NODE pat_nodeid;
- NCS_PATRICIA_NODE pat_comm_socket;
NCS_PATRICIA_NODE pat_ip_address;
- uint8_t comm_status;
+ bool comm_status;
NCS_LOCK node_lock;
SYSF_MBX mbx;
int mbx_fd;
@@ -78,13 +80,12 @@ typedef struct dtm_internode_cb {
int dgram_sock_sndr; /* */
int dgram_sock_rcvr; /* */
DTM_IP_ADDR_TYPE i_addr_family; /* Indicates V4 or V6 */
- uint8_t mcast_flag; /* Indicates mcast */
+ bool mcast_flag; /* Indicates mcast */
int32_t initial_dis_timeout;
int32_t cont_bcast_int;
int64_t bcast_msg_freq;
- NCS_PATRICIA_TREE nodeid_tree; /* NODE_DB information of Nodes */
- NCS_PATRICIA_TREE comm_sock_tree; /* NODE_DB information of Nodes */
- NCS_PATRICIA_TREE ip_addr_tree; /* NODE_DB information of Nodes */
+ NCS_PATRICIA_TREE nodeid_tree; /* NODE_DB information of Nodes */
+ NCS_PATRICIA_TREE ip_addr_tree; /* NODE_DB information of Nodes */
int so_keepalive;
NCS_LOCK cb_lock;
int comm_keepidle_time;
@@ -96,6 +97,7 @@ typedef struct dtm_internode_cb {
int32_t sock_rcvbuf_size; /* The value of SO_RCVBUF */
SYSF_MBX mbx;
int mbx_fd;
+ int epoll_fd;
} DTM_INTERNODE_CB;
/*extern DTM_INTERNODE_CB *dtms_gl_cb; */
diff --git a/src/dtm/dtmnd/dtm_inter_trans.c b/src/dtm/dtmnd/dtm_inter_trans.c
index 1827f2060..4ea87fe02 100644
--- a/src/dtm/dtmnd/dtm_inter_trans.c
+++ b/src/dtm/dtmnd/dtm_inter_trans.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -15,6 +16,7 @@
*
*/
+#include "dtm/dtmnd/dtm_inter_trans.h"
#include "dtm.h"
#include "dtm_cb.h"
#include "dtm_inter.h"
@@ -25,7 +27,6 @@ uint32_t dtm_internode_snd_msg_to_all_nodes(uint8_t *buffer,
uint16_t len);
uint32_t dtm_internode_snd_msg_to_node(uint8_t *buffer, uint16_t len,
NODE_ID node_id);
-uint32_t dtm_internode_process_pollout(int fd);
uint32_t dtm_prepare_data_msg(uint8_t *buffer, uint16_t len);
static uint32_t dtm_internode_snd_unsent_msg(DTM_NODE_DB *node);
static uint32_t dtm_internode_snd_msg_common(DTM_NODE_DB *node, uint8_t
*buffer,
@@ -203,8 +204,7 @@ static uint32_t dtm_internode_snd_msg_common(DTM_NODE_DB
*node, uint8_t *buffer,
add_ptr->len = len;
node->msgs_hdr = add_ptr;
node->msgs_tail = add_ptr;
- dtm_internode_set_poll_fdlist(node->comm_socket,
- POLLOUT);
+ dtm_internode_set_pollout(node);
return NCSCC_RC_SUCCESS;
}
} else {
@@ -219,8 +219,7 @@ static uint32_t dtm_internode_snd_msg_common(DTM_NODE_DB
*node, uint8_t *buffer,
add_ptr->len = len;
tail->next = add_ptr;
node->msgs_tail = add_ptr;
- dtm_internode_set_poll_fdlist(node->comm_socket,
- POLLOUT);
+ dtm_internode_set_pollout(node);
return NCSCC_RC_SUCCESS;
}
}
@@ -271,12 +270,9 @@ uint32_t dtm_internode_snd_msg_to_node(uint8_t *buffer,
uint16_t len,
* @return NCSCC_RC_FAILURE
*
*/
-uint32_t dtm_internode_process_pollout(int fd)
+uint32_t dtm_internode_process_pollout(DTM_NODE_DB *node)
{
- DTM_NODE_DB *node = NULL;
-
TRACE_ENTER();
- node = dtm_node_get_by_comm_socket((uint32_t)fd);
if (NULL == node) {
LOG_ER(
"DTM :No node matching the fd for pollout, delete this fd from
fd list ");
@@ -288,7 +284,7 @@ uint32_t dtm_internode_process_pollout(int fd)
if (NULL == hdr) {
/* No messages to be sent, reset the POLLOUT event on
* this fd */
- dtm_internode_reset_poll_fdlist(node->comm_socket);
+ dtm_internode_clear_pollout(node);
} else {
dtm_internode_snd_unsent_msg(node);
}
@@ -315,7 +311,7 @@ static uint32_t dtm_internode_snd_unsent_msg(DTM_NODE_DB
*node)
int snd_count = 0;
TRACE_ENTER();
if (NULL == unsent_msg) {
- dtm_internode_reset_poll_fdlist(node->comm_socket);
+ dtm_internode_clear_pollout(node);
return NCSCC_RC_SUCCESS;
}
while (NULL != unsent_msg) {
@@ -365,7 +361,7 @@ static uint32_t dtm_internode_snd_unsent_msg(DTM_NODE_DB
*node)
}
}
if (NULL == node->msgs_hdr) {
- dtm_internode_reset_poll_fdlist(node->comm_socket);
+ dtm_internode_clear_pollout(node);
}
TRACE_LEAVE();
return NCSCC_RC_SUCCESS;
diff --git a/src/dtm/dtmnd/dtm_inter_trans.h b/src/dtm/dtmnd/dtm_inter_trans.h
index ab13292b3..961eef8de 100644
--- a/src/dtm/dtmnd/dtm_inter_trans.h
+++ b/src/dtm/dtmnd/dtm_inter_trans.h
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -14,9 +15,16 @@
* Author(s): GoAhead Software
*
*/
+
#ifndef DTM_DTMND_DTM_INTER_TRANS_H_
#define DTM_DTMND_DTM_INTER_TRANS_H_
+#include <stdint.h>
+#include "mds/mds_papi.h"
+
+struct node_list;
+typedef struct node_list DTM_NODE_DB;
+
extern uint32_t dtm_add_to_msg_dist_list(uint8_t *buffer, uint16_t len,
NODE_ID node_id);
@@ -25,7 +33,7 @@ extern uint32_t dtm_internode_snd_msg_to_all_nodes(uint8_t *buffer,
extern uint32_t dtm_internode_snd_msg_to_node(uint8_t *buffer, uint16_t len,
NODE_ID node_id);
-extern uint32_t dtm_internode_process_pollout(int fd);
+extern uint32_t dtm_internode_process_pollout(DTM_NODE_DB *node);
extern uint32_t dtm_prepare_data_msg(uint8_t *buffer, uint16_t len);
#endif // DTM_DTMND_DTM_INTER_TRANS_H_
diff --git a/src/dtm/dtmnd/dtm_intra.c b/src/dtm/dtmnd/dtm_intra.c
index fa67a9678..d5bb3ac9b 100644
--- a/src/dtm/dtmnd/dtm_intra.c
+++ b/src/dtm/dtmnd/dtm_intra.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -130,7 +131,7 @@ uint32_t dtm_intra_processing_init(char *node_name, char
*node_ip,
/* Open a socket, If socket opens to fail return Error */
dtm_intranode_cb->server_sockfd =
- socket(dtm_socket_domain, SOCK_STREAM, 0);
+ socket(dtm_socket_domain, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (dtm_intranode_cb->server_sockfd < 0) {
LOG_ER("DTM: Socket creation failed err :%s ", strerror(errno));
diff --git a/src/dtm/dtmnd/dtm_main.c b/src/dtm/dtmnd/dtm_main.c
index aa6b8427e..98818a02f 100644
--- a/src/dtm/dtmnd/dtm_main.c
+++ b/src/dtm/dtmnd/dtm_main.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -28,8 +29,10 @@
#include "dtm_node.h"
#include "base/osaf_poll.h"
-#include <stdlib.h>
#include <sched.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
/* ========================================================================
* DEFINITIONS
@@ -57,7 +60,7 @@ NCSCONTEXT gl_serv_dis_task_hdl = 0;
static DTM_INTERNODE_CB _dtms_cb;
DTM_INTERNODE_CB *dtms_gl_cb = &_dtms_cb;
-uint8_t initial_discovery_phase = true;
+bool initial_discovery_phase = true;
/* ========================================================================
* FUNCTION PROTOTYPES
@@ -111,7 +114,7 @@ static uint32_t dtm_construct_bcast_hdr(DTM_INTERNODE_CB
*dtms_cb,
ncs_encode_16bit(&data, *pack_size);
ncs_encode_16bit(&data, dtms_cb->cluster_id);
ncs_encode_32bit(&data, dtms_cb->node_id);
- ncs_encode_8bit(&data, dtms_cb->mcast_flag);
+ ncs_encode_8bit(&data, (uint8_t)dtms_cb->mcast_flag);
ncs_encode_16bit(&data, dtms_cb->stream_port);
ncs_encode_8bit(&data, (uint8_t)dtms_cb->i_addr_family);
memcpy(data, dtms_cb->ip_addr, INET6_ADDRSTRLEN);
@@ -399,7 +402,8 @@ int main(int argc, char *argv[])
send_bcast_buffer,
bcast_buf_len);
} else {
- m_NCS_TASK_SLEEP(0xfffffff0);
+ for (;;)
+ pause();
}
}
done1:
diff --git a/src/dtm/dtmnd/dtm_node.c b/src/dtm/dtmnd/dtm_node.c
index 29821633e..9061b629d 100644
--- a/src/dtm/dtmnd/dtm_node.c
+++ b/src/dtm/dtmnd/dtm_node.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -16,14 +17,15 @@
*/
#include "dtm.h"
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
#include "dtm_socket.h"
#include "dtm_node.h"
#include "dtm_inter.h"
#include "dtm_inter_disc.h"
#include "dtm_inter_trans.h"
-#define MAX_FD 103
-#define DTM_TCP_POLL_TIMEOUT 20000
#define DTM_INTERNODE_RECV_BUFFER_SIZE 1024
/* packet_size + mds_indentifier + mds_version + msg_type +node_id +
@@ -32,8 +34,16 @@
#define NODE_INFO_PKT_SIZE (NODE_INFO_HDR_SIZE + _POSIX_HOST_NAME_MAX)
-static struct pollfd fds[MAX_FD]; /* Poll fds global list */
-static int nfds = 0;
+static void ReceiveBcastOrMcast(void);
+static void AcceptTcpConnections(uint8_t *node_info_hrd,
+ int node_info_buffer_len);
+static void ReceiveFromMailbox(void);
+static void AddNodeToEpoll(DTM_INTERNODE_CB *dtms_cb, DTM_NODE_DB *node);
+static void RemoveNodeFromEpoll(DTM_INTERNODE_CB *dtms_cb, DTM_NODE_DB *node);
+
+static DTM_NODE_DB dgram_sock_rcvr;
+static DTM_NODE_DB stream_sock;
+static DTM_NODE_DB mbx_fd;
/**
* Function to construct the node info hdr
@@ -74,29 +84,28 @@ static uint32_t
dtm_construct_node_info_hdr(DTM_INTERNODE_CB *dtms_cb,
* @return NCSCC_RC_FAILURE
*
*/
-uint32_t dtm_process_node_info(DTM_INTERNODE_CB *dtms_cb, int stream_sock,
+uint32_t dtm_process_node_info(DTM_INTERNODE_CB *dtms_cb, DTM_NODE_DB *node,
uint8_t *buffer, uint8_t *node_info_hrd,
int buffer_len)
{
uint32_t node_id;
- DTM_NODE_DB *node;
uint32_t nodename_len;
char nodename[_POSIX_HOST_NAME_MAX];
int rc = 0;
uint8_t *data = buffer;
TRACE_ENTER();
- node_id = ncs_decode_32bit(&data);
- nodename_len = ncs_decode_32bit(&data);
- strncpy((char *)nodename, (char *)data, nodename_len);
-
- node = dtm_node_get_by_comm_socket(stream_sock);
-
if (node == NULL) {
rc = NCSCC_RC_FAILURE;
goto done;
}
+ int fd = node->comm_socket;
+
+ node_id = ncs_decode_32bit(&data);
+ nodename_len = ncs_decode_32bit(&data);
+ strncpy((char *)nodename, (char *)data, nodename_len);
+
if (!node->comm_status) {
/*****************************************************/
@@ -120,8 +129,8 @@ uint32_t dtm_process_node_info(DTM_INTERNODE_CB *dtms_cb,
int stream_sock,
} else if (node->node_id == node_id) {
strncpy((char *)&node->node_name, nodename,
nodename_len);
- rc = dtm_comm_socket_send(stream_sock, node_info_hrd,
- buffer_len);
+ rc =
+ dtm_comm_socket_send(fd, node_info_hrd, buffer_len);
if (rc != NCSCC_RC_SUCCESS) {
LOG_ER(
@@ -175,9 +184,9 @@ done:
*/
uint32_t dtm_process_node_up_down(NODE_ID node_id, char *node_name,
char *node_ip, DTM_IP_ADDR_TYPE i_addr_family,
- uint8_t comm_status)
+ bool comm_status)
{
- if (true == comm_status) {
+ if (comm_status == true) {
TRACE(
"DTM: dtm_process_node_up_down node_ip:%s, node_id:%u
i_addr_family:%d ",
node_ip, node_id, i_addr_family);
@@ -201,7 +210,7 @@ void dtm_internode_process_poll_rcv_msg_common(DTM_NODE_DB
*node,
uint16_t local_len_buf,
uint8_t *node_info_hrd,
uint16_t node_info_buffer_len,
- int fd, int *close_conn)
+ bool *close_conn)
{
DTM_MSG_TYPES pkt_type = 0;
uint32_t identifier = 0;
@@ -238,7 +247,7 @@ void dtm_internode_process_poll_rcv_msg_common(DTM_NODE_DB
*node,
alloc_buffer, (local_len_buf - 6), node->node_id);
} else if (pkt_type == DTM_CONN_DETAILS_MSG_TYPE) {
if (dtm_process_node_info(
- dtms_cb, fd, &node->buffer[8], node_info_hrd,
+ dtms_cb, node, &node->buffer[8], node_info_hrd,
node_info_buffer_len) != NCSCC_RC_SUCCESS) {
LOG_ER(
" DTM : communication socket Connection closed\n");
@@ -289,19 +298,19 @@ done:
* @return NCSCC_RC_FAILURE
*
*/
-void dtm_internode_process_poll_rcv_msg(int fd, int *close_conn,
+void dtm_internode_process_poll_rcv_msg(DTM_NODE_DB *node, bool *close_conn,
uint8_t *node_info_hrd,
uint16_t node_info_buffer_len)
{
- DTM_NODE_DB *node = NULL;
TRACE_ENTER();
- node = dtm_node_get_by_comm_socket(fd);
-
if (NULL == node) {
LOG_ER("DTM: database mismatch");
osafassert(0);
}
+
+ int fd = node->comm_socket;
+
if (0 == node->bytes_tb_read) {
if (0 == node->num_by_read_for_len_buff) {
uint8_t *data;
@@ -311,7 +320,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
/* Receive all incoming data on this socket */
/*******************************************************/
- recd_bytes = recv(fd, node->len_buff, 2, 0);
+ recd_bytes = recv(fd, node->len_buff, 2, MSG_DONTWAIT);
if (0 == recd_bytes) {
*close_conn = true;
return;
@@ -333,7 +342,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
return;
}
recd_bytes = recv(fd, &node->buffer[2],
- local_len_buf, 0);
+ local_len_buf, MSG_DONTWAIT);
if (recd_bytes < 0) {
return;
@@ -353,8 +362,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
/* Call the common rcv function */
dtm_internode_process_poll_rcv_msg_common(
node, local_len_buf, node_info_hrd,
- node_info_buffer_len, fd,
- close_conn);
+ node_info_buffer_len, close_conn);
} else {
LOG_ER(
"DTM :unknown corrupted data received on
this file descriptor \n");
@@ -380,7 +388,8 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
} else if (1 == node->num_by_read_for_len_buff) {
int recd_bytes = 0;
- recd_bytes = recv(fd, &node->len_buff[1], 1, 0);
+ recd_bytes =
+ recv(fd, &node->len_buff[1], 1, MSG_DONTWAIT);
if (recd_bytes < 0) {
/* This can happen due to system call interrupt
*/
@@ -411,8 +420,8 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
"DTM :Memory allocation failed in
dtm_internode_processing \n");
return;
}
- recd_bytes =
- recv(fd, &node->buffer[2], node->buff_total_len, 0);
+ recd_bytes = recv(fd, &node->buffer[2],
+ node->buff_total_len, MSG_DONTWAIT);
if (recd_bytes < 0) {
return;
@@ -432,7 +441,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
/* Call the common rcv function */
dtm_internode_process_poll_rcv_msg_common(
node, node->buff_total_len, node_info_hrd,
- node_info_buffer_len, fd, close_conn);
+ node_info_buffer_len, close_conn);
} else {
LOG_ER(
"DTM :unknown corrupted data received on this
file descriptor \n");
@@ -451,7 +460,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
recd_bytes = recv(fd,
&node->buffer[2 + (node->buff_total_len -
node->bytes_tb_read)],
- node->bytes_tb_read, 0);
+ node->bytes_tb_read, MSG_DONTWAIT);
if (recd_bytes < 0) {
return;
@@ -470,7 +479,7 @@ void dtm_internode_process_poll_rcv_msg(int fd, int
*close_conn,
/* Call the common rcv function */
dtm_internode_process_poll_rcv_msg_common(
node, node->buff_total_len, node_info_hrd,
- node_info_buffer_len, fd, close_conn);
+ node_info_buffer_len, close_conn);
} else {
LOG_ER(
"DTM :unknown corrupted data received on this file
descriptor \n");
@@ -494,23 +503,13 @@ void node_discovery_process(void *arg)
{
TRACE_ENTER();
- int poll_ret = 0;
- int end_server = false, compress_array = false;
- int close_conn = false;
+ bool close_conn = false;
DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
- int current_size = 0, i, j;
-
/* Data Received */
- uint8_t inbuf[DTM_INTERNODE_RECV_BUFFER_SIZE];
- uint8_t *data1; /* Used for DATAGRAM decoding */
- uint16_t recd_bytes = 0;
- uint16_t recd_buf_len = 0;
int node_info_buffer_len = 0;
uint8_t node_info_hrd[NODE_INFO_PKT_SIZE];
- char node_ip[INET6_ADDRSTRLEN];
- memset(&node_ip, 0, INET6_ADDRSTRLEN);
/*************************************************************/
/* Set up the initial bcast or mcast receiver socket */
/*************************************************************/
@@ -520,7 +519,7 @@ void node_discovery_process(void *arg)
if (NCSCC_RC_SUCCESS != dtm_dgram_bcast_listener(dtms_cb)) {
LOG_ER(
"DTM:Set up the initial bcast receiver socket
failed");
- exit(1);
+ exit(EXIT_FAILURE);
}
} else {
@@ -528,7 +527,7 @@ void node_discovery_process(void *arg)
if (NCSCC_RC_SUCCESS != dtm_dgram_mcast_listener(dtms_cb)) {
LOG_ER(
"DTM:Set up the initial mcast receiver socket
failed");
- exit(1);
+ exit(EXIT_FAILURE);
}
}
@@ -538,31 +537,15 @@ void node_discovery_process(void *arg)
if (NCSCC_RC_SUCCESS != dtm_stream_nonblocking_listener(dtms_cb)) {
LOG_ER(
"DTM: Set up the initial stream nonblocking serv failed");
- exit(1);
+ exit(EXIT_FAILURE);
}
- /*************************************************************/
- /* Initialize the pollfd structure */
- /*************************************************************/
- memset(fds, 0, sizeof(fds));
-
- /*************************************************************/
- /* Set up the initial listening socket */
- /*************************************************************/
-
- fds[0].fd = dtms_cb->dgram_sock_rcvr;
- fds[0].events = POLLIN;
-
- /*************************************************************/
- /* Set up the initial listening socket */
- /*************************************************************/
-
- fds[1].fd = dtms_cb->stream_sock;
- fds[1].events = POLLIN;
-
- fds[2].fd = dtms_cb->mbx_fd;
- fds[2].events = POLLIN;
- nfds = 3;
+ dgram_sock_rcvr.comm_socket = dtms_cb->dgram_sock_rcvr;
+ stream_sock.comm_socket = dtms_cb->stream_sock;
+ mbx_fd.comm_socket = dtms_cb->mbx_fd;
+ AddNodeToEpoll(dtms_cb, &dgram_sock_rcvr);
+ AddNodeToEpoll(dtms_cb, &stream_sock);
+ AddNodeToEpoll(dtms_cb, &mbx_fd);
/*************************************************************/
/* Set up the initial listening socket */
@@ -581,230 +564,56 @@ void node_discovery_process(void *arg)
/* on any of the connected sockets. */
/*************************************************************/
- do {
+ for (;;) {
/***********************************************************/
/* Call poll() and wait . */
/***********************************************************/
- int fd_check = 0;
- poll_ret = poll(fds, nfds, DTM_TCP_POLL_TIMEOUT);
+ struct epoll_event events[128];
+ int poll_ret;
+ do {
+ poll_ret =
+ epoll_wait(dtms_cb->epoll_fd, &events[0],
+ sizeof(events) / sizeof(events[0]), -1);
+ } while (poll_ret < 0 && errno == EINTR);
/***********************************************************/
/* Check to see if the poll call failed. */
/***********************************************************/
if (poll_ret < 0) {
- LOG_ER(" poll() failed");
- continue;
- }
- /***********************************************************/
- /* Check to see if the 3 minute time out expired. */
- /***********************************************************/
- if (poll_ret == 0) {
- continue;
+ LOG_ER("epoll_wait() failed: %d", errno);
+ break;
}
/***********************************************************/
/* One or more descriptors are readable. Need to */
/* determine which ones they are. */
/***********************************************************/
- current_size = nfds;
- for (i = 0; i < current_size; i++) {
+ for (int i = 0; i < poll_ret; ++i) {
+ DTM_NODE_DB *node = (DTM_NODE_DB *)events[i].data.ptr;
/*********************************************************/
/* Loop through to find the descriptors that returned */
- /* POLLIN and determine whether it's the listening */
+ /* EPOLLIN and determine whether it's the listening */
/* or the active connection. */
/*********************************************************/
- if (POLLIN & fds[i].revents) {
-
- if (fds[i].fd == dtms_cb->dgram_sock_rcvr) {
-
- fd_check++;
+ if ((events[i].events & EPOLLIN) != 0) {
+ if (node == &dgram_sock_rcvr) {
/* Data Received */
- memset(inbuf, 0,
- DTM_INTERNODE_RECV_BUFFER_SIZE);
- recd_bytes = 0;
- recd_buf_len = 0;
-
- recd_bytes = dtm_dgram_recvfrom_bmcast(
- dtms_cb, node_ip, inbuf,
- sizeof(inbuf));
-
- if (recd_bytes == 0) {
- LOG_ER(
- "DTM: recd bytes=0 on DGRAM
sock");
- continue;
- }
-
- data1 =
- inbuf; /* take care of previous
- address */
-
- recd_buf_len = ncs_decode_16bit(&data1);
-
- if (recd_buf_len == recd_bytes) {
-
- int new_sd = -1;
-
- new_sd = dtm_process_connect(
- dtms_cb, node_ip, inbuf,
- (recd_bytes - 2));
-
- if (new_sd == -1)
- continue;
-
-
/*****************************************************/
- /* Add the new incoming
- * connection to the */
- /* pollfd structure */
-
/*****************************************************/
- LOG_IN(
- "DTM: add New incoming
connection to fd : %d\n",
- new_sd);
- fds[nfds].fd = new_sd;
- fds[nfds].events =
- POLLIN | POLLERR | POLLHUP |
- POLLNVAL;
- nfds++;
-
- } else {
- /* Log message that we are
- * dropping the data */
- LOG_ER(
- "DTM: BRoadcastLEN-MISMATCH:
dropping the data");
- }
-
- } else if (fds[i].fd == dtms_cb->stream_sock) {
-
- int new_sd = -1;
- uint32_t local_rc = NCSCC_RC_SUCCESS;
- fd_check++;
+ ReceiveBcastOrMcast();
+ } else if (node == &stream_sock) {
/*******************************************************/
/* Listening descriptor is readable. */
/*******************************************************/
TRACE(
" DTM :Listening socket is
readable");
-
/*******************************************************/
- /* Accept all incoming connections that
- * are */
- /* queued up on the listening socket
- * before we */
- /* loop back and call poll again. */
-
/*******************************************************/
- /* do { */
-
/*****************************************************/
- /* Accept each incoming connection. If
- */
- /* accept fails with EWOULDBLOCK, then
- * we */
- /* have accepted all of them. Any other
- */
- /* failure on accept will cause us to
- * end the */
- /* serv. */
-
/*****************************************************/
- new_sd = dtm_process_accept(
- dtms_cb, dtms_cb->stream_sock);
- if (new_sd < 0) {
- LOG_ER("DTM: accept() failed");
- end_server = true;
- break;
- }
-
-
/*****************************************************/
- /* Node info data back to the accept
- * with node info */
-
/*****************************************************/
-
- local_rc = dtm_comm_socket_send(
- new_sd, node_info_hrd,
+ AcceptTcpConnections(
+ node_info_hrd,
node_info_buffer_len);
- if (local_rc != NCSCC_RC_SUCCESS) {
- dtm_comm_socket_close(&new_sd);
- LOG_ER("DTM: send() failed ");
- break;
- }
-
-
/*****************************************************/
- /* Add the new incoming connection to
- * the */
- /* pollfd structure */
-
/*****************************************************/
- TRACE(
- "DTM :add New incoming connection to fd
: %d\n",
- new_sd);
- fds[nfds].fd = new_sd;
- fds[nfds].events = POLLIN | POLLERR |
- POLLHUP | POLLNVAL;
- nfds++;
-
-
/*****************************************************/
- /* Loop back up and accept another
- * incoming */
- /* connection */
-
/*****************************************************/
- /* } while (new_sd != -1); */ /* accept
- one at
- a time
- */
-
- } else if (fds[i].fd == dtms_cb->mbx_fd) {
+ } else if (node == &mbx_fd) {
/* MBX fd messages that need to be sent
* out from this node */
/* Process the mailbox events */
- DTM_SND_MSG_ELEM *msg_elem = NULL;
-
- fd_check++;
- msg_elem =
- (DTM_SND_MSG_ELEM
- *)(m_NCS_IPC_NON_BLK_RECEIVE(
- &dtms_cb->mbx, NULL));
-
- if (NULL == msg_elem) {
- LOG_ER(
- "DTM: Inter Node Mailbox
IPC_NON_BLK_RECEIVE Failed");
- continue;
- } else if (DTM_MBX_ADD_DISTR_TYPE ==
- msg_elem->type) {
-
dtm_internode_add_to_svc_dist_list(
- msg_elem->info.svc_event
- .server_type,
- msg_elem->info.svc_event
- .server_inst,
- msg_elem->info.svc_event
- .pid);
- free(msg_elem);
- msg_elem = NULL;
- } else if (DTM_MBX_DEL_DISTR_TYPE ==
- msg_elem->type) {
-
dtm_internode_del_from_svc_dist_list(
- msg_elem->info.svc_event
- .server_type,
- msg_elem->info.svc_event
- .server_inst,
- msg_elem->info.svc_event
- .pid);
- free(msg_elem);
- msg_elem = NULL;
- } else if (DTM_MBX_DATA_MSG_TYPE ==
- msg_elem->type) {
- dtm_prepare_data_msg(
- msg_elem->info.data.buffer,
- msg_elem->info.data
- .buff_len);
- dtm_internode_snd_msg_to_node(
- msg_elem->info.data.buffer,
- msg_elem->info.data
- .buff_len,
- msg_elem->info.data
- .dst_nodeid);
- free(msg_elem);
- msg_elem = NULL;
- } else {
- LOG_ER(
- "DTM Intranode :Invalid evt type
from mbx");
- free(msg_elem);
- msg_elem = NULL;
- }
+ ReceiveFromMailbox();
} else {
/*********************************************************/
@@ -813,17 +622,13 @@ void node_discovery_process(void *arg)
/* existing connection must be readable
*/
/*********************************************************/
- fd_check++;
dtm_internode_process_poll_rcv_msg(
- fds[i].fd, &close_conn,
- node_info_hrd,
+ node, &close_conn, node_info_hrd,
node_info_buffer_len);
}
- } else if (fds[i].revents & POLLOUT) {
- fd_check++;
- dtm_internode_process_pollout(fds[i].fd);
- } else if (fds[i].revents & POLLHUP) {
- fd_check++;
+ } else if ((events[i].events & EPOLLOUT) != 0) {
+ dtm_internode_process_pollout(node);
+ } else if ((events[i].events & EPOLLHUP) != 0) {
close_conn = true;
}
@@ -834,51 +639,130 @@ void node_discovery_process(void *arg)
/* descriptor. */
/*******************************************************/
if (close_conn) {
- dtm_comm_socket_close(&fds[i].fd);
close_conn = false;
- compress_array = true;
- }
- /* End of existing connection is readable */
- if (poll_ret == fd_check) {
- break;
+ RemoveNodeFromEpoll(dtms_cb, node);
+ dtm_comm_socket_close(node);
}
}
-
- /***********************************************************/
- /* If the compress_array flag was turned on, we need */
- /* to squeeze together the array and decrement the number */
- /* of file descriptors. We do not need to move back the */
- /* events and revents fields because the events will always */
- /* be POLLIN in this case, and revents is output. */
- /***********************************************************/
-
- if (compress_array) {
- compress_array = false;
- for (i = 0; i < nfds; i++) {
- if (fds[i].fd == -1) {
- for (j = i; j < nfds; j++) {
- fds[j].fd = fds[j + 1].fd;
- }
- nfds--;
- }
- }
- }
-
- } while (end_server == false);
+ }
/* End of serving running. */
/*************************************************************/
/* Clean up all of the sockets that are open */
/*************************************************************/
done:
- for (i = 0; i < nfds; i++) {
- if (fds[i].fd >= 0)
- dtm_comm_socket_close(&fds[i].fd);
- }
TRACE_LEAVE();
return;
}
+static void ReceiveBcastOrMcast(void)
+{
+ DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
+ uint8_t inbuf[DTM_INTERNODE_RECV_BUFFER_SIZE];
+ ssize_t recd_bytes;
+ do {
+ recd_bytes =
+ dtm_dgram_recv_bmcast(dtms_cb, inbuf, sizeof(inbuf));
+ if (recd_bytes >= 2) {
+ uint8_t *data1 = inbuf;
+ uint16_t recd_buf_len = ncs_decode_16bit(&data1);
+ if (recd_buf_len == (size_t)recd_bytes) {
+ DTM_NODE_DB *new_node = dtm_process_connect(
+ dtms_cb, data1,
+ (recd_bytes - sizeof(uint16_t)));
+ if (new_node != NULL) {
+ // Add the new incoming connection to
+ // the pollfd structure
+ LOG_IN(
+ "DTM: add New incoming connection to fd
: %d\n",
+ new_node->comm_socket);
+ AddNodeToEpoll(dtms_cb, new_node);
+ }
+ } else {
+ // Log message that we are dropping the data
+ LOG_ER("DTM: BRoadcastLEN-MISMATCH %" PRIu16
+ "/%zd: dropping the data",
+ recd_buf_len, recd_bytes);
+ }
+ } else {
+ if (recd_bytes >= 0)
+ LOG_ER("DTM: recd bytes=%zd on DGRAM sock",
+ recd_bytes);
+ }
+ } while (recd_bytes >= 0);
+}
+
+static void AcceptTcpConnections(uint8_t *node_info_hrd,
+ int node_info_buffer_len)
+{
+ DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
+ DTM_NODE_DB *new_node;
+ while ((new_node = dtm_process_accept(dtms_cb, dtms_cb->stream_sock)) !=
+ NULL) {
+ if (dtm_comm_socket_send(new_node->comm_socket, node_info_hrd,
+ node_info_buffer_len) ==
+ NCSCC_RC_SUCCESS) {
+ TRACE("DTM: add New incoming connection to fd: %d",
+ new_node->comm_socket);
+ AddNodeToEpoll(dtms_cb, new_node);
+ } else {
+ dtm_comm_socket_close(new_node);
+ LOG_ER("DTM: send() failed");
+ }
+ }
+}
+
+static void ReceiveFromMailbox(void)
+{
+ DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
+ DTM_SND_MSG_ELEM *msg_elem;
+ while ((msg_elem = (DTM_SND_MSG_ELEM *)(m_NCS_IPC_NON_BLK_RECEIVE(
+ &dtms_cb->mbx, NULL))) != NULL) {
+ if (msg_elem->type == DTM_MBX_ADD_DISTR_TYPE) {
+ dtm_internode_add_to_svc_dist_list(
+ msg_elem->info.svc_event.server_type,
+ msg_elem->info.svc_event.server_inst,
+ msg_elem->info.svc_event.pid);
+ } else if (msg_elem->type == DTM_MBX_DEL_DISTR_TYPE) {
+ dtm_internode_del_from_svc_dist_list(
+ msg_elem->info.svc_event.server_type,
+ msg_elem->info.svc_event.server_inst,
+ msg_elem->info.svc_event.pid);
+ } else if (msg_elem->type == DTM_MBX_DATA_MSG_TYPE) {
+ dtm_prepare_data_msg(msg_elem->info.data.buffer,
+ msg_elem->info.data.buff_len);
+ dtm_internode_snd_msg_to_node(
+ msg_elem->info.data.buffer,
+ msg_elem->info.data.buff_len,
+ msg_elem->info.data.dst_nodeid);
+ } else {
+ LOG_ER("DTM Intranode :Invalid evt type from mbx");
+ }
+ free(msg_elem);
+ }
+}
+
+static void AddNodeToEpoll(DTM_INTERNODE_CB *dtms_cb, DTM_NODE_DB *node)
+{
+ struct epoll_event event = {EPOLLIN, {.ptr = node}};
+ if (epoll_ctl(dtms_cb->epoll_fd, EPOLL_CTL_ADD, node->comm_socket,
+ &event) != 0) {
+ LOG_ER("DTM: epoll_ctl(%d, EPOLL_CTL_ADD, %d) failed: %d",
+ dtms_cb->epoll_fd, node->comm_socket, errno);
+ exit(EXIT_FAILURE);
+ }
+}
+
+static void RemoveNodeFromEpoll(DTM_INTERNODE_CB *dtms_cb, DTM_NODE_DB *node)
+{
+ if (epoll_ctl(dtms_cb->epoll_fd, EPOLL_CTL_DEL, node->comm_socket,
+ NULL) != 0) {
+ LOG_ER("DTM: epoll_ctl(%d, EPOLL_CTL_DEL, %d) failed: %d",
+ dtms_cb->epoll_fd, node->comm_socket, errno);
+ exit(EXIT_FAILURE);
+ }
+}
+
/**
* Function to set poll fdlist
*
@@ -888,19 +772,17 @@ done:
* @return NCSCC_RC_FAILURE
*
*/
-uint32_t dtm_internode_set_poll_fdlist(int fd, uint16_t events)
+void dtm_internode_set_pollout(DTM_NODE_DB *node)
{
- int i = 0;
-
- for (i = 0; i < nfds; i++) {
- if (fd == fds[i].fd) {
- fds[i].events = fds[i].events | events;
- LOG_IN("event set success, in the poll fd list");
- return NCSCC_RC_SUCCESS;
- }
+ DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
+ struct epoll_event event = {EPOLLIN | EPOLLOUT, {.ptr = node}};
+ if (epoll_ctl(dtms_cb->epoll_fd, EPOLL_CTL_MOD, node->comm_socket,
+ &event) == 0) {
+ TRACE("event set success, in the poll fd list");
+ } else {
+ LOG_ER("DTM: epoll_ctl(%d, EPOLL_CTL_MOD, %d) failed: %d",
+ dtms_cb->epoll_fd, node->comm_socket, errno);
}
- LOG_ER("Unable to set the event in the poll list");
- return NCSCC_RC_FAILURE;
}
/**
@@ -912,16 +794,15 @@ uint32_t dtm_internode_set_poll_fdlist(int fd, uint16_t
events)
* @return NCSCC_RC_FAILURE
*
*/
-uint32_t dtm_internode_reset_poll_fdlist(int fd)
+void dtm_internode_clear_pollout(DTM_NODE_DB *node)
{
- int i = 0;
- for (i = 0; i < nfds; i++) {
- if (fd == fds[i].fd) {
- fds[i].events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
- LOG_IN("event set success, in the poll fd list");
- return NCSCC_RC_SUCCESS;
- }
+ DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
+ struct epoll_event event = {EPOLLIN, {.ptr = node}};
+ if (epoll_ctl(dtms_cb->epoll_fd, EPOLL_CTL_MOD, node->comm_socket,
+ &event) == 0) {
+ TRACE("event set success, in the poll fd list");
+ } else {
+ LOG_ER("DTM: epoll_ctl(%d, EPOLL_CTL_MOD, %d) failed: %d",
+ dtms_cb->epoll_fd, node->comm_socket, errno);
}
- LOG_ER("\nUnable to set the event in the poll list");
- return NCSCC_RC_FAILURE;
}
diff --git a/src/dtm/dtmnd/dtm_node.h b/src/dtm/dtmnd/dtm_node.h
index eb47e7b68..39b6d8459 100644
--- a/src/dtm/dtmnd/dtm_node.h
+++ b/src/dtm/dtmnd/dtm_node.h
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -17,6 +18,8 @@
#ifndef DTM_DTMND_DTM_NODE_H_
#define DTM_DTMND_DTM_NODE_H_
+#include <stdbool.h>
+
typedef void raw_type;
extern char *dtm_validate_listening_ip_addr(DTM_INTERNODE_CB *config);
@@ -31,19 +34,20 @@ extern uint32_t dtm_dgram_sendto_bcast(DTM_INTERNODE_CB
*dtms_cb,
extern uint32_t dtm_dgram_sendto_mcast(DTM_INTERNODE_CB *dtms_cb,
const void *buffer, int buffer_len);
extern uint32_t dtm_sockdesc_close(int sock_desc);
-extern int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char *node_ip,
- uint8_t *buffer, uint16_t len);
-extern int dtm_process_accept(DTM_INTERNODE_CB *dtms_cb, int stream_sock);
-extern int dtm_dgram_recvfrom_bmcast(DTM_INTERNODE_CB *dtms_cb, char *node_ip,
- void *buffer, int buffer_len);
+extern DTM_NODE_DB *dtm_process_connect(DTM_INTERNODE_CB *dtms_cb,
+ uint8_t *buffer, uint16_t len);
+extern DTM_NODE_DB *dtm_process_accept(DTM_INTERNODE_CB *dtms_cb,
+ int stream_sock);
+extern ssize_t dtm_dgram_recv_bmcast(DTM_INTERNODE_CB *dtms_cb, void *buffer,
+ int buffer_len);
extern uint32_t dtm_comm_socket_send(int sock_desc, const void *buffer,
int buffer_len);
-extern uint32_t dtm_comm_socket_close(int *comm_socket);
+extern void dtm_comm_socket_close(DTM_NODE_DB *node);
extern uint32_t dtm_process_node_up_down(NODE_ID node_id, char *node_name,
char *node_ip,
DTM_IP_ADDR_TYPE i_addr_family,
- uint8_t comm_status);
-uint32_t dtm_internode_set_poll_fdlist(int fd, uint16_t event);
-uint32_t dtm_internode_reset_poll_fdlist(int fd);
+ bool comm_status);
+extern void dtm_internode_set_pollout(DTM_NODE_DB *node);
+extern void dtm_internode_clear_pollout(DTM_NODE_DB *node);
#endif // DTM_DTMND_DTM_NODE_H_
diff --git a/src/dtm/dtmnd/dtm_node_db.c b/src/dtm/dtmnd/dtm_node_db.c
index 59278a52d..eb55110e6 100644
--- a/src/dtm/dtmnd/dtm_node_db.c
+++ b/src/dtm/dtmnd/dtm_node_db.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -15,8 +16,10 @@
*
*/
+#include <sys/epoll.h>
+#include <unistd.h>
#include "base/usrbuf.h"
-#include "dtm.h"
+#include "dtm/dtmnd/dtm.h"
/**
* Function to add new node
@@ -27,17 +30,14 @@
* @return NCSCC_RC_FAILURE
*
*/
-DTM_NODE_DB *dtm_node_new(DTM_NODE_DB *new_node)
+DTM_NODE_DB *dtm_node_new(const DTM_NODE_DB *new_node)
{
-
- DTM_NODE_DB *node = NULL;
-
TRACE_ENTER();
- node = calloc(1, sizeof(DTM_NODE_DB));
+ DTM_NODE_DB *node = malloc(sizeof(DTM_NODE_DB));
if (node == NULL) {
- LOG_ER("Calloc failed");
+ LOG_ER("malloc failed");
goto done;
}
@@ -63,30 +63,27 @@ done:
uint32_t dtm_cb_init(DTM_INTERNODE_CB *dtms_cb)
{
NCS_PATRICIA_PARAMS nodeid_param;
- NCS_PATRICIA_PARAMS comm_socket_param;
NCS_PATRICIA_PARAMS ipaddr_param;
TRACE_ENTER();
memset(&nodeid_param, 0, sizeof(NCS_PATRICIA_PARAMS));
- memset(&comm_socket_param, 0, sizeof(NCS_PATRICIA_PARAMS));
memset(&ipaddr_param, 0, sizeof(NCS_PATRICIA_PARAMS));
nodeid_param.key_size = sizeof(uint32_t);
- comm_socket_param.key_size = sizeof(uint32_t);
ipaddr_param.key_size = INET6_ADDRSTRLEN;
+ dtms_cb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (dtms_cb->epoll_fd < 0) {
+ LOG_ER("DTM: epoll_create() failed: %d", errno);
+ return NCSCC_RC_FAILURE;
+ }
+
/* Initialize patricia tree for nodeid list */
if (NCSCC_RC_SUCCESS !=
ncs_patricia_tree_init(&dtms_cb->nodeid_tree, &nodeid_param)) {
LOG_ER("DTM: ncs_patricia_tree_init FAILED");
- return NCSCC_RC_FAILURE;
- }
-
- /* Initialize comm_socket patricia tree */
- if (NCSCC_RC_SUCCESS != ncs_patricia_tree_init(&dtms_cb->comm_sock_tree,
- &comm_socket_param)) {
- LOG_ER("DTM:ncs_patricia_tree_init FAILED");
+ close(dtms_cb->epoll_fd);
return NCSCC_RC_FAILURE;
}
@@ -94,12 +91,14 @@ uint32_t dtm_cb_init(DTM_INTERNODE_CB *dtms_cb)
if (NCSCC_RC_SUCCESS !=
ncs_patricia_tree_init(&dtms_cb->ip_addr_tree, &ipaddr_param)) {
LOG_ER("DTM:ncs_patricia_tree_init FAILED");
+ close(dtms_cb->epoll_fd);
return NCSCC_RC_FAILURE;
}
if (m_NCS_IPC_CREATE(&dtms_cb->mbx) != NCSCC_RC_SUCCESS) {
/* Mail box creation failed */
LOG_ER("DTM:IPC create FAILED");
+ close(dtms_cb->epoll_fd);
return NCSCC_RC_FAILURE;
} else {
@@ -108,6 +107,7 @@ uint32_t dtm_cb_init(DTM_INTERNODE_CB *dtms_cb)
if (NCSCC_RC_SUCCESS != m_NCS_IPC_ATTACH(&dtms_cb->mbx)) {
m_NCS_IPC_RELEASE(&dtms_cb->mbx, NULL);
LOG_ER("DTM: Internode Mailbox Attach failed");
+ close(dtms_cb->epoll_fd);
return NCSCC_RC_FAILURE;
}
@@ -189,37 +189,6 @@ DTM_NODE_DB *dtm_node_getnext_by_id(uint32_t node_id)
}
/**
- * Retrieve node from node db by comm_socket
- *
- * @param comm_socket
- *
- * @return NCSCC_RC_SUCCESS
- * @return NCSCC_RC_FAILURE
- *
- */
-DTM_NODE_DB *dtm_node_get_by_comm_socket(uint32_t comm_socket)
-{
- DTM_NODE_DB *node = NULL;
- DTM_INTERNODE_CB *dtms_cb = dtms_gl_cb;
- TRACE_ENTER();
-
- node = (DTM_NODE_DB *)ncs_patricia_tree_get(&dtms_cb->comm_sock_tree,
- (uint8_t *)&comm_socket);
- if (node != (DTM_NODE_DB *)NULL) {
- /* Adjust the pointer */
- node =
- (DTM_NODE_DB *)(((char *)node) -
- (((char *)&(
- ((DTM_NODE_DB *)0)->pat_comm_socket)) -
- ((char *)((DTM_NODE_DB *)0))));
- TRACE("DTM:Node found %d", node->comm_socket);
- }
-
- TRACE_LEAVE();
- return node;
-}
-
-/**
* Adds the node to patricia tree
*
* @param node
@@ -255,20 +224,7 @@ uint32_t dtm_node_add(DTM_NODE_DB *node, int i)
}
break;
case 1:
- TRACE(
- "DTM:Adding comm_socket to the database with comm_socket :%u as
key",
- node->comm_socket);
- node->pat_comm_socket.key_info =
- (uint8_t *)&(node->comm_socket);
- rc = ncs_patricia_tree_add(&dtms_cb->comm_sock_tree,
- &node->pat_comm_socket);
- if (rc != NCSCC_RC_SUCCESS) {
- TRACE(
- "DTM:ncs_patricia_tree_add for comm_socket FAILED for
:%d :%u",
- node->comm_socket, rc);
- node->pat_comm_socket.key_info = NULL;
- goto done;
- }
+ osafassert(false);
break;
case 2:
@@ -333,19 +289,7 @@ uint32_t dtm_node_delete(DTM_NODE_DB *node, int i)
}
break;
case 1:
- if (node->comm_socket != 0 && node->pat_comm_socket.key_info) {
- TRACE(
- "DTM:Deleting comm_socket from the database with
comm_socket :%u as key",
- node->comm_socket);
- if ((rc = ncs_patricia_tree_del(
- &dtms_cb->comm_sock_tree,
- &node->pat_comm_socket)) != NCSCC_RC_SUCCESS) {
- TRACE(
- "DTM:ncs_patricia_tree_del FAILED for
comm_socket :%d rc :%u",
- node->comm_socket, rc);
- goto done;
- }
- }
+ osafassert(false);
break;
case 2:
diff --git a/src/dtm/dtmnd/dtm_node_sockets.c b/src/dtm/dtmnd/dtm_node_sockets.c
index 55b037488..99f499c40 100644
--- a/src/dtm/dtmnd/dtm_node_sockets.c
+++ b/src/dtm/dtmnd/dtm_node_sockets.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -15,12 +16,14 @@
*
*/
-#include <netdb.h>
-#include <netinet/tcp.h>
#include <arpa/inet.h>
-#include <sys/socket.h>
+#include <netdb.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdbool.h>
#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
#include "base/ncsencdec_pub.h"
#include "base/usrbuf.h"
#include "dtm.h"
@@ -89,7 +92,7 @@ static uint32_t set_keepalive(DTM_INTERNODE_CB *dtms_cb, int
sock_desc)
comm_keepalive_probes = dtms_cb->comm_keepalive_probes;
comm_user_timeout = dtms_cb->comm_user_timeout;
- if (so_keepalive == true) {
+ if (so_keepalive == 1) {
socklen_t optlen;
/* Set SO_KEEPALIVE */
@@ -386,19 +389,14 @@ static uint32_t dgram_set_mcast_ttl(DTM_INTERNODE_CB
*dtms_cb, int mcast_ttl)
* @return NCSCC_RC_FAILURE
*
*/
-uint32_t dtm_comm_socket_close(int *comm_socket)
+void dtm_comm_socket_close(DTM_NODE_DB *node)
{
-
- DTM_NODE_DB *node;
TRACE_ENTER();
- int rc = NCSCC_RC_SUCCESS;
int err = 0;
- node = dtm_node_get_by_comm_socket(*comm_socket);
-
if (node != NULL) {
TRACE("DTM: node deleting enty ");
- if (true == node->comm_status) {
+ if (node->comm_status == true) {
TRACE(
"DTM: dtm_comm_socket_close node_ip:%s, node_id:%u
i_addr_family:%d ",
node->node_ip, node->node_id, node->i_addr_family);
@@ -406,10 +404,7 @@ uint32_t dtm_comm_socket_close(int *comm_socket)
node->node_id, node->node_name, node->node_ip,
node->i_addr_family,
false) != NCSCC_RC_SUCCESS) {
- LOG_ER(
- " dtm_process_node_up_down() failed rc : %d
",
- rc);
- rc = NCSCC_RC_FAILURE;
+ LOG_ER("dtm_process_node_up_down() failed");
goto done;
}
}
@@ -418,30 +413,24 @@ uint32_t dtm_comm_socket_close(int *comm_socket)
LOG_ER("DTM :dtm_node_delete failed ");
}
- if (dtm_node_delete(node, 1) != NCSCC_RC_SUCCESS) {
- LOG_ER("DTM :dtm_node_delete failed");
- }
-
if (dtm_node_delete(node, 2) != NCSCC_RC_SUCCESS) {
LOG_ER("DTM :dtm_node_delete failed ");
}
+ if (close(node->comm_socket) != 0) {
+ err = errno;
+ LOG_ER("DTM : dtm_sockdesc_close err :%s ",
+ strerror(err));
+ goto done;
+ }
+
free(node);
} else
TRACE("DTM :comm_socket_not exist ");
- if (close(*comm_socket) != 0) {
- err = errno;
- LOG_ER("DTM : dtm_sockdesc_close err :%s ", strerror(err));
- rc = NCSCC_RC_FAILURE;
- goto done;
- }
- *comm_socket = -1;
-
done:
- TRACE_LEAVE2("rc :%d", rc);
- return rc;
+ TRACE_LEAVE();
}
/**
@@ -540,7 +529,7 @@ int comm_socket_setup_new(DTM_INTERNODE_CB *dtms_cb,
TRACE("DTM : family : %d, socktype : %d, protocol :%d", p->ai_family,
p->ai_socktype, p->ai_protocol);
/* Create socket for sending multicast datagrams */
- if ((sock_desc = socket(p->ai_family, p->ai_socktype,
+ if ((sock_desc = socket(p->ai_family, p->ai_socktype | SOCK_CLOEXEC,
p->ai_protocol)) == SOCKET_ERROR()) {
LOG_ER("DTM:Socket creation failed (socket()) err :%s",
strerror(errno));
@@ -552,7 +541,8 @@ int comm_socket_setup_new(DTM_INTERNODE_CB *dtms_cb,
sizeof(rcvbuf_size)) != 0)) {
LOG_ER("DTM:Socket rcv buf size set failed err :%s",
strerror(errno));
- dtm_comm_socket_close(&sock_desc);
+ close(sock_desc);
+ sock_desc = -1;
goto done;
}
@@ -561,7 +551,8 @@ int comm_socket_setup_new(DTM_INTERNODE_CB *dtms_cb,
sizeof(sndbuf_size)) != 0)) {
LOG_ER("DTM:Socket snd buf size set failed err :%s",
strerror(errno));
- dtm_comm_socket_close(&sock_desc);
+ close(sock_desc);
+ sock_desc = -1;
goto done;
}
@@ -570,13 +561,15 @@ int comm_socket_setup_new(DTM_INTERNODE_CB *dtms_cb,
sizeof(flag)) != 0) {
LOG_ER("DTM:Socket TCP_NODELAY set failed err :%s",
strerror(errno));
- dtm_comm_socket_close(&sock_desc);
+ close(sock_desc);
+ sock_desc = -1;
goto done;
}
if (NCSCC_RC_SUCCESS != set_keepalive(dtms_cb, sock_desc)) {
LOG_ER("DTM :set_keepalive failed ");
- dtm_comm_socket_close(&sock_desc);
+ close(sock_desc);
+ sock_desc = -1;
goto done;
}
@@ -585,13 +578,15 @@ int comm_socket_setup_new(DTM_INTERNODE_CB *dtms_cb,
err = errno;
LOG_ER("DTM :Connect failed (connect()) err :%s",
strerror(err));
- dtm_comm_socket_close(&sock_desc);
+ close(sock_desc);
+ sock_desc = -1;
+ goto done;
}
+done:
/* Free address structure(s) allocated by getaddrinfo() */
freeaddrinfo(addr_list);
-done:
TRACE_LEAVE2("sock_desc : %d", sock_desc);
return sock_desc;
}
@@ -707,8 +702,9 @@ uint32_t dtm_stream_nonblocking_listener(DTM_INTERNODE_CB
*dtms_cb)
TRACE("DTM :family : %d, socktype : %d, protocol :%d", p->ai_family,
p->ai_socktype, p->ai_protocol);
/* Create socket for sending multicast datagrams */
- if ((dtms_cb->stream_sock = socket(p->ai_family, p->ai_socktype,
- p->ai_protocol)) == SOCKET_ERROR()) {
+ if ((dtms_cb->stream_sock = socket(
+ p->ai_family, p->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ p->ai_protocol)) == SOCKET_ERROR()) {
LOG_ER("DTM:Socket creation failed (socket()) err :%s",
strerror(errno));
TRACE_LEAVE2("rc :%d", NCSCC_RC_FAILURE);
@@ -829,9 +825,9 @@ uint32_t dtm_dgram_mcast_listener(DTM_INTERNODE_CB *dtms_cb)
TRACE("DTM :family : %d, socktype : %d, protocol :%d", p->ai_family,
p->ai_socktype, p->ai_protocol);
/* Create socket for sending multicast datagrams */
- if ((dtms_cb->dgram_sock_rcvr =
- socket(p->ai_family, p->ai_socktype, p->ai_protocol)) ==
- SOCKET_ERROR()) {
+ if ((dtms_cb->dgram_sock_rcvr = socket(
+ p->ai_family, p->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ p->ai_protocol)) == SOCKET_ERROR()) {
LOG_ER("DTM:Socket creation failed (socket()) err :%s",
strerror(errno));
TRACE_LEAVE2("rc :%d", NCSCC_RC_FAILURE);
@@ -933,9 +929,9 @@ uint32_t dtm_dgram_mcast_sender(DTM_INTERNODE_CB *dtms_cb,
int mcast_ttl)
TRACE("DTM :family : %d, socktype : %d, protocol :%d", p->ai_family,
p->ai_socktype, p->ai_protocol);
/* Create socket for sending multicast datagrams */
- if ((dtms_cb->dgram_sock_sndr =
- socket(p->ai_family, p->ai_socktype, p->ai_protocol)) ==
- SOCKET_ERROR()) {
+ if ((dtms_cb->dgram_sock_sndr = socket(
+ p->ai_family, p->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ p->ai_protocol)) == SOCKET_ERROR()) {
LOG_ER("DTM:Socket creation failed (socket()) err :%s",
strerror(errno));
TRACE_LEAVE2("rc :%d", NCSCC_RC_FAILURE);
@@ -1072,7 +1068,8 @@ uint32_t dtm_dgram_bcast_listener(DTM_INTERNODE_CB
*dtms_cb)
ipstr);
}
if ((dtms_cb->dgram_sock_rcvr =
- socket(p->ai_family, p->ai_socktype,
+ socket(p->ai_family,
+ p->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
p->ai_protocol)) == SOCKET_ERROR()) {
LOG_ER("DTM:Socket creation failed (socket()) err :%s",
strerror(errno));
@@ -1217,7 +1214,8 @@ uint32_t dtm_dgram_bcast_sender(DTM_INTERNODE_CB *dtms_cb)
/* Create socket for sending/receiving datagrams */
dtms_cb->dgram_sock_sndr =
- socket(bcast_dest_address->sa_family, SOCK_DGRAM, IPPROTO_UDP);
+ socket(bcast_dest_address->sa_family,
+ SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
if (dtms_cb->dgram_sock_sndr == SOCKET_ERROR()) {
LOG_ER("DTM :socket create failederr :%s", strerror(errno));
TRACE_LEAVE2("rc :%d", NCSCC_RC_FAILURE);
@@ -1290,21 +1288,21 @@ uint32_t dtm_dgram_bcast_sender(DTM_INTERNODE_CB
*dtms_cb)
* @return NCSCC_RC_FAILURE
*
*/
-int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char *node_ip, uint8_t
*data,
- uint16_t len)
+DTM_NODE_DB *dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, uint8_t *data,
+ uint16_t len)
{
in_port_t foreign_port;
- int sock_desc = -1;
DTM_NODE_DB node = {0};
DTM_NODE_DB *new_node = NULL;
- uint8_t *buffer = data, mcast_flag;
+ uint8_t *buffer = data;
+ bool mcast_flag;
TRACE_ENTER();
memset(&node, 0, sizeof(DTM_NODE_DB));
/* Decode start */
- node.cluster_id = ncs_decode_32bit(&buffer);
+ node.cluster_id = ncs_decode_16bit(&buffer);
node.node_id = ncs_decode_32bit(&buffer);
if (dtms_cb->node_id == node.node_id) {
if (dtms_cb->mcast_flag != true) {
@@ -1316,8 +1314,8 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
"DTM: received the self node_id mcast message, dropping
message cluster_id: %d node_id: %u",
node.cluster_id, node.node_id);
}
- TRACE_LEAVE2("sock_desc :%d", sock_desc);
- return sock_desc;
+ TRACE_LEAVE();
+ return NULL;
}
/* Decode end */
@@ -1325,12 +1323,12 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
LOG_WA(
"DTM:cluster_id mis match dropping message cluster_id: %d,
node_id: %u",
node.cluster_id, node.node_id);
- TRACE_LEAVE2("sock_desc :%d", sock_desc);
- return sock_desc;
+ TRACE_LEAVE();
+ return NULL;
}
- mcast_flag = ncs_decode_8bit(&buffer);
- TRACE("mcast flag: %" PRIu8, mcast_flag);
+ mcast_flag = ncs_decode_8bit(&buffer) != 0;
+ TRACE("mcast flag: %d", mcast_flag ? 1 : 0);
/* foreign_port = htons((in_port_t)(ncs_decode_16bit(&buffer))); */
foreign_port = ((in_port_t)(ncs_decode_16bit(&buffer)));
@@ -1342,7 +1340,7 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
TRACE(
"DTM: received node_id is less than local node_id
dropping message cluster_id: %d node_id: %u",
node.cluster_id, node.node_id);
- return sock_desc;
+ return NULL;
}
}
@@ -1365,16 +1363,15 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char *node_ip, uint8_t *data,
"DTM:node duplicate discovered dropping
message cluster_id: %d, node_id :%u, node_ip:%s",
node.cluster_id, node.node_id,
node.node_ip);
- TRACE_LEAVE2("sock_desc :%d", sock_desc);
} else {
TRACE(
"DTM: discovery in progress dropping message
cluster_id: %d, node_id :%u, node_ip:%s",
node.cluster_id, node.node_id,
node.node_ip);
- TRACE_LEAVE2("sock_desc :%d", sock_desc);
}
- return sock_desc;
+ TRACE_LEAVE();
+ return NULL;
} else if ((new_node->comm_status == false) &&
((new_node->node_id != node.node_id) ||
(strncmp(node.node_ip, new_node->node_ip,
@@ -1385,9 +1382,6 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
if (dtm_node_delete(new_node, 0) != NCSCC_RC_SUCCESS) {
LOG_ER("DTM :dtm_node_delete failed (recv())");
}
- if (dtm_node_delete(new_node, 1) != NCSCC_RC_SUCCESS) {
- LOG_ER("DTM :dtm_node_delete failed (recv())");
- }
if (dtm_node_delete(new_node, 2) != NCSCC_RC_SUCCESS) {
LOG_ER("DTM :dtm_node_delete failed (recv())");
}
@@ -1399,12 +1393,11 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
if (new_node == NULL) {
LOG_ER(" dtm_node_new failed .node_ip : %s ", node.node_ip);
- sock_desc = -1;
- goto node_fail;
+ return NULL;
}
- sock_desc = comm_socket_setup_new(dtms_cb, (char *)&node.node_ip,
- foreign_port, node.i_addr_family);
+ int sock_desc = comm_socket_setup_new(dtms_cb, (char *)&node.node_ip,
+ foreign_port, node.i_addr_family);
new_node->comm_socket = sock_desc;
new_node->node_id = node.node_id;
@@ -1421,19 +1414,8 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
LOG_ER(
"DTM: dtm_node_add failed .node_ip: %s, node_id:
%u",
new_node->node_ip, new_node->node_id);
- dtm_comm_socket_close(&sock_desc);
- sock_desc = -1;
- free(new_node);
- goto node_fail;
- }
-
- if (dtm_node_add(new_node, 1) != NCSCC_RC_SUCCESS) {
- LOG_ER(
- "DTM: dtm_node_add failed .node_ip: %s, node_id:
%u",
- new_node->node_ip, new_node->node_id);
- dtm_comm_socket_close(&sock_desc);
+ dtm_comm_socket_close(new_node);
sock_desc = -1;
- free(new_node);
goto node_fail;
}
@@ -1441,9 +1423,8 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char *node_ip, uint8_t *data,
LOG_ER(
"DTM: dtm_node_add failed .node_ip: %s, node_id:
%u",
new_node->node_ip, new_node->node_id);
- dtm_comm_socket_close(&sock_desc);
+ dtm_comm_socket_close(new_node);
sock_desc = -1;
- free(new_node);
goto node_fail;
} else
TRACE("DTM: dtm_node_add add .node_ip: %s, node_id: %u",
@@ -1452,7 +1433,7 @@ int dtm_process_connect(DTM_INTERNODE_CB *dtms_cb, char
*node_ip, uint8_t *data,
node_fail:
TRACE_LEAVE2("sock_desc :%d", sock_desc);
- return sock_desc;
+ return new_node;
}
/**
@@ -1460,20 +1441,19 @@ node_fail:
*
* @param dtms_cb stream_sock
*
- * @return NCSCC_RC_SUCCESS
- * @return NCSCC_RC_FAILURE
+ * @return new node, or NULL if no more connections to available to be accepted
*
*/
-int dtm_process_accept(DTM_INTERNODE_CB *dtms_cb, int stream_sock)
+DTM_NODE_DB *dtm_process_accept(DTM_INTERNODE_CB *dtms_cb, int stream_sock)
{
struct sockaddr_storage clnt_addr; /* Client address */
/* Set length of client address structure (in-out parameter) */
socklen_t clnt_addrLen = sizeof(clnt_addr);
void *numericAddress = NULL; /* Pointer to binary address */
- char addrBuffer[INET6_ADDRSTRLEN] = { 0 };
- int err = 0;
+ char addrBuffer[INET6_ADDRSTRLEN] = {0};
DTM_NODE_DB node;
- DTM_NODE_DB *new_node;
+ DTM_NODE_DB *new_node = NULL;
+ int err;
int new_conn_sd, sndbuf_size = dtms_cb->sock_sndbuf_size,
rcvbuf_size = dtms_cb->sock_rcvbuf_size;
const struct sockaddr *clnt_addr1 = (struct sockaddr *)&clnt_addr;
@@ -1481,68 +1461,76 @@ int dtm_process_accept(DTM_INTERNODE_CB *dtms_cb, int
stream_sock)
memset(&node, 0, sizeof(DTM_NODE_DB));
- if ((new_conn_sd = accept(stream_sock, (struct sockaddr *)&clnt_addr,
- &clnt_addrLen)) < 0) {
-
- err = errno;
- LOG_ER("DTM:Accept failed (accept()) err :%s", strerror(err));
- new_conn_sd = -1;
- goto done;
- }
-
- if ((rcvbuf_size > 0) &&
- (setsockopt(new_conn_sd, SOL_SOCKET, SO_RCVBUF, &rcvbuf_size,
- sizeof(rcvbuf_size)) != 0)) {
- LOG_ER("DTM: Unable to set the SO_RCVBUF ");
- dtm_comm_socket_close(&new_conn_sd);
- goto done;
- }
- if ((sndbuf_size > 0) &&
- (setsockopt(new_conn_sd, SOL_SOCKET, SO_SNDBUF, &sndbuf_size,
- sizeof(sndbuf_size)) != 0)) {
- LOG_ER("DTM: Unable to set the SO_SNDBUF ");
- dtm_comm_socket_close(&new_conn_sd);
- goto done;
- }
+ for (;;) {
+ do {
+ new_conn_sd =
+ accept(stream_sock, (struct sockaddr *)&clnt_addr,
+ &clnt_addrLen);
+ err = errno;
+ } while (new_conn_sd < 0 &&
+ (err == EINTR || err == ECONNABORTED));
+ if (new_conn_sd < 0) {
+ if (err == EAGAIN || err == EWOULDBLOCK)
+ return NULL;
+ LOG_ER("DTM:Accept failed (accept()) err :%s",
+ strerror(err));
+ exit(EXIT_FAILURE);
+ }
- int flag = 1;
- if (setsockopt(new_conn_sd, IPPROTO_TCP, TCP_NODELAY, (void *)&flag,
- sizeof(flag)) != 0) {
- LOG_ER("DTM:Socket TCP_NODELAY set failed err :%s",
- strerror(errno));
- dtm_comm_socket_close(&new_conn_sd);
- goto done;
- }
+ if ((rcvbuf_size > 0) &&
+ (setsockopt(new_conn_sd, SOL_SOCKET, SO_RCVBUF,
+ &rcvbuf_size, sizeof(rcvbuf_size)) != 0)) {
+ LOG_ER("DTM: Unable to set the SO_RCVBUF ");
+ close(new_conn_sd);
+ continue;
+ }
+ if ((sndbuf_size > 0) &&
+ (setsockopt(new_conn_sd, SOL_SOCKET, SO_SNDBUF,
+ &sndbuf_size, sizeof(sndbuf_size)) != 0)) {
+ LOG_ER("DTM: Unable to set the SO_SNDBUF ");
+ close(new_conn_sd);
+ continue;
+ }
- if (NCSCC_RC_SUCCESS != set_keepalive(dtms_cb, new_conn_sd)) {
- LOG_ER("DTM:set_keepalive failed ");
- dtm_comm_socket_close(&new_conn_sd);
- goto done;
- }
+ int flag = 1;
+ if (setsockopt(new_conn_sd, IPPROTO_TCP, TCP_NODELAY,
+ (void *)&flag, sizeof(flag)) != 0) {
+ LOG_ER("DTM:Socket TCP_NODELAY set failed err :%s",
+ strerror(errno));
+ close(new_conn_sd);
+ continue;
+ }
- if (clnt_addr1->sa_family == AF_INET) {
- numericAddress = &((struct sockaddr_in *)clnt_addr1)->sin_addr;
- } else if (clnt_addr1->sa_family == AF_INET6) {
- numericAddress =
- &((struct sockaddr_in6 *)clnt_addr1)->sin6_addr;
- } else {
- LOG_ER("DTM: AF not supported=%d", clnt_addr1->sa_family);
- dtm_comm_socket_close(&new_conn_sd);
- osafassert(0);
- }
+ if (NCSCC_RC_SUCCESS != set_keepalive(dtms_cb, new_conn_sd)) {
+ LOG_ER("DTM:set_keepalive failed ");
+ close(new_conn_sd);
+ continue;
+ }
- /* Convert binary to printable address */
- if (inet_ntop(clnt_addr1->sa_family, numericAddress, addrBuffer,
- sizeof(addrBuffer)) == NULL) {
- LOG_ER("DTM: [invalid address]");
- dtm_comm_socket_close(&new_conn_sd);
- goto done;
- } else {
- memcpy(node.node_ip, (uint8_t *)addrBuffer, INET6_ADDRSTRLEN);
- node.i_addr_family = clnt_addr1->sa_family;
- }
+ if (clnt_addr1->sa_family == AF_INET) {
+ numericAddress =
+ &((struct sockaddr_in *)clnt_addr1)->sin_addr;
+ } else if (clnt_addr1->sa_family == AF_INET6) {
+ numericAddress =
+ &((struct sockaddr_in6 *)clnt_addr1)->sin6_addr;
+ } else {
+ LOG_ER("DTM: AF not supported=%d",
+ clnt_addr1->sa_family);
+ close(new_conn_sd);
+ continue;
+ }
- if (new_conn_sd != -1) {
+ /* Convert binary to printable address */
+ if (inet_ntop(clnt_addr1->sa_family, numericAddress, addrBuffer,
+ sizeof(addrBuffer)) == NULL) {
+ LOG_ER("DTM: [invalid address]");
+ close(new_conn_sd);
+ continue;
+ } else {
+ memcpy(node.node_ip, (uint8_t *)addrBuffer,
+ INET6_ADDRSTRLEN);
+ node.i_addr_family = clnt_addr1->sa_family;
+ }
node.cluster_id = dtms_cb->cluster_id;
node.comm_socket = new_conn_sd;
@@ -1553,32 +1541,22 @@ int dtm_process_accept(DTM_INTERNODE_CB *dtms_cb, int
stream_sock)
LOG_ER(
"DTM: dtm_node_new failed. node_ip: %s, node_id:
%u",
node.node_ip, node.node_id);
- dtm_comm_socket_close(&new_conn_sd);
- goto node_fail;
- }
-
- if (dtm_node_add(new_node, 1) != NCSCC_RC_SUCCESS) {
- LOG_ER(
- "DTM: dtm_node_add failed .node_ip: %s, node_id:
%u",
- new_node->node_ip, new_node->node_id);
- dtm_comm_socket_close(&new_conn_sd);
- goto node_fail;
+ close(new_conn_sd);
+ continue;
}
if (dtm_node_add(new_node, 2) != NCSCC_RC_SUCCESS) {
-
LOG_ER(
"DTM: dtm_node_add failed .node_ip: %s, node_id:
%u",
new_node->node_ip, new_node->node_id);
- dtm_comm_socket_close(&new_conn_sd);
- goto node_fail;
+ dtm_comm_socket_close(new_node);
+ continue;
}
+ break;
}
-done:
-node_fail:
TRACE_LEAVE2("DTM: new_conn_sd :%d", new_conn_sd);
- return new_conn_sd;
+ return new_node;
}
/**
@@ -1590,54 +1568,20 @@ node_fail:
* @return NCSCC_RC_FAILURE
*
*/
-int dtm_dgram_recvfrom_bmcast(DTM_INTERNODE_CB *dtms_cb, char *node_ip,
- void *buffer, int buffer_len)
+ssize_t dtm_dgram_recv_bmcast(DTM_INTERNODE_CB *dtms_cb, void *buffer,
+ int buffer_len)
{
- struct sockaddr_storage clnt_addr;
- socklen_t addrLen = sizeof(clnt_addr);
- int rtn;
TRACE_ENTER();
- memset(node_ip, 0, INET6_ADDRSTRLEN);
-
- if ((rtn = recvfrom(dtms_cb->dgram_sock_rcvr, (raw_type *)buffer,
- buffer_len, 0, (struct sockaddr *)&clnt_addr,
- (socklen_t *)&addrLen)) < 0) {
-
- LOG_ER("DTM:Receive failed (recvfrom()) err :%s",
- strerror(errno));
-
- } else {
- const struct sockaddr *clnt_addr1 =
- (struct sockaddr *)&clnt_addr;
- void *numericAddress = NULL; /* Pointer to binary address */
-
- if (clnt_addr1->sa_family == AF_INET) {
- numericAddress =
- &((struct sockaddr_in *)clnt_addr1)->sin_addr;
- }
-
- else if (clnt_addr1->sa_family == AF_INET6) {
-
- numericAddress =
- &((struct sockaddr_in6 *)clnt_addr1)->sin6_addr;
- } else {
- LOG_ER("DTM:AF not supported=%d",
- clnt_addr1->sa_family);
- osafassert(0);
- }
-
- /* Convert binary to printable address */
- char addrBuffer[INET6_ADDRSTRLEN];
- if (inet_ntop(clnt_addr1->sa_family, numericAddress, addrBuffer,
- sizeof(addrBuffer)) == NULL) {
- TRACE("DTM: invalid address :%s", addrBuffer);
- } else {
- memcpy(node_ip, (uint8_t *)addrBuffer,
- INET6_ADDRSTRLEN);
- }
+ ssize_t rtn;
+ do {
+ rtn = recv(dtms_cb->dgram_sock_rcvr, buffer, buffer_len,
+ MSG_DONTWAIT);
+ } while (rtn < 0 && errno == EINTR);
+ if (rtn < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ LOG_ER("DTM:Receive failed (recv()) err :%s", strerror(errno));
}
- TRACE_LEAVE2("rc :%d", rtn);
+ TRACE_LEAVE2("rc :%zd", rtn);
return rtn;
}
diff --git a/src/dtm/dtmnd/dtm_read_config.c b/src/dtm/dtmnd/dtm_read_config.c
index 3beaca07a..85968ccb5 100644
--- a/src/dtm/dtmnd/dtm_read_config.c
+++ b/src/dtm/dtmnd/dtm_read_config.c
@@ -1,6 +1,7 @@
/* -*- OpenSAF -*-
*
* (C) Copyright 2010 The OpenSAF Foundation
+ * Copyright Ericsson AB 2017 - All Rights Reserved.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -559,11 +560,12 @@ int dtm_read_config(DTM_INTERNODE_CB *config, char
*dtm_config_file)
tag_len = strlen("DTM_SOCK_SND_RCV_BUF_SIZE=");
uint32_t sndbuf_size = 0; /* Send buffer size */
uint32_t rcvbuf_size =
- 0; /* Receive buffer size */
- socklen_t optlen; /* Option length */
- int sockfd;
- sockfd = socket(AF_INET, SOCK_STREAM, 0);
- optlen = sizeof(rcvbuf_size);
+ 0; /* Receive buffer size */
+ socklen_t optlen = sizeof(rcvbuf_size);
+ int sockfd = socket(
+ AF_INET,
+ SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ 0);
getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF,
&rcvbuf_size, &optlen);
if ((rcvbuf_size < DTM_SOCK_SND_RCV_BUF_SIZE) &&
@@ -579,6 +581,8 @@ int dtm_read_config(DTM_INTERNODE_CB *config, char
*dtm_config_file)
optlen = sizeof(sndbuf_size);
getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF,
&sndbuf_size, &optlen);
+ if (sockfd >= 0)
+ close(sockfd);
if ((sndbuf_size < DTM_SOCK_SND_RCV_BUF_SIZE) &&
(atoi(&line[tag_len]) <
DTM_SOCK_SND_RCV_BUF_SIZE)) {