---
Notes:
v5: add this patch
mp_send() and send_msg() are functionally unchanged; they are only moved above process_msg(), which now calls mp_send().
lib/librte_eal/common/eal_common_proc.c | 280 +++++++++++++++++---------------
1 file changed, 151 insertions(+), 129 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c
b/lib/librte_eal/common/eal_common_proc.c
index fe27d68..1ea6045 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -52,6 +52,7 @@ enum mp_type {
MP_MSG, /* Share message with peers, will not block */
MP_REQ, /* Request for information, Will block for a reply */
MP_REP, /* Response to previously-received request */
+ MP_IGN, /* Response telling requester to ignore this response */
};
struct mp_msg_internal {
@@ -205,6 +206,130 @@ rte_mp_action_unregister(const char *name)
free(entry);
}
+/**
+ * Send a single message of the given type to the socket at dst_path.
+ * Return -1 if sending failed and the failure is on the local side.
+ * Return 0 if sending failed and the failure is on the remote side.
+ * Return 1 if the message was sent successfully.
+ */
+static int
+send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
+{
+ int snd;
+ struct iovec iov;
+ struct msghdr msgh;
+ struct cmsghdr *cmsg;
+ struct sockaddr_un dst;
+ struct mp_msg_internal m;
+ int fd_size = msg->num_fds * sizeof(int);
+ char control[CMSG_SPACE(fd_size)];
+
+ m.type = type;
+ memcpy(&m.msg, msg, sizeof(*msg));
+
+ memset(&dst, 0, sizeof(dst));
+ dst.sun_family = AF_UNIX;
+ snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
+
+ memset(&msgh, 0, sizeof(msgh));
+ memset(control, 0, sizeof(control));
+
+ iov.iov_base = &m;
+ iov.iov_len = sizeof(m) - sizeof(msg->fds); /* fds go via cmsg */
+
+ msgh.msg_name = &dst;
+ msgh.msg_namelen = sizeof(dst);
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+ msgh.msg_control = control;
+ msgh.msg_controllen = sizeof(control);
+
+ cmsg = CMSG_FIRSTHDR(&msgh);
+ cmsg->cmsg_len = CMSG_LEN(fd_size);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
+
+ do { /* retry when the call is interrupted (EINTR) */
+ snd = sendmsg(mp_fd, &msgh, 0);
+ } while (snd < 0 && errno == EINTR);
+
+ if (snd < 0) {
+ rte_errno = errno;
+ /* Check whether this was caused by the peer process exiting */
+ if (errno == ECONNREFUSED &&
+ rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ unlink(dst_path);
+ return 0;
+ }
+ if (errno == ENOBUFS) {
+ RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
+ dst_path);
+ return 0;
+ }
+ RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
+ dst_path, strerror(errno));
+ return -1;
+ }
+
+ return 1;
+}
+
+static int
+mp_send(struct rte_mp_msg *msg, const char *peer, int type)
+{
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+
+ if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
+ peer = eal_mp_socket_path();
+
+ if (peer) {
+ if (send_msg(peer, msg, type) < 0)
+ return -1;
+ else /* a remote-side failure (0) is not an error here */
+ return 0;
+ }
+
+ /* no peer given: broadcast to all secondary processes */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ return -1;
+ }
+
+ dir_fd = dirfd(mp_dir);
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ closedir(mp_dir);
+ return -1;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+ if (send_msg(path, msg, type) < 0)
+ ret = -1;
+ }
+ /* release the directory lock taken above */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd belongs to mp_dir, so closedir() closes it as well */
+ closedir(mp_dir);
+ return ret;
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -260,12 +385,13 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un
*s)
RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
- if (m->type == MP_REP) {
+ if (m->type == MP_REP || m->type == MP_IGN) {
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
memcpy(sync_req->reply, msg, sizeof(*msg));
- sync_req->reply_received = 1;
+ /* -1 indicates that we've been asked to ignore */
+ sync_req->reply_received = m->type == MP_REP ? 1 : -1;
pthread_cond_signal(&sync_req->cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
@@ -279,10 +405,22 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un
*s)
action = entry->action;
pthread_mutex_unlock(&mp_mutex_action);
- if (!action)
- RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name);
- else if (action(msg, s->sun_path) < 0)
+ if (!action) {
+ if (m->type == MP_REQ && !internal_config.init_complete) {
+ /* if this is a request, and init is not yet complete,
+ * and callback wasn't registered, we should tell the
+ * requester to ignore our existence because we're not
+ * yet ready to process this request.
+ */
+ struct rte_mp_msg dummy = {0};
+ mp_send(&dummy, s->sun_path, MP_IGN);
+ } else {
+ RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
+ msg->name);
+ }
+ } else if (action(msg, s->sun_path) < 0) {
RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
+ }
}
static void *
@@ -419,130 +557,6 @@ rte_mp_channel_init(void)
return 0;
}
-/**
- * Return -1, as fail to send message and it's caused by the local side.
- * Return 0, as fail to send message and it's caused by the remote side.
- * Return 1, as succeed to send message.
- *
- */
-static int
-send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
-{
- int snd;
- struct iovec iov;
- struct msghdr msgh;
- struct cmsghdr *cmsg;
- struct sockaddr_un dst;
- struct mp_msg_internal m;
- int fd_size = msg->num_fds * sizeof(int);
- char control[CMSG_SPACE(fd_size)];
-
- m.type = type;
- memcpy(&m.msg, msg, sizeof(*msg));
-
- memset(&dst, 0, sizeof(dst));
- dst.sun_family = AF_UNIX;
- snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
-
- memset(&msgh, 0, sizeof(msgh));
- memset(control, 0, sizeof(control));
-
- iov.iov_base = &m;
- iov.iov_len = sizeof(m) - sizeof(msg->fds);
-
- msgh.msg_name = &dst;
- msgh.msg_namelen = sizeof(dst);
- msgh.msg_iov = &iov;
- msgh.msg_iovlen = 1;
- msgh.msg_control = control;
- msgh.msg_controllen = sizeof(control);
-
- cmsg = CMSG_FIRSTHDR(&msgh);
- cmsg->cmsg_len = CMSG_LEN(fd_size);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
-
- do {
- snd = sendmsg(mp_fd, &msgh, 0);
- } while (snd < 0 && errno == EINTR);
-
- if (snd < 0) {
- rte_errno = errno;
- /* Check if it caused by peer process exits */
- if (errno == ECONNREFUSED &&
- rte_eal_process_type() == RTE_PROC_PRIMARY) {
- unlink(dst_path);
- return 0;
- }
- if (errno == ENOBUFS) {
- RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
- dst_path);
- return 0;
- }
- RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
- dst_path, strerror(errno));
- return -1;
- }
-
- return 1;
-}
-
-static int
-mp_send(struct rte_mp_msg *msg, const char *peer, int type)
-{
- int dir_fd, ret = 0;
- DIR *mp_dir;
- struct dirent *ent;
-
- if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
- peer = eal_mp_socket_path();
-
- if (peer) {
- if (send_msg(peer, msg, type) < 0)
- return -1;
- else
- return 0;
- }
-
- /* broadcast to all secondary processes */
- mp_dir = opendir(mp_dir_path);
- if (!mp_dir) {
- RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
- mp_dir_path);
- rte_errno = errno;
- return -1;
- }
-
- dir_fd = dirfd(mp_dir);
- /* lock the directory to prevent processes spinning up while we send */
- if (flock(dir_fd, LOCK_EX)) {
- RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
- mp_dir_path);
- rte_errno = errno;
- closedir(mp_dir);
- return -1;
- }
-
- while ((ent = readdir(mp_dir))) {
- char path[PATH_MAX];
-
- if (fnmatch(mp_filter, ent->d_name, 0) != 0)
- continue;
-
- snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
- ent->d_name);
- if (send_msg(path, msg, type) < 0)
- ret = -1;
- }
- /* unlock the dir */
- flock(dir_fd, LOCK_UN);
-
- /* dir_fd automatically closed on closedir */
- closedir(mp_dir);
- return ret;
-}
-
static bool
check_input(const struct rte_mp_msg *msg)
{
@@ -631,6 +645,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
rte_errno = ETIMEDOUT;
return -1;
}
+ if (sync_req.reply_received == -1) {
+ RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
+ /* not receiving this message is not an error, so decrement
+ * number of sent messages
+ */
+ reply->nb_sent--;
+ return 0;
+ }
tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
if (!tmp) {