* Julien Desfossez ([email protected]) wrote: > The user of the lib can now take control over a new FD or the update > operation of an existing FD. > Opening the output tracefile is now the responsibility of the user > and not the library itself.
Merged, with edit. Thanks, Mathieu > > Signed-off-by: Julien Desfossez <[email protected]> > --- > include/lttng/lttng-kconsumerd.h | 29 ++++++++-- > liblttngkconsumerd/lttngkconsumerd.c | 96 +++++++++++++++++++-------------- > ltt-kconsumerd/ltt-kconsumerd.c | 45 +++++++++++++++- > 3 files changed, 122 insertions(+), 48 deletions(-) > > diff --git a/include/lttng/lttng-kconsumerd.h > b/include/lttng/lttng-kconsumerd.h > index 7e195ab..edff0ba 100644 > --- a/include/lttng/lttng-kconsumerd.h > +++ b/include/lttng/lttng-kconsumerd.h > @@ -79,6 +79,25 @@ struct lttng_kconsumerd_fd { > struct lttng_kconsumerd_local_data { > /* function to call when data is available on a buffer */ > int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd); > + /* > + * function to call when we receive a new fd, it receives a newly > allocated > + * kconsumerd_fd, depending on the return code of this function, the > new FD > + * will be handled by the application or the library : > + * - > 0 (success, FD is kept by application) > + * - == 0 (success, FD is left to library) > + * - < 0 (error) > + */ > + int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd); > + /* > + * function to call when a FD is getting updated by the session daemon, > + * this function receives the FD as seen by the session daemon > + * (sessiond_fd) and the new state, depending on the return code of > this function > + * the update of state for the FD is handled by the application or the > library : > + * - > 0 (success, FD is kept by application) > + * - == 0 (success, FD is left to library) > + * - < 0 (error) > + */ > + int (*on_update_fd)(int sessiond_fd, uint32_t state); > /* socket to communicate errors with sessiond */ > int kconsumerd_error_socket; > /* socket to exchange commands with sessiond */ > @@ -98,15 +117,15 @@ struct lttng_kconsumerd_local_data { > * - create the should_quit pipe (for signal handler) > * - create the thread pipe (for splice) > * > - * Takes a function pointer as 
argument, this function is called when data is > - * available on a buffer. This function is responsible to do the > - * kernctl_get_next_subbuf, read the data with mmap or splice depending on > the > - * buffer configuration and then kernctl_put_next_subbuf at the end. > + * Takes the function pointers to the on_buffer_ready, on_recv_fd, and > + * on_update_fd callbacks. > * > * Returns a pointer to the new context or NULL on error. > */ > extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( > - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)); > + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), > + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), > + int (*update_fd)(int sessiond_fd, uint32_t state)); > > /* > * Close all fds associated with the instance and free the context. > diff --git a/liblttngkconsumerd/lttngkconsumerd.c > b/liblttngkconsumerd/lttngkconsumerd.c > index 69ef9a0..751cea1 100644 > --- a/liblttngkconsumerd/lttngkconsumerd.c > +++ b/liblttngkconsumerd/lttngkconsumerd.c > @@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct > lttng_kconsumerd_fd *lcf) > } > > /* > - * Add a fd to the global list protected by a mutex. 
> + * Create a struct lttcomm_kconsumerd_msg from the > + * information received on the receiving socket > */ > -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, > +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( > + struct lttcomm_kconsumerd_msg *buf, > int consumerd_fd) > { > struct lttng_kconsumerd_fd *tmp_fd; > - int ret = 0; > > - pthread_mutex_lock(&kconsumerd_data.lock); > - /* Check if already exist */ > - ret = kconsumerd_find_session_fd(buf->fd); > - if (ret == 1) { > + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); > + if (tmp_fd == NULL) { > + perror("malloc struct lttng_kconsumerd_fd"); > goto end; > } > > - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); > tmp_fd->sessiond_fd = buf->fd; > tmp_fd->consumerd_fd = consumerd_fd; > tmp_fd->state = buf->state; > @@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct > lttcomm_kconsumerd_msg *buf, > tmp_fd->output = buf->output; > strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); > tmp_fd->path_name[PATH_MAX - 1] = '\0'; > + DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", > + tmp_fd->path_name, tmp_fd->sessiond_fd, > + tmp_fd->consumerd_fd, tmp_fd->out_fd); > > - /* Opening the tracefile in write mode */ > - if (tmp_fd->path_name != NULL) { > - ret = open(tmp_fd->path_name, > - O_WRONLY|O_CREAT|O_TRUNC, > S_IRWXU|S_IRWXG|S_IRWXO); > - if (ret < 0) { > - ERR("Opening %s", tmp_fd->path_name); > - perror("open"); > - goto end; > - } > - tmp_fd->out_fd = ret; > - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, > - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, > tmp_fd->out_fd); > - } > +end: > + return tmp_fd; > +} > > - if (tmp_fd->output == LTTNG_EVENT_MMAP) { > - /* get the len of the mmap region */ > - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, > &tmp_fd->mmap_len); > - if (ret != 0) { > - ret = errno; > - perror("kernctl_get_mmap_len"); > - goto end; > - } > +/* > + * Add a fd to the global list protected by a mutex. 
> + */ > +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) > +{ > + int ret; > > - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, > - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, > 0); > - if (tmp_fd->mmap_base == MAP_FAILED) { > - perror("Error mmaping"); > - ret = -1; > - goto end; > - } > + pthread_mutex_lock(&kconsumerd_data.lock); > + /* Check if already exist */ > + ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); > + if (ret == 1) { > + goto end; > } > - > cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); > kconsumerd_data.fds_count++; > kconsumerd_data.need_update = 1; > + > end: > pthread_mutex_unlock(&kconsumerd_data.lock); > return ret; > @@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd( > int nb_fd; > char recv_fd[CMSG_SPACE(sizeof(int))]; > struct lttcomm_kconsumerd_msg lkm; > + struct lttng_kconsumerd_fd *new_fd; > > /* the number of fds we are about to receive */ > nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); > @@ -313,14 +302,34 @@ static int kconsumerd_consumerd_recv_fd( > DBG("kconsumerd_add_fd %s (%d)", > lkm.path_name, > ((int *) > CMSG_DATA(cmsg))[0]); > > - ret = kconsumerd_add_fd(&lkm, ((int *) > CMSG_DATA(cmsg))[0]); > - if (ret < 0) { > + new_fd = kconsumerd_allocate_fd(&lkm, > ((int *) CMSG_DATA(cmsg))[0]); > + if (new_fd == NULL) { > > lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); > goto end; > } > + > + if (ctx->on_recv_fd != NULL) { > + ret = ctx->on_recv_fd(new_fd); > + if (ret == 0) { > + > kconsumerd_add_fd(new_fd); > + } else if (ret < 0) { > + goto end; > + } > + } else { > + kconsumerd_add_fd(new_fd); > + } > break; > case UPDATE_STREAM: > - kconsumerd_change_fd_state(lkm.fd, > lkm.state); > + if (ctx->on_update_fd != NULL) { > + ret = ctx->on_update_fd(lkm.fd, > lkm.state); > + if (ret == 0) { > + > kconsumerd_change_fd_state(lkm.fd, lkm.state); > + } else if (ret < 0) { > + goto end; > + } > + } else { > + > kconsumerd_change_fd_state(lkm.fd, lkm.state); > + } > 
break; > default: > break; > @@ -754,7 +763,9 @@ end: > * Returns a pointer to the new context or NULL on error. > */ > struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( > - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) > + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), > + int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), > + int (*update_fd)(int sessiond_fd, uint32_t state)) > { > int ret; > struct lttng_kconsumerd_local_data *ctx; > @@ -765,7 +776,10 @@ struct lttng_kconsumerd_local_data > *lttng_kconsumerd_create( > goto end; > } > > + /* assign the callbacks */ > ctx->on_buffer_ready = buffer_ready; > + ctx->on_recv_fd = recv_fd; > + ctx->on_update_fd = update_fd; > > ret = pipe(ctx->kconsumerd_poll_pipe); > if (ret < 0) { > diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c > index ca93965..84ea95d 100644 > --- a/ltt-kconsumerd/ltt-kconsumerd.c > +++ b/ltt-kconsumerd/ltt-kconsumerd.c > @@ -277,6 +277,47 @@ end: > return ret; > } > > +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd) > +{ > + int ret; > + > + /* Opening the tracefile in write mode */ > + if (kconsumerd_fd->path_name != NULL) { > + ret = open(kconsumerd_fd->path_name, > + O_WRONLY|O_CREAT|O_TRUNC, > S_IRWXU|S_IRWXG|S_IRWXO); > + if (ret < 0) { > + ERR("Opening %s", kconsumerd_fd->path_name); > + perror("open"); > + goto error; > + } > + kconsumerd_fd->out_fd = ret; > + } > + > + if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) { > + /* get the len of the mmap region */ > + ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, > &kconsumerd_fd->mmap_len); > + if (ret != 0) { > + ret = errno; > + perror("kernctl_get_mmap_len"); > + goto error; > + } > + > + kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len, > + PROT_READ, MAP_PRIVATE, > kconsumerd_fd->consumerd_fd, 0); > + if (kconsumerd_fd->mmap_base == MAP_FAILED) { > + perror("Error mmaping"); > + ret = -1; > + goto error; > + } > + } 
> + > + /* we return 0 to let the library handle the FD internally */ > + return 0; > + > +error: > + return ret; > +} > + > /* > * main > */ > @@ -303,8 +344,8 @@ int main(int argc, char **argv) > snprintf(command_sock_path, PATH_MAX, > KCONSUMERD_CMD_SOCK_PATH); > } > - /* create the pipe to wake to receiving thread when needed */ > - ctx = lttng_kconsumerd_create(read_subbuffer); > + /* create the consumer instance with and assign the callbacks */ > + ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL); > if (ctx == NULL) { > goto error; > } > -- > 1.7.4.1 > -- Mathieu Desnoyers Operating System Efficiency R&D Consultant EfficiOS Inc. http://www.efficios.com _______________________________________________ ltt-dev mailing list [email protected] http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
