The init function of the library now takes a function as argument to allow a consumer using the library to control the function to be called when data is ready in a buffer. The kconsumerd_on_read_subbuffer_mmap and kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer to use them directly if needed.
Signed-off-by: Julien Desfossez <[email protected]> --- liblttkconsumerd/liblttkconsumerd.c | 99 ++++------------------------------- liblttkconsumerd/liblttkconsumerd.h | 26 ++++++++- ltt-kconsumerd/ltt-kconsumerd.c | 82 ++++++++++++++++++++++++++++- 3 files changed, 116 insertions(+), 91 deletions(-) diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c index f60888a..9d8cb00 100644 --- a/liblttkconsumerd/liblttkconsumerd.c +++ b/liblttkconsumerd/liblttkconsumerd.c @@ -34,6 +34,8 @@ #include "liblttkconsumerd.h" #include "lttngerr.h" +static int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd); + static struct kconsumerd_global_data { /* @@ -265,8 +267,8 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, * mmap the ring buffer, read it and write the data to the tracefile. * Returns the number of bytes written */ -static int kconsumerd_on_read_subbuffer_mmap( - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd, + unsigned long len) { unsigned long mmap_len, mmap_offset, padded_len, padding_len; char *mmap_base; @@ -378,8 +380,8 @@ end: * Splice the data from the ring buffer to the tracefile. * Returns the number of bytes spliced */ -static int kconsumerd_on_read_subbuffer( - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd, + unsigned long len) { long ret = 0; loff_t offset = 0; @@ -469,87 +471,6 @@ end: } /* - * kconsumerd_read_subbuffer - * - * Consume data on a file descriptor and write it on a trace file - */ -static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) -{ - unsigned long len; - int err; - long ret = 0; - int infd = kconsumerd_fd->consumerd_fd; - - DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - ret = errno; - perror("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - goto end; - } - - switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - - /* splice the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile"); - } - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; - } - - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } - -end: - return ret; -} - -/* * kconsumerd_poll_socket * * Poll on the should_quit pipe and the command socket @@ -795,7 +716,7 @@ void *kconsumerd_thread_poll_fds(void *data) case POLLPRI: DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); + ret = on_buffer_ready(local_kconsumerd_fd[i]); /* it's ok to have an unavailable sub-buffer */ if (ret == EAGAIN) { ret = 0; @@ -818,7 +739,7 @@ void *kconsumerd_thread_poll_fds(void *data) for (i = 0; i < nb_fd; i++) { if (pollfd[i].revents == POLLIN) { DBG("Normal read on fd %d", pollfd[i].fd); - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); + ret = on_buffer_ready(local_kconsumerd_fd[i]); /* it's ok to have an unavailable subbuffer */ if (ret == EAGAIN) { ret = 0; @@ -848,10 +769,12 @@ end: * - create the poll_pipe * - create the should_quit pipe (for signal handler) */ -int kconsumerd_init(void) +int kconsumerd_init(int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd)) { int ret; + on_buffer_ready = buffer_ready; + /* need to update the polling array at init time */ kconsumerd_data.need_update = 1; diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h index 9e0b9ff..f98621f 100644 --- a/liblttkconsumerd/liblttkconsumerd.h +++ b/liblttkconsumerd/liblttkconsumerd.h @@ -58,13 +58,35 @@ struct kconsumerd_fd { }; /* - * kconsumerd_init(void) + * kconsumerd_init * initialise the necessary environnement : * - inform the polling thread to update the polling array * - create the poll_pipe * - create the should_quit pipe (for signal handler) + * + * Takes a function pointer as argument, this function is called when data is + * available on a buffer. This function is responsible to do the + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the + * buffer configuration and then kernctl_put_next_subbuf at the end. + */ +int kconsumerd_init(int (*kbuffer_ready)(struct kconsumerd_fd *kconsumerd_fd)); + +/* + * kconsumerd_on_read_subbuffer_mmap + * mmap the ring buffer, read it and write the data to the tracefile. + * Returns the number of bytes written + */ +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd, + unsigned long len); + +/* + * kconsumerd_on_read_subbuffer + * + * Splice the data from the ring buffer to the tracefile. + * Returns the number of bytes spliced */ -int kconsumerd_init(void); +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd, + unsigned long len); /* * kconsumerd_send_error diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index 4180f89..61ed005 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -190,6 +190,86 @@ static void parse_args(int argc, char **argv) } } +/* + * read_subbuffer + * + * Consume data on a file descriptor and write it on a trace file + */ +static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) +{ + unsigned long len; + int err; + long ret = 0; + int infd = kconsumerd_fd->consumerd_fd; + + DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { + case LTTNG_EVENT_SPLICE: + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer_splice(kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + break; + case LTTNG_EVENT_MMAP: + /* read the used subbuffer size */ + err = kernctl_get_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + break; + default: + ERR("Unknown output method"); + ret = -1; + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} /* * main @@ -228,7 +308,7 @@ int main(int argc, char **argv) } /* create the pipe to wake to receiving thread when needed */ - ret = kconsumerd_init(); + ret = kconsumerd_init(read_subbuffer); if (ret < 0) { goto end; } -- 1.7.4.1 _______________________________________________ ltt-dev mailing list [email protected] http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
