The library's init function now takes a callback as argument, which lets
a consumer using the library choose the function that is called when
data is ready in a buffer.
The kconsumerd_on_read_subbuffer_mmap and
kconsumerd_on_read_subbuffer_splice (formerly kconsumerd_on_read_subbuffer)
functions are now exported so that a consumer can call them directly if
needed.

Signed-off-by: Julien Desfossez <[email protected]>
---
 liblttkconsumerd/liblttkconsumerd.c |   99 ++++-------------------------------
 liblttkconsumerd/liblttkconsumerd.h |   26 ++++++++-
 ltt-kconsumerd/ltt-kconsumerd.c     |   82 ++++++++++++++++++++++++++++-
 3 files changed, 116 insertions(+), 91 deletions(-)

diff --git a/liblttkconsumerd/liblttkconsumerd.c 
b/liblttkconsumerd/liblttkconsumerd.c
index f60888a..9d8cb00 100644
--- a/liblttkconsumerd/liblttkconsumerd.c
+++ b/liblttkconsumerd/liblttkconsumerd.c
@@ -34,6 +34,8 @@
 #include "liblttkconsumerd.h"
 #include "lttngerr.h"
 
+static int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);
+
 static
 struct kconsumerd_global_data {
        /*
@@ -265,8 +267,8 @@ static int kconsumerd_update_poll_array(struct pollfd 
**pollfd,
  * mmap the ring buffer, read it and write the data to the tracefile.
  * Returns the number of bytes written
  */
-static int kconsumerd_on_read_subbuffer_mmap(
-               struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+               unsigned long len)
 {
        unsigned long mmap_len, mmap_offset, padded_len, padding_len;
        char *mmap_base;
@@ -378,8 +380,8 @@ end:
  * Splice the data from the ring buffer to the tracefile.
  * Returns the number of bytes spliced
  */
-static int kconsumerd_on_read_subbuffer(
-               struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+               unsigned long len)
 {
        long ret = 0;
        loff_t offset = 0;
@@ -469,87 +471,6 @@ end:
 }
 
 /*
- * kconsumerd_read_subbuffer
- *
- * Consume data on a file descriptor and write it on a trace file
- */
-static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
-{
-       unsigned long len;
-       int err;
-       long ret = 0;
-       int infd = kconsumerd_fd->consumerd_fd;
-
-       DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
-       /* Get the next subbuffer */
-       err = kernctl_get_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               perror("Reserving sub buffer failed (everything is normal, "
-                               "it is due to concurrency)");
-               goto end;
-       }
-
-       switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
-       case LTTNG_EVENT_SPLICE:
-               /* read the whole subbuffer */
-               err = kernctl_get_padded_subbuf_size(infd, &len);
-               if (err != 0) {
-                       ret = errno;
-                       perror("Getting sub-buffer len failed.");
-                       goto end;
-               }
-
-               /* splice the subbuffer to the tracefile */
-               ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
-               if (ret < 0) {
-                       /*
-                        * display the error but continue processing to try
-                        * to release the subbuffer
-                        */
-                       ERR("Error splicing to tracefile");
-               }
-               break;
-       case LTTNG_EVENT_MMAP:
-               /* read the used subbuffer size */
-               err = kernctl_get_subbuf_size(infd, &len);
-               if (err != 0) {
-                       ret = errno;
-                       perror("Getting sub-buffer len failed.");
-                       goto end;
-               }
-               /* write the subbuffer to the tracefile */
-               ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
-               if (ret < 0) {
-                       /*
-                        * display the error but continue processing to try
-                        * to release the subbuffer
-                        */
-                       ERR("Error writing to tracefile");
-               }
-               break;
-       default:
-               ERR("Unknown output method");
-               ret = -1;
-       }
-
-       err = kernctl_put_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               if (errno == EFAULT) {
-                       perror("Error in unreserving sub buffer\n");
-               } else if (errno == EIO) {
-                       /* Should never happen with newer LTTng versions */
-                       perror("Reader has been pushed by the writer, last 
sub-buffer corrupted.");
-               }
-               goto end;
-       }
-
-end:
-       return ret;
-}
-
-/*
  * kconsumerd_poll_socket
  *
  * Poll on the should_quit pipe and the command socket
@@ -795,7 +716,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                        case POLLPRI:
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               ret = 
kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+                               ret = on_buffer_ready(local_kconsumerd_fd[i]);
                                /* it's ok to have an unavailable sub-buffer */
                                if (ret == EAGAIN) {
                                        ret = 0;
@@ -818,7 +739,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                        for (i = 0; i < nb_fd; i++) {
                                if (pollfd[i].revents == POLLIN) {
                                        DBG("Normal read on fd %d", 
pollfd[i].fd);
-                                       ret = 
kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+                                       ret = 
on_buffer_ready(local_kconsumerd_fd[i]);
                                        /* it's ok to have an unavailable 
subbuffer */
                                        if (ret == EAGAIN) {
                                                ret = 0;
@@ -848,10 +769,12 @@ end:
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
  */
-int kconsumerd_init(void)
+int kconsumerd_init(int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
 {
        int ret;
 
+       on_buffer_ready = buffer_ready;
+
        /* need to update the polling array at init time */
        kconsumerd_data.need_update = 1;
 
diff --git a/liblttkconsumerd/liblttkconsumerd.h 
b/liblttkconsumerd/liblttkconsumerd.h
index 9e0b9ff..f98621f 100644
--- a/liblttkconsumerd/liblttkconsumerd.h
+++ b/liblttkconsumerd/liblttkconsumerd.h
@@ -58,13 +58,35 @@ struct kconsumerd_fd {
 };
 
 /*
- * kconsumerd_init(void)
+ * kconsumerd_init
  * initialise the necessary environnement :
  * - inform the polling thread to update the polling array
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
+ *
+ * Takes a function pointer as argument; this function is called when data is
+ * available on a buffer. It is responsible for calling
+ * kernctl_get_next_subbuf, reading the data with mmap or splice depending on
+ * the buffer configuration, and finally calling kernctl_put_next_subbuf.
+ */
+int kconsumerd_init(int (*kbuffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
+
+/*
+ * kconsumerd_on_read_subbuffer_mmap
+ * mmap the ring buffer, read it and write the data to the tracefile.
+ * Returns the number of bytes written
+ */
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+                       unsigned long len);
+
+/*
+ * kconsumerd_on_read_subbuffer_splice
+ *
+ * Splice the data from the ring buffer to the tracefile.
+ * Returns the number of bytes spliced
  */
-int kconsumerd_init(void);
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+                       unsigned long len);
 
 /*
  * kconsumerd_send_error
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
index 4180f89..61ed005 100644
--- a/ltt-kconsumerd/ltt-kconsumerd.c
+++ b/ltt-kconsumerd/ltt-kconsumerd.c
@@ -190,6 +190,86 @@ static void parse_args(int argc, char **argv)
        }
 }
 
+/*
+ * read_subbuffer
+ *
+ * Consume one sub-buffer from a file descriptor and write it to the trace
+ * file. Passed to kconsumerd_init() as the buffer-ready callback.
+ *
+ * Returns the byte count from the mmap/splice helper on success, a positive
+ * errno value when a kernctl call fails (the poll thread tolerates EAGAIN),
+ * or a negative value on a trace file write error or unknown output method.
+ */
+static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
+{
+       unsigned long len;
+       int err;
+       int ret = 0;
+       int infd = kconsumerd_fd->consumerd_fd;
+
+       DBG("In read_subbuffer (infd : %d)", infd);
+       /* Get the next subbuffer */
+       err = kernctl_get_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               perror("Reserving sub buffer failed (everything is normal, "
+                               "it is due to concurrency)");
+               goto end;
+       }
+
+       /* The sub-buffer is now reserved: every path below must release it */
+       switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
+       case LTTNG_EVENT_SPLICE:
+               /* read the whole subbuffer */
+               err = kernctl_get_padded_subbuf_size(infd, &len);
+               if (err != 0) {
+                       ret = errno;
+                       perror("Getting sub-buffer len failed.");
+                       goto put_subbuf;
+               }
+
+               /* splice the subbuffer to the tracefile */
+               ret = kconsumerd_on_read_subbuffer_splice(kconsumerd_fd, len);
+               if (ret < 0) {
+                       /* report the error, then go on to release the subbuffer */
+                       ERR("Error splicing to tracefile");
+               }
+               break;
+       case LTTNG_EVENT_MMAP:
+               /* read the used subbuffer size */
+               err = kernctl_get_subbuf_size(infd, &len);
+               if (err != 0) {
+                       ret = errno;
+                       perror("Getting sub-buffer len failed.");
+                       goto put_subbuf;
+               }
+               /* write the subbuffer to the tracefile */
+               ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
+               if (ret < 0) {
+                       /* report the error, then go on to release the subbuffer */
+                       ERR("Error writing to tracefile");
+               }
+               break;
+       default:
+               ERR("Unknown output method");
+               ret = -1;
+       }
+
+put_subbuf:
+       err = kernctl_put_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               if (errno == EFAULT) {
+                       perror("Error in unreserving sub buffer\n");
+               } else if (errno == EIO) {
+                       /* Should never happen with newer LTTng versions */
+                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+               }
+       }
+
+end:
+       return ret;
+}
 
 /*
  * main
@@ -228,7 +308,7 @@ int main(int argc, char **argv)
        }
 
        /* create the pipe to wake to receiving thread when needed */
-       ret = kconsumerd_init();
+       ret = kconsumerd_init(read_subbuffer);
        if (ret < 0) {
                goto end;
        }
-- 
1.7.4.1


_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev

Reply via email to