On Fri, May 28, 2010 at 11:31:54AM -0700, Steven Dake wrote:
> Really good abstraction and good potential for reuse through other 
> components such as ipc.  I don't want to merge this patch at this time 
> because I really want to get to the point we have a stable 1.2.z.  More 
> likely 1.3 material.
> 
> I have made many comments inline that should be addressed for future 
> merging.

Thanks guys for reviewing! In the meantime I'll add this to libqb
and fix the issues you guys brought up.

> 
> Also, make distcheck fails which needs fixing.
> 
> I also believe Honza is correct relating to non-word size commits.
> 
> It would be interesting if the ring buffer could be created with a 
> backing store optionally for reuse in other components such as coroipc 
> (the async messaging case).
Ya, I'll be working on that soon.

> 
> Regards
> -steve
> 
> On 05/27/2010 09:46 PM, Angus Salkeld wrote:
> > I have pulled the ringbuffer out of logsys and corosync-fplay
> > into a single file ringbuffer.c. I was intending this to be for
> > libqb, but it might help simplify logsys.c whilst we are hunting
> > out our current bug.
> >
> > This seems to work well with the following testing:
> > - debug=on/off with cpgbench.
> > - killall -SIGSEGV corosync
> > - corosync-fplay
> >
> > Signed-off-by: Angus Salkeld<asalk...@redhat.com>
> > ---
> >   exec/Makefile.am          |    2 +-
> >   exec/logsys.c             |  189 +++---------------------
> >   exec/ringbuffer.c         |  369 
> > +++++++++++++++++++++++++++++++++++++++++++++
> >   include/corosync/cororb.h |  129 ++++++++++++++++
> >   tools/Makefile.am         |    3 +
> >   tools/corosync-fplay.c    |  122 ++++++---------
> >   6 files changed, 568 insertions(+), 246 deletions(-)
> >   create mode 100644 exec/ringbuffer.c
> >   create mode 100644 include/corosync/cororb.h
> >
> > diff --git a/exec/Makefile.am b/exec/Makefile.am
> > index f367f29..e43865d 100644
> > --- a/exec/Makefile.am
> > +++ b/exec/Makefile.am
> > @@ -42,7 +42,7 @@ if BUILD_RDMA
> >   TOTEM_SRC         += totemiba.c
> >   endif
> >
> > -LOGSYS_SRC         = wthread.c logsys.c
> > +LOGSYS_SRC         = wthread.c logsys.c ringbuffer.c
> >   COROIPCS_SRC              = coroipcs.c
> >
> >   LCRSO_SRC         = objdb.c vsf_ykd.c coroparse.c vsf_quorum.c
> > diff --git a/exec/logsys.c b/exec/logsys.c
> > index 7463798..c28ebed 100644
> > --- a/exec/logsys.c
> > +++ b/exec/logsys.c
> > @@ -40,6 +40,7 @@
> >   #include<stdint.h>
> >   #include<stdio.h>
> >   #include<ctype.h>
> > +#include<assert.h>
> >   #include<string.h>
> >   #include<stdarg.h>
> >   #include<sys/time.h>
> > @@ -64,6 +65,7 @@
> >   #include<semaphore.h>
> >
> >   #include<corosync/list.h>
> > +#include<corosync/cororb.h>
> >   #include<corosync/engine/logsys.h>
> >
> >   #define YIELD_AFTER_LOG_OPS 10
> > @@ -144,17 +146,7 @@ struct logsys_logger {
> >                                                for subsystems */
> >   };
> >
> > -
> > -/*
> > - * These are not static so they can be read from the core file
> > - */
> > -int *flt_data;
> > -
> > -uint32_t flt_head;
> > -
> > -uint32_t flt_tail;
> > -
> > -unsigned int flt_data_size;
> > +static cs_ringbuffer_t* rb = NULL;
> >
> >   #define COMBINE_BUFFER_SIZE 2048
> >
> > @@ -202,8 +194,6 @@ static pthread_spinlock_t logsys_wthread_spinlock;
> >   static pthread_mutex_t logsys_wthread_mutex = PTHREAD_MUTEX_INITIALIZER;
> >   #endif
> >
> > -static int logsys_buffer_full = 0;
> > -
> >   static char *format_buffer=NULL;
> >
> >   static int logsys_dropped_messages = 0;
> > @@ -298,47 +288,6 @@ static void dump_full_config(void)
> >   }
> >   #endif
> >
> > -static uint32_t circular_memory_map (void **buf, size_t bytes)
> > -{
> > -   void *addr_orig;
> > -   void *addr;
> > -
> > -   addr_orig = mmap (*buf, bytes<<  1, PROT_NONE,
> > -           MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
> > -
> > -   if (addr_orig == MAP_FAILED) {
> > -printf ("a\n");
> > -           return (-1);
> > -   }
> > -
> > -   addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
> > -           MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
> > -
> > -   if (addr != addr_orig) {
> > -printf ("b %d\n", errno);
> > -exit (1);
> > -           return (-1);
> > -   }
> > -#ifdef COROSYNC_BSD
> > -   madvise(addr_orig, bytes, MADV_NOSYNC);
> > -#endif
> > -
> > -   addr = mmap (((char *)addr_orig) + bytes,
> > -                  bytes, PROT_READ | PROT_WRITE,
> > -                  MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
> > -   if ((char *)addr != (char *)((char *)addr_orig + bytes)) {
> > -printf ("c %d\n", errno);
> > -exit (1);
> > -           return (-1);
> > -   }
> > -#ifdef COROSYNC_BSD
> > -   madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
> > -#endif
> > -
> > -   *buf = addr_orig;
> > -   return (0);
> > -}
> > -
> >   #if defined(HAVE_PTHREAD_SPIN_LOCK)
> >   static void logsys_flt_lock (void)
> >   {
> > @@ -380,62 +329,6 @@ static void logsys_wthread_unlock (void)
> >   #endif
> >
> >   /*
> > - * Before any write operation, a reclaim on the buffer area must be 
> > executed
> > - */
> > -static inline void records_reclaim (unsigned int idx, unsigned int words)
> > -{
> > -   unsigned int should_reclaim;
> > -
> > -   should_reclaim = 0;
> > -
> > -   if ((idx + words)>= flt_data_size) {
> > -           logsys_buffer_full = 1;
> > -   }
> > -   if (logsys_buffer_full == 0) {
> > -           return;
> > -   }
> > -
> > -   if (flt_tail>  flt_head) {
> > -           if (idx + words>= flt_tail) {
> > -                   should_reclaim = 1;
> > -           }
> > -   } else {
> > -           if ((idx + words)>= (flt_tail + flt_data_size)) {
> > -                   should_reclaim = 1;
> > -           }
> > -   }
> > -
> > -   if (should_reclaim) {
> > -           int words_needed = 0;
> > -
> > -           words_needed = words + 1;
> > -           do {
> > -                   unsigned int old_tail;
> > -
> > -                   words_needed -= flt_data[flt_tail];
> > -                   old_tail = flt_tail;
> > -                   flt_tail =
> > -                           (flt_tail +
> > -                           flt_data[flt_tail]) % (flt_data_size);
> > -           } while (words_needed>  0);
> > -   }
> > -}
> > -
> > -#define idx_word_step(idx)                                         \
> > -do {                                                                       
> > \
> > -   if (idx>  (flt_data_size - 1)) {                                \
> > -           idx = 0;                                                \
> > -   }                                                               \
> > -} while (0);
> > -
> > -#define idx_buffer_step(idx)                                               
> > \
> > -do {                                                                       
> > \
> > -   if (idx>  (flt_data_size - 1)) {                                \
> > -           idx = ((idx) % (flt_data_size));                        \
> > -   }                                                               \
> > -} while (0);
> > -
> > -/*
> >    * Internal threaded logging implementation
> >    */
> >   static inline int strcpy_cutoff (char *dest, const char *src, size_t 
> > cutoff,
> > @@ -1066,8 +959,6 @@ int _logsys_wthread_create (void)
> >
> >   int _logsys_rec_init (unsigned int fltsize)
> >   {
> > -   size_t flt_real_size;
> > -
> >     sem_init (&logsys_thread_start, 0, 0);
> >
> >     sem_init (&logsys_print_finished, 0, 0);
> > @@ -1077,33 +968,7 @@ int _logsys_rec_init (unsigned int fltsize)
> >     pthread_spin_init (&logsys_wthread_spinlock, 1);
> >   #endif
> >
> > -   /*
> > -    * XXX: kill me for 1.1 because I am a dirty hack
> > -    * temporary workaround that will be replaced by supporting
> > -    * 0 byte size flight recorder buffer.
> > -    * 0 byte size buffer will enable direct printing to logs
> > -    *   without flight recoder.
> > -    */
> > -   if (fltsize<  64000) {
> > -           fltsize = 64000;
> > -   }
> > -
> > -   flt_real_size = ROUNDUP(fltsize, sysconf(_SC_PAGESIZE));
> > -
> > -   circular_memory_map ((void **)&flt_data, flt_real_size);
> > -
> > -   memset (flt_data, 0, flt_real_size * 2);
> > -   /*
> > -    * flt_data_size tracks data by ints and not bytes/chars.
> > -    */
> > -
> > -   flt_data_size = flt_real_size / sizeof (uint32_t);
> > -   /*
> > -    * First record starts at zero
> > -    * Last record ends at zero
> > -    */
> > -   flt_head = 0;
> > -   flt_tail = 0;
> > +   rb = cs_rb_create (fltsize, CS_RB_OVERWRITE);
> >
> >     return (0);
> >   }
> > @@ -1139,9 +1004,8 @@ void _logsys_log_rec (
> >     unsigned int idx;
> >     unsigned int arguments = 0;
> >     unsigned int record_reclaim_size = 0;
> > -   unsigned int index_start;
> > -   int words_written;
> >     int subsysid;
> > +   int32_t *flt_data;
> >
> >     subsysid = LOGSYS_DECODE_SUBSYSID(rec_ident);
> >
> > @@ -1177,20 +1041,12 @@ void _logsys_log_rec (
> >             record_reclaim_size += ((buf_len[i] + 3)>>  2) + 1;
> >     }
> >
> > +   record_reclaim_size+= 4;
> 
> In your implementation, I don't think record_reclaim_size += 4 is 
> necessary.  The + 4 in logsys is to store the record size.  Since the 
> chunked ring buffer takes care of managing the size recording, that is 
> probably not an issue.
> 
> >     logsys_flt_lock();
> > -   idx = flt_head;
> > -   index_start = idx;
> >
> > -   /*
> > -    * Reclaim data needed for record including 4 words for the header
> > -    */
> > -   records_reclaim (idx, record_reclaim_size + 4);
> > -
> > -   /*
> > -    * Write record size of zero and rest of header information
> > -    */
> > -   flt_data[idx++] = 0;
> > -   idx_word_step(idx);
> > +   flt_data = cs_rb_chunk_writable_alloc (rb, (record_reclaim_size * 
> > sizeof (uint32_t)));
> > +   assert(flt_data != NULL);
> > +   idx = 0;
> >
> >     flt_data[idx++] = rec_ident;
> >     idx_word_step(idx);
> 
> idx_word_step / idx_buffer_step is unnecessary and can be removed from 
> all of these operations.  The mmap feature of the ring buffer will 
> handle wrapping.
> 
> > @@ -1219,17 +1075,19 @@ void _logsys_log_rec (
> >             idx_buffer_step (idx);
> >
> >     }
> > -   words_written = idx - index_start;
> > -   if (words_written<  0) {
> > -           words_written += flt_data_size;
> > -   }
> > +
> >     /*
> >      * Commit the write of the record size now that the full record
> >      * is in the memory buffer
> >      */
> > -   flt_data[index_start] = words_written;
> > +   if (record_reclaim_size<  idx) {
> > +           printf ("record_reclaim_size:%d idx:%d commit_size:%d",
> > +                   record_reclaim_size, idx, (idx * sizeof(uint32_t)));
> > +           sleep(1);
> > +           assert(0);
> > +   }
> > +   cs_rb_chunk_writable_commit (rb, (idx * sizeof(uint32_t)));
> >
> > -   flt_head = idx;
> >     logsys_flt_unlock();
> >     records_written++;
> >   }
> > @@ -1617,28 +1475,17 @@ int logsys_log_rec_store (const char *filename)
> >   {
> >     int fd;
> >     ssize_t written_size;
> > -   size_t size_to_write = (flt_data_size + 2) * sizeof (unsigned int);
> >
> >     fd = open (filename, O_CREAT|O_RDWR, 0700);
> >     if (fd<  0) {
> >             return (-1);
> >     }
> >
> > -   written_size = write (fd,&flt_data_size, sizeof(unsigned int));
> > -   if ((written_size<  0) || (written_size != sizeof(unsigned int))) {
> > -           close (fd);
> > -           return (-1);
> > -   }
> > -
> > -   written_size = write (fd, flt_data, flt_data_size * sizeof (unsigned 
> > int));
> > -   written_size += write (fd,&flt_head, sizeof (uint32_t));
> > -   written_size += write (fd,&flt_tail, sizeof (uint32_t));
> > +   written_size = cs_rb_write_to_file (rb, fd);
> >     if (close (fd) != 0)
> >             return (-1);
> >     if (written_size<  0) {
> >             return (-1);
> > -   } else if ((size_t)written_size != size_to_write) {
> > -           return (-1);
> >     }
> >
> >     return (0);
> > diff --git a/exec/ringbuffer.c b/exec/ringbuffer.c
> > new file mode 100644
> > index 0000000..177279e
> > --- /dev/null
> > +++ b/exec/ringbuffer.c
> > @@ -0,0 +1,369 @@
> > +/*
> > + * Copyright (C) 2010 Red Hat, Inc.
> > + *
> > + * All rights reserved.
> > + *
> > + * Author: Angus Salkeld<asalk...@redhat.com>
> > + *
> > + * This software licensed under BSD license, the text of which follows:
> > + *
> > + * Redistribution and use in source and binary forms, with or without
> > + * modification, are permitted provided that the following conditions are 
> > met:
> > + *
> > + * - Redistributions of source code must retain the above copyright notice,
> > + *   this list of conditions and the following disclaimer.
> > + * - Redistributions in binary form must reproduce the above copyright 
> > notice,
> > + *   this list of conditions and the following disclaimer in the 
> > documentation
> > + *   and/or other materials provided with the distribution.
> > + * - Neither the name of Red Hat, Inc. nor the names of its
> > + *   contributors may be used to endorse or promote products derived from 
> > this
> > + *   software without specific prior written permission.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
> > IS"
> > + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
> > THE
> > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
> > PURPOSE
> > + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
> > + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> > + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
> > + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
> > + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> > + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
> > + * THE POSSIBILITY OF SUCH DAMAGE.
> > + */
> > +#include<config.h>
> > +#include<stdlib.h>
> > +#include<stdio.h>
> > +#include<errno.h>
> > +#include<stdint.h>
> > +#include<string.h>
> > +#include<assert.h>
> > +#include<unistd.h>
> > +#include<sys/mman.h>
> > +#include<corosync/cororb.h>
> > +#include "util.h"
> > +
> > +
> > +#define ROUNDUP(x, y) ((((x) + ((y) - 1)) / (y)) * (y))
> > +
> > +/* the chunk header is two words
> > + * 1) the chunk data size
> > + * 2) the magic number
> > + */
> > +#define RB_CHUNK_HEADER_WORDS 2
> > +#define RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * RB_CHUNK_HEADER_WORDS)
> > +#define RB_CHUNK_MAGIC 0xdeadbeef
> 
> A little nitpicky, but this magic code is often used by programmers.  One 
> advantage of having a magic code is that it helps diagnose partial file 
> data in an automated way.  Not sure what a good magic number would be.
> 
> > +#define FDHEAD_INDEX               (rb->size)
> > +#define FDTAIL_INDEX               (rb->size + 1)
> > +
> > +
> 
> I'm not sure how this works.  The mmap operation will place the head and 
> tail not at the end of the data, but at the start of the new data, 
> potentially causing corruption?
> 
> In the logsys code, I removed FDHEAD_INDEX and FDTAIL_INDEX and instead 
> stored them in static globals.
> 
> > +static uint32_t circular_memory_map (void **buf, size_t bytes)
> > +{
> > +   void *addr_orig;
> > +   void *addr;
> > +
> > +   addr_orig = mmap (*buf, bytes<<  1, PROT_NONE,
> > +           MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
> > +
> > +   if (addr_orig == MAP_FAILED) {
> > +           return (-1);
> > +   }
> > +
> > +   addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
> > +           MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
> > +
> > +   if (addr != addr_orig) {
> > +           return (-1);
> > +   }
> > +#ifdef COROSYNC_BSD
> > +   madvise(addr_orig, bytes, MADV_NOSYNC);
> > +#endif
> > +
> > +   addr = mmap (((char *)addr_orig) + bytes,
> > +                  bytes, PROT_READ | PROT_WRITE,
> > +                  MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED, -1, 0);
> > +   if ((char *)addr != (char *)((char *)addr_orig + bytes)) {
> > +           return (-1);
> > +   }
> > +#ifdef COROSYNC_BSD
> > +   madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
> > +#endif
> > +
> > +   *buf = addr_orig;
> > +   return (0);
> > +}
> > +
> > +cs_ringbuffer_t* cs_rb_create (size_t size, cs_ringbuffer_type type)
> > +{
> > +   cs_ringbuffer_t* rb = malloc (sizeof (cs_ringbuffer_t));
> > +   size_t real_size = ROUNDUP(size, sysconf(_SC_PAGESIZE));
> > +
> > +
> > +   if (circular_memory_map ((void**)&rb->buf, real_size) != 0) {
> > +           return NULL;
> > +   }
> > +
> > +   memset (rb->buf, 0, real_size * 2);
> > +
> > +   rb->type = type;
> > +   /*
> > +    * rb->size tracks data by ints and not bytes/chars.
> > +    */
> > +   rb->size = real_size / sizeof (uint32_t);
> > +   /*
> > +    * First record starts at zero
> > +    * Last record ends at zero
> > +    */
> > +   rb->write_pt = 0;
> > +   rb->read_pt = 0;
> > +
> > +   return rb;
> > +}
> > +
> > +size_t cs_rb_space_free (cs_ringbuffer_t *rb)
> > +{
> > +   uint32_t write_size;
> > +   uint32_t read_size;
> > +   size_t space_free = 0;
> > +
> > +   write_size = rb->write_pt;
> > +   read_size = rb->read_pt;
> > +
> > +   if (write_size>  read_size) {
> > +           space_free = (read_size - write_size + rb->size) - 1;
> > +   }
> > +   else if (write_size<  read_size) {
> > +           space_free = (read_size - write_size) - 1;
> > +   }
> > +   else {
> > +           space_free = rb->size;
> > +   }
> > +   /* word ->  bytes */
> > +   return (space_free * sizeof (uint32_t));
> > +}
> > +
> > +size_t cs_rb_space_used (cs_ringbuffer_t *rb)
> > +{
> > +   uint32_t write_size;
> > +   uint32_t read_size;
> > +   size_t space_used;
> > +
> > +   write_size = rb->write_pt;
> > +   read_size = rb->read_pt;
> > +
> > +   if (write_size>  read_size) {
> > +           space_used = write_size - read_size;
> > +   } else
> > +   if (write_size<  read_size) {
> > +           space_used = (write_size - read_size + rb->size) - 1;
> > +   } else {
> > +           space_used = 0;
> > +   }
> > +   /* word ->  bytes */
> > +   return (space_used * sizeof (uint32_t));
> > +}
> > +
> > +void* cs_rb_chunk_writable_alloc (cs_ringbuffer_t *rb, size_t len)
> > +{
> > +   uint32_t idx;
> > +
> > +   /*
> > +    * Reclaim data if we are over writing and we need space
> > +    */
> > +   if (rb->type == CS_RB_OVERWRITE) {
> > +           while (cs_rb_space_free (rb)<  (len + RB_CHUNK_HEADER_SIZE)) {
> > +                   cs_rb_chunk_reclaim (rb);
> > +           }
> > +   } else {
> > +           if (cs_rb_space_free (rb)<  (len + RB_CHUNK_HEADER_SIZE)) {
> > +                   return NULL;
> > +           }
> > +   }
> > +   idx = rb->write_pt;
> > +   /*
> > +    * insert the chunk header
> > +    */
> > +   rb->buf[idx++] = len;
> > +   idx_word_step(idx);
> > +   rb->buf[idx++] = RB_CHUNK_MAGIC;
> > +   idx_word_step(idx);
> > +
> > +   /*
> > +    * return a pointer to the begining of the chunk data
> > +    */
> > +   return (void*)&rb->buf[idx];
> > +
> > +}
> > +
> > +void cs_rb_chunk_writable_commit (cs_ringbuffer_t *rb, size_t len)
> > +{
> > +   uint32_t idx = rb->write_pt;
> > +
> > +   /*
> > +    * skip over the chunk header
> > +    */
> > +   rb->buf[idx++] = len;
> > +   idx_word_step(idx);
> > +   rb->buf[idx++] = RB_CHUNK_MAGIC;
> > +   idx_word_step(idx);
> > +
> > +   /*
> > +    * skip over the user's chunk.
> > +    */
> > +   idx += (len / sizeof (uint32_t));
> 
> Honza is correct, this commit action won't work for len = 3, len = 7, len 
> = 6, etc.  It should work for those.  I believe how this is handled in 
> logsys is something like
> 
> idx += ((len + 3) / sizeof (uint32_t));
> 
> 
> > +   idx_buffer_step (idx);
> > +
> > +   /*
> > +    * commit the write_pt
> > +    */
> > +   rb->write_pt = idx;
> > +}
> > +
> > +size_t cs_rb_chunk_write (cs_ringbuffer_t *rb, const void* data, size_t 
> > len)
> > +{
> > +   char *dest = cs_rb_chunk_writable_alloc (rb, len);
> > +
> > +   if (dest == NULL) {
> > +           return 0;
> > +   }
> > +
> > +   /*
> > +    * copy the data
> > +    */
> > +   memcpy (dest, data, len);
> > +
> > +   cs_rb_chunk_writable_commit (rb, len);
> > +
> > +   return len;
> > +}
> > +
> > +void cs_rb_chunk_reclaim (cs_ringbuffer_t *rb)
> > +{
> > +   int words_needed = 0;
> > +   uint32_t chunk_size = rb->buf[rb->read_pt];
> > +   uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
> > +
> > +   //printf ("%s: size:%d magic:%d\n", __func__, chunk_size, chunk_magic);
> > +   assert (chunk_magic == RB_CHUNK_MAGIC);
> > +   words_needed = (chunk_size + RB_CHUNK_HEADER_SIZE) / sizeof (uint32_t);
> > +
> 
> As Honza pointed out, this may need some adjustment to deal with 
> non-word size commits.
> 
> > +   rb->read_pt = (rb->read_pt + words_needed) % (rb->size);
> > +}
> > +
> > +size_t cs_rb_chunk_peek (cs_ringbuffer_t *rb, void **data_out)
> > +{
> > +   uint32_t chunk_size = rb->buf[rb->read_pt];
> > +   uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
> > +
> > +   *data_out =&rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS];
> > +
> > +   if (chunk_magic != RB_CHUNK_MAGIC) {
> > +           return 0;
> > +   } else {
> > +           return chunk_size;
> > +   }
> > +}
> > +
> > +size_t cs_rb_chunk_read (cs_ringbuffer_t *rb, void *data_out, size_t len)
> > +{
> > +   uint32_t chunk_size = rb->buf[rb->read_pt];
> > +   uint32_t chunk_magic = rb->buf[(rb->read_pt + 1) % rb->size];
> > +
> > +   if (cs_rb_space_used (rb) == 0) {
> > +           return 0;
> > +   }
> > +
> > +   if (chunk_magic != RB_CHUNK_MAGIC) {
> > +           return 0;
> > +   }
> > +
> > +   if (len<  chunk_size) {
> > +           return 0;
> > +   }
> > +
> > +   memcpy (data_out,&rb->buf[rb->read_pt + RB_CHUNK_HEADER_WORDS], 
> > chunk_size);
> > +
> > +   cs_rb_chunk_reclaim (rb);
> > +
> > +   return chunk_size;
> > +}
> > +
> > +static void print_header (cs_ringbuffer_t *rb)
> > +{
> > +   printf ("Ringbuffer: \n");
> > +   if (rb->type == CS_RB_OVERWRITE) {
> > +           printf (" ->OVERWRITE\n");
> > +   } else {
> > +           printf (" ->NORMAL\n");
> > +   }
> > +   printf (" ->write_pt [%d]\n", rb->write_pt);
> > +   printf (" ->read_pt [%d]\n", rb->read_pt);
> > +   printf (" ->size [%d words]\n", rb->size);
> > +
> > +   printf (" =>free [%d bytes]\n", cs_rb_space_free (rb));
> > +   printf (" =>used [%d bytes]\n", cs_rb_space_used (rb));
> > +}
> > +
> > +
> > +size_t cs_rb_write_to_file (cs_ringbuffer_t *rb, int fd)
> > +{
> > +   ssize_t written_size;
> > +
> > +   print_header (rb);
> > +
> > +   written_size = write (fd,&rb->size, sizeof (uint32_t));
> > +   if ((written_size<  0) || (written_size != sizeof (uint32_t))) {
> > +           return -1;
> > +   }
> > +
> > +   written_size = write (fd, rb->buf, rb->size * sizeof (unsigned int));
> > +   /*
> > +    * store the read&  write pointers
> > +    */
> > +   written_size += write (fd, (void*)&rb->write_pt, sizeof (uint32_t));
> > +   written_size += write (fd, (void*)&rb->read_pt, sizeof (uint32_t));
> > +
> > +   return written_size;
> > +}
> 
> I know this is a problem with the logsys code as well, but the write 
> operation can return -1 indicating error.  In this case, written_size 
> will be out of whack.
> 
> > +
> > +cs_ringbuffer_t *cs_rb_create_from_file (int fd, cs_ringbuffer_type type)
> > +{
> > +   ssize_t n_read;
> > +   size_t n_required;
> > +   cs_ringbuffer_t *rb = malloc (sizeof (cs_ringbuffer_t));
> > +
> > +   rb->type = type;
> > +
> > +   n_required = sizeof (unsigned int);
> > +   n_read = read (fd,&rb->size, n_required);
> > +   if (n_read != n_required) {
> > +           fprintf (stderr, "Unable to read fdata header\n");
> > +           return NULL;
> > +   }
> > +
> > +   n_required = ((rb->size + 2) * sizeof(unsigned int));
> 
> please use uint32_t in future code.
> 
> > +
> > +   if ((rb->buf = malloc (n_required)) == NULL) {
> > +           fprintf (stderr, "exhausted virtual memory\n");
> > +           return NULL;
> > +   }
> > +   n_read = read (fd, rb->buf, n_required);
> > +   if (n_read<  0) {
> > +           fprintf (stderr, "reading file failed: %s\n",
> > +                    strerror (errno));
> > +           return NULL;
> > +   }
> > +
> > +   if (n_read != n_required) {
> > +           printf ("Warning: read %lu bytes, but expected %lu\n",
> > +                   (unsigned long) n_read, (unsigned long) n_required);
> > +   }
> > +
> > +   rb->write_pt = rb->buf[FDHEAD_INDEX];
> > +   rb->read_pt = rb->buf[FDTAIL_INDEX];
> > +
> > +   print_header (rb);
> > +
> > +   return rb;
> > +}
> > +
> 
> This code doesn't execute a memory map operation.  Chunked reads or 
> writes near the end of the ring buffer will result in a segfault or data 
> corruption.
> 
> > diff --git a/include/corosync/cororb.h b/include/corosync/cororb.h
> > new file mode 100644
> > index 0000000..8cca162
> > --- /dev/null
> > +++ b/include/corosync/cororb.h
> > @@ -0,0 +1,129 @@
> > +/*
> > + * Copyright (C) 2010 Red Hat, Inc.
> > + *
> > + * All rights reserved.
> > + *
> > + * Author: Angus Salkeld<asalk...@redhat.com>
> > + *
> > + * This software licensed under BSD license, the text of which follows:
> > + *
> > + * Redistribution and use in source and binary forms, with or without
> > + * modification, are permitted provided that the following conditions are 
> > met:
> > + *
> > + * - Redistributions of source code must retain the above copyright notice,
> > + *   this list of conditions and the following disclaimer.
> > + * - Redistributions in binary form must reproduce the above copyright 
> > notice,
> > + *   this list of conditions and the following disclaimer in the 
> > documentation
> > + *   and/or other materials provided with the distribution.
> > + * - Neither the name of Red Hat, Inc. nor the names of its
> > + *   contributors may be used to endorse or promote products derived from 
> > this
> > + *   software without specific prior written permission.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
> > IS"
> > + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
> > THE
> > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
> > PURPOSE
> > + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
> > + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> > + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
> > + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
> > + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> > + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
> > + * THE POSSIBILITY OF SUCH DAMAGE.
> > + */
> > +#ifndef CS_RB_DEFINED
> > +#define CS_RB_DEFINED
> > +
> > +#include<sys/types.h>
> > +#include<stdint.h>
> > +
> > +typedef enum {
> > +   CS_RB_NORMAL,
> > +   CS_RB_OVERWRITE,
> > +} cs_ringbuffer_type;
> > +
> > +#define CS_RB_SHM_PATH_MAX 128
> > +
> 
> The proper define for this is PATH_MAX and comes from #include <limits.h>.
> 
> 
> > +typedef struct {
> > +   cs_ringbuffer_type   type;
> > +   volatile uint32_t  write_pt;
> > +   volatile uint32_t  read_pt;
> > +   uint32_t           size;
> > +   uint32_t          *buf;
> > +} cs_ringbuffer_t;
> > +
> > +
> 
> For proper data hiding this struct should be in the C file, not header 
> file.
> 
> > +
> > +#define idx_word_step(idx)                                         \
> > +do {                                                                       
> > \
> > +   if (idx>  (rb->size - 1)) {                             \
> > +           idx = 0;                                                \
> > +   }                                                               \
> > +} while (0);
> > +
> > +#define idx_buffer_step(idx)                                               
> > \
> > +do {                                                                       
> > \
> > +   if (idx>  (rb->size - 1)) {                             \
> > +           idx = ((idx) % (rb->size));                     \
> > +   }                                                               \
> > +} while (0);
> > +
> 
> The above defines are internal to the implementation - they should be in 
> the C file.
> 
> > +/**
> > + * initialize
> > + */
> > +cs_ringbuffer_t* cs_rb_create (size_t size, cs_ringbuffer_type type);
> > +
> > +
> > +/**
> > + * Try to add data to the buffer.
> > + * @return the amount of bytes actually buffered.
> > + */
> > +size_t cs_rb_chunk_write (cs_ringbuffer_t *rb, const void* data, size_t 
> > len);
> > +
> > +/**
> > + * write the header
> > + * @return pointer to chunk to write to, or NULL (if no space).
> > + */
> > +void* cs_rb_chunk_writable_alloc (cs_ringbuffer_t *rb, size_t len);
> > +
> > +/**
> > + * finalize the chunk.
> > + */
> > +void cs_rb_chunk_writable_commit (cs_ringbuffer_t *rb, size_t len);
> > +
> > +/**
> > + * read (without reclaiming) the last chunk.
> > + * @return the size of the chunk.
> > + */
> > +size_t cs_rb_chunk_peek (cs_ringbuffer_t *rb, void **data_out);
> > +
> > +/**
> > + * reclaim the last chunk.
> > + */
> > +void cs_rb_chunk_reclaim (cs_ringbuffer_t *rb);
> > +
> > +/**
> > + * This is the same as cs_rb_chunk_peek() memcpy() and 
> > cs_rb_chunk_reclaim().
> > + */
> > +size_t cs_rb_chunk_read (cs_ringbuffer_t *rb, void *data_out, size_t len);
> > +
> > +/**
> > + * The amount of data currently available.
> > + */
> > +size_t cs_rb_space_free (cs_ringbuffer_t *rb);
> > +
> > +/**
> > + * The total amount of data in the buffer.
> > + */
> > +size_t cs_rb_space_used (cs_ringbuffer_t *rb);
> > +
> > +
> > +/**
> > + * write the contents of the Ring Buffer to file.
> > + */
> > +size_t cs_rb_write_to_file (cs_ringbuffer_t *rb, int fd);
> > +
> > +cs_ringbuffer_t *cs_rb_create_from_file (int fd, cs_ringbuffer_type type);
> > +
> > +#endif /* CS_RB_DEFINED */
> > +
> > diff --git a/tools/Makefile.am b/tools/Makefile.am
> > index 3a52f4a..01dacbc 100644
> > --- a/tools/Makefile.am
> > +++ b/tools/Makefile.am
> > @@ -40,6 +40,9 @@ bin_SCRIPTS               = corosync-blackbox
> >
> >   EXTRA_DIST                = $(bin_SCRIPTS)
> >
> > +corosync_fplay_LDADD       = -llogsys
> 
> I would prefer a logsys_fplay library rather than using -llogsys.
> 
> > +corosync_fplay_LDFLAGS     = -L../exec
> > +
> >   corosync_pload_LDADD      = -lpload -lcoroipcc
> >   corosync_pload_LDFLAGS    = -L../lib
> >   corosync_objctl_LDADD     = -lconfdb ../lcr/liblcr.a -lcoroipcc
> > diff --git a/tools/corosync-fplay.c b/tools/corosync-fplay.c
> > index f6f3bae..a2f2891 100644
> > --- a/tools/corosync-fplay.c
> > +++ b/tools/corosync-fplay.c
> > @@ -1,3 +1,36 @@
> > +/*
> > + * Copyright (c) 2010 Red Hat, Inc.
> > + *
> > + * All rights reserved.
> > + *
> > + * Author: Steven Dake (sd...@redhat.com)
> > + *
> > + * This software licensed under BSD license, the text of which follows:
> > + *
> > + * Redistribution and use in source and binary forms, with or without
> > + * modification, are permitted provided that the following conditions are met:
> > + *
> > + * - Redistributions of source code must retain the above copyright notice,
> > + *   this list of conditions and the following disclaimer.
> > + * - Redistributions in binary form must reproduce the above copyright notice,
> > + *   this list of conditions and the following disclaimer in the documentation
> > + *   and/or other materials provided with the distribution.
> > + * - Neither the name of the MontaVista Software, Inc. nor the names of its
> > + *   contributors may be used to endorse or promote products derived from this
> > + *   software without specific prior written permission.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
> > + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
> > + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
> > + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
> > + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> > + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
> > + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
> > + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> > + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
> > + * THE POSSIBILITY OF SUCH DAMAGE.
> > + */
> >   #include<config.h>
> >
> >   #include<sys/types.h>
> > @@ -17,12 +50,7 @@
> >   #include<arpa/inet.h>
> >
> >   #include<corosync/engine/logsys.h>
> > -
> > -unsigned int flt_data_size;
> > -
> > -unsigned int *flt_data;
> > -#define FDHEAD_INDEX               (flt_data_size)
> > -#define FDTAIL_INDEX               (flt_data_size + 1)
> > +#include<corosync/cororb.h>
> >
> >   #define TOTEMIP_ADDRLEN (sizeof(struct in6_addr))
> >
> > @@ -341,34 +369,12 @@ static struct printer_subsys printer_subsystems[] = {
> >   static unsigned int printer_subsys_count =
> >     sizeof (printer_subsystems) / sizeof (struct printer_subsys);
> >
> > -static unsigned int g_record[10000];
> > -
> > -/*
> > - * Copy record, dealing with wrapping
> > - */
> > -static int logsys_rec_get (int rec_idx) {
> > -   unsigned int rec_size;
> > -   int firstcopy, secondcopy;
> > -
> > -   rec_size = flt_data[rec_idx];
> > -
> > -   firstcopy = rec_size;
> > -   secondcopy = 0;
> > -   if (firstcopy + rec_idx>  flt_data_size) {
> > -           firstcopy = flt_data_size - rec_idx;
> > -           secondcopy -= firstcopy - rec_size;
> > -   }
> > -   memcpy (&g_record[0],&flt_data[rec_idx], firstcopy<<2);
> > -   if (secondcopy) {
> > -           memcpy (&g_record[firstcopy],&flt_data[0], secondcopy<<2);
> > -   }
> > -   return ((rec_idx + rec_size) % flt_data_size);
> > -}
> > +#define G_RECORD_SIZE 10000
> > +static unsigned int g_record[G_RECORD_SIZE];
> >
> > -static void logsys_rec_print (const void *record)
> > +static void logsys_rec_print (const void *record, size_t rec_size)
> >   {
> >     const unsigned int *buf_uint32t = record;
> > -   unsigned int rec_size;
> >     unsigned int rec_ident;
> >     unsigned int level;
> >     unsigned int line;
> > @@ -382,16 +388,15 @@ static void logsys_rec_print (const void *record)
> >     const char *arguments[64];
> >     int arg_count = 0;
> >
> > -   rec_size = buf_uint32t[rec_idx];
> > -   rec_ident = buf_uint32t[rec_idx+1];
> > -   line = buf_uint32t[rec_idx+2];
> > -   record_number = buf_uint32t[rec_idx+3];
> > +   rec_ident = buf_uint32t[rec_idx];
> > +   line = buf_uint32t[rec_idx+1];
> > +   record_number = buf_uint32t[rec_idx+2];
> >
> >     level = LOGSYS_DECODE_LEVEL(rec_ident);
> >
> >     printf ("rec=[%d] ", record_number);
> > -   arg_size_idx = rec_idx + 4;
> > -   words_processed = 4;
> > +   arg_size_idx = rec_idx + 3;
> > +   words_processed = 3;
> >     for (i = 0; words_processed<  rec_size; i++) {
> >             arguments[arg_count++] =
> >               (const char *)&buf_uint32t[arg_size_idx + 1];
> > @@ -459,12 +464,10 @@ printf ("\n");
> >   int main (void)
> >   {
> >     int fd;
> > -   int rec_idx;
> > -   int end_rec;
> >     int record_count = 1;
> > -   ssize_t n_read;
> > +   size_t chunk_size;
> >     const char *data_file = LOCALSTATEDIR "/lib/corosync/fdata";
> > -   size_t n_required;
> > +   cs_ringbuffer_t *rb = NULL;
> >
> >     if ((fd = open (data_file, O_RDONLY))<  0) {
> >             fprintf (stderr, "failed to open %s: %s\n",
> > @@ -472,45 +475,16 @@ int main (void)
> >             return EXIT_FAILURE;
> >     }
> >
> > -   n_required = sizeof (unsigned int);
> > -   n_read = read (fd,&flt_data_size, n_required);
> > -   if (n_read != n_required) {
> > -           fprintf (stderr, "Unable to read fdata header\n");
> > -           return EXIT_FAILURE;
> > -   }
> > -
> > -   n_required = ((flt_data_size + 2) * sizeof(unsigned int));
> > +   rb = cs_rb_create_from_file (fd, CS_RB_OVERWRITE);
> >
> > -   if ((flt_data = malloc (n_required)) == NULL) {
> > -           fprintf (stderr, "exhausted virtual memory\n");
> > -           return EXIT_FAILURE;
> > -   }
> > -   n_read = read (fd, flt_data, n_required);
> >     close (fd);
> > -   if (n_read<  0) {
> > -           fprintf (stderr, "reading %s failed: %s\n",
> > -                    data_file, strerror (errno));
> > -           return EXIT_FAILURE;
> > -   }
> > -
> > -   if (n_read != n_required) {
> > -           printf ("Warning: read %lu bytes, but expected %lu\n",
> > -                   (unsigned long) n_read, (unsigned long) n_required);
> > -   }
> > -
> > -   rec_idx = flt_data[FDTAIL_INDEX];
> > -   end_rec = flt_data[FDHEAD_INDEX];
> > -
> > -   printf ("Starting replay: head [%d] tail [%d]\n",
> > -           flt_data[FDHEAD_INDEX],
> > -           flt_data[FDTAIL_INDEX]);
> >
> >     for (;;) {
> > -           rec_idx = logsys_rec_get (rec_idx);
> > -           logsys_rec_print (g_record);
> > -           if (rec_idx == end_rec) {
> > +           chunk_size = cs_rb_chunk_read (rb, g_record, G_RECORD_SIZE);
> > +           if (chunk_size == 0) {
> >                     break;
> >             }
> > +           logsys_rec_print (g_record, (chunk_size / sizeof (uint32_t)));
> >             record_count += 1;
> >     }
> >
> 
> _______________________________________________
> Openais mailing list
> Openais@lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais
_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to