The user of the lib can now take control over a new FD or the update operation of an existing FD. Opening the output tracefile is now the responsiblity of the user and not the library itself.
Signed-off-by: Julien Desfossez <[email protected]> --- include/lttng/lttng-kconsumerd.h | 25 +++++++-- liblttngkconsumerd/lttngkconsumerd.c | 102 ++++++++++++++++++++-------------- ltt-kconsumerd/ltt-kconsumerd.c | 45 ++++++++++++++- 3 files changed, 124 insertions(+), 48 deletions(-) diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h index 7e195ab..a24a509 100644 --- a/include/lttng/lttng-kconsumerd.h +++ b/include/lttng/lttng-kconsumerd.h @@ -79,6 +79,21 @@ struct lttng_kconsumerd_fd { struct lttng_kconsumerd_local_data { /* function to call when data is available on a buffer */ int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd); + /* + * function to call when we receive a new fd, it receives a newly allocated + * kconsumerd_fd, if it returns the FD (as seen by the sessiond daemon : + * sessiond_fd), the FD will be handled by the lib in the local FD list, + * otherwise we assume the external consumer is taking care of it. + */ + int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd); + /* + * function to call when a FD is getting updated by the session daemon, + * this function receives the FD as seen by the session daemon + * (sessiond_fd) and the new state, if it returns the fd, it will be + * handled locally by the lib, otherwise we assume the consumer is taking + * care of it. + */ + int (*on_update_fd)(int sessiond_fd, uint32_t state); /* socket to communicate errors with sessiond */ int kconsumerd_error_socket; /* socket to exchange commands with sessiond */ @@ -98,15 +113,15 @@ struct lttng_kconsumerd_local_data { * - create the should_quit pipe (for signal handler) * - create the thread pipe (for splice) * - * Takes a function pointer as argument, this function is called when data is - * available on a buffer. This function is responsible to do the - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the - * buffer configuration and then kernctl_put_next_subbuf at the end. + * Takes the function pointers to the on_buffer_ready, on_recv_fd, and + * on_update_fd callbacks. * * Returns a pointer to the new context or NULL on error. */ extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)); + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)); /* * Close all fds associated with the instance and free the context. diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c index d36da9b..ed7951b 100644 --- a/liblttngkconsumerd/lttngkconsumerd.c +++ b/liblttngkconsumerd/lttngkconsumerd.c @@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) } /* - * Add a fd to the global list protected by a mutex. + * Create a struct lttcomm_kconsumerd_msg from the + * information received on the receiving socket */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( + struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { struct lttng_kconsumerd_fd *tmp_fd; - int ret = 0; - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + if (tmp_fd == NULL) { + perror("malloc struct lttng_kconsumerd_fd"); goto end; } - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; tmp_fd->consumerd_fd = consumerd_fd; tmp_fd->state = buf->state; @@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, tmp_fd->output = buf->output; strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); tmp_fd->path_name[PATH_MAX - 1] = '\0'; + DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", + tmp_fd->path_name, tmp_fd->sessiond_fd, + tmp_fd->consumerd_fd, tmp_fd->out_fd); - /* Opening the tracefile in write mode */ - if (tmp_fd->path_name != NULL) { - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - } +end: + return tmp_fd; +} - if (tmp_fd->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } +/* + * Add a fd to the global list protected by a mutex. + */ +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) +{ + int ret; - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); - if (tmp_fd->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); + if (ret == 1) { + goto end; } - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); kconsumerd_data.fds_count++; kconsumerd_data.need_update = 1; + end: pthread_mutex_unlock(&kconsumerd_data.lock); return ret; @@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd( int nb_fd; char recv_fd[CMSG_SPACE(sizeof(int))]; struct lttcomm_kconsumerd_msg lkm; + struct lttng_kconsumerd_fd *new_fd; /* the number of fds we are about to receive */ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); @@ -313,14 +302,40 @@ static int kconsumerd_consumerd_recv_fd( DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { + new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); + if (new_fd == NULL) { lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); goto end; } + + if (ctx->on_recv_fd != NULL) { + ret = ctx->on_recv_fd(new_fd); + /* + * if we receive the FD back, we insert it in the local + * FD list, otherwise we assume it is handled by the + * external consumer. + */ + if (ret == new_fd->sessiond_fd) { + kconsumerd_add_fd(new_fd); + } + } else { + kconsumerd_add_fd(new_fd); + } break; case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); + if (ctx->on_update_fd != NULL) { + ret = ctx->on_update_fd(lkm.fd, lkm.state); + /* + * if we receive the FD back, we have to handle it locally, + * otherwise we assume the external consumer is taking care + * of it. + */ + if (ret == lkm.fd) { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } + } else { + kconsumerd_change_fd_state(lkm.fd, lkm.state); + } break; default: break; @@ -756,7 +771,9 @@ end: * Returns a pointer to the new context or NULL on error. */ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), + int (*update_fd)(int sessiond_fd, uint32_t state)) { int ret; struct lttng_kconsumerd_local_data *ctx; @@ -767,7 +784,10 @@ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( goto end; } + /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; + ctx->on_recv_fd = recv_fd; + ctx->on_update_fd = update_fd; ret = pipe(ctx->kconsumerd_poll_pipe); if (ret < 0) { diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index cd4b00e..f2bb211 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -271,6 +271,47 @@ end: return ret; } +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (kconsumerd_fd->path_name != NULL) { + ret = open(kconsumerd_fd->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", kconsumerd_fd->path_name); + perror("open"); + goto error; + } + kconsumerd_fd->out_fd = ret; + } + + if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto error; + } + + kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len, + PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0); + if (kconsumerd_fd->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto error; + } + } + + /* we return the FD back to the lib to let it handle the FD internally */ + return kconsumerd_fd->sessiond_fd; + +error: + return ret; +} + /* * main */ @@ -297,8 +338,8 @@ int main(int argc, char **argv) snprintf(command_sock_path, PATH_MAX, KCONSUMERD_CMD_SOCK_PATH); } - /* create the pipe to wake to receiving thread when needed */ - ctx = lttng_kconsumerd_create(read_subbuffer); + /* create the consumer instance with and assign the callbacks */ + ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL); if (ctx == NULL) { goto error; } -- 1.7.4.1 _______________________________________________ ltt-dev mailing list [email protected] http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
