On 06/27/2011 08:32 AM, Jan Kiszka wrote:
> On 2011-06-24 21:58, Gilles Chanteperdrix wrote:
>> On 06/23/2011 01:36 PM, Jan Kiszka wrote:
>>> On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
>>>> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>>>>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>>>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>>>>> +      if (xeno_get_current() != XN_NO_HANDLE &&
>>>>>>>> +          !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>>>>> +              struct print_buffer *old;
>>>>>>>> +
>>>>>>>> +              old = buf_pool_get();
>>>>>>>> +              while (old != buffer) {
>>>>>>>> +                      buffer = old;
>>>>>>>> +                      old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>>>>
>>>>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>>>>
>>>>>> I do not get it: it seems to me that if the current head (that is
>>>>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>>>>> and try again.
>>>>>
>>>>> Problematic is the dereferencing of the stale buffer pointer obtained
>>>>> during the last cmpxchg or via buf_pool_get. This happens before the new
>>>>> cmpxchg.
>>>>
>>>> Ok. Got it, that would be a problem only if the stale pointer pointed to
>>>> an unmapped area, but ok, better avoid freeing the buffers. I guess it
>>>> would not be a problem as applications tend to have a fixed number of
>>>> threads anyway.
>>>
>>> That is a risky assumption, proven wrong e.g. by one of our
>>> applications. Threads may always be created or destroyed depending on
>>> the mode of an application, specifically if it is a larger one.
>>
>> Here is another attempt, based on a bitmap.
>>
>> diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
>> index 93b711a..fb4820f 100644
>> --- a/src/rtdk/rt_print.c
>> +++ b/src/rtdk/rt_print.c
>> @@ -28,7 +28,9 @@
>>  #include <syslog.h>
>>  
>>  #include <rtdk.h>
>> +#include <nucleus/types.h>  /* For BITS_PER_LONG */
>>  #include <asm/xenomai/system.h>
>> +#include <asm/xenomai/atomic.h>     /* For atomic_cmpxchg */
>>  #include <asm-generic/stack.h>
>>  
>>  #define RT_PRINT_BUFFER_ENV         "RT_PRINT_BUFFER"
>> @@ -37,6 +39,9 @@
>>  #define RT_PRINT_PERIOD_ENV         "RT_PRINT_PERIOD"
>>  #define RT_PRINT_DEFAULT_PERIOD             100 /* ms */
>>  
>> +#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
>> +#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
>> +
>>  #define RT_PRINT_LINE_BREAK         256
>>  
>>  #define RT_PRINT_SYSLOG_STREAM              NULL
>> @@ -75,6 +80,12 @@ static pthread_mutex_t buffer_lock;
>>  static pthread_cond_t printer_wakeup;
>>  static pthread_key_t buffer_key;
>>  static pthread_t printer_thread;
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +static xnarch_atomic_t *pool_bitmap;
>> +static unsigned pool_bitmap_len;
>> +static unsigned pool_buf_size;
>> +static unsigned long pool_start, pool_len;
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>>  
>>  static void cleanup_buffer(struct print_buffer *buffer);
>>  static void print_buffers(void);
>> @@ -243,10 +254,36 @@ static void set_buffer_name(struct print_buffer 
>> *buffer, const char *name)
>>      }
>>  }
>>  
>> +void rt_print_init_inner(struct print_buffer *buffer, size_t size)
>> +{
>> +    buffer->size = size;
>> +
>> +    memset(buffer->ring, 0, size);
>> +
>> +    buffer->read_pos  = 0;
>> +    buffer->write_pos = 0;
>> +
>> +    buffer->prev = NULL;
>> +
>> +    pthread_mutex_lock(&buffer_lock);
>> +
>> +    buffer->next = first_buffer;
>> +    if (first_buffer)
>> +            first_buffer->prev = buffer;
>> +    first_buffer = buffer;
>> +
>> +    buffers++;
>> +    pthread_cond_signal(&printer_wakeup);
>> +
>> +    pthread_mutex_unlock(&buffer_lock);
>> +}
>> +
>>  int rt_print_init(size_t buffer_size, const char *buffer_name)
>>  {
>>      struct print_buffer *buffer = pthread_getspecific(buffer_key);
>>      size_t size = buffer_size;
>> +    unsigned long old_bitmap;
>> +    unsigned j;
>>  
>>      if (!size)
>>              size = default_buffer_size;
>> @@ -260,39 +297,56 @@ int rt_print_init(size_t buffer_size, const char 
>> *buffer_name)
>>                      return 0;
>>              }
>>              cleanup_buffer(buffer);
>> +            buffer = NULL;
>>      }
>>  
>> -    buffer = malloc(sizeof(*buffer));
>> -    if (!buffer)
>> -            return ENOMEM;
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +    /* Find a free buffer in the pool */
>> +    do {
>> +            unsigned long bitmap;
>> +            unsigned i;
>>  
>> -    buffer->ring = malloc(size);
>> -    if (!buffer->ring) {
>> -            free(buffer);
>> -            return ENOMEM;
>> -    }
>> -    memset(buffer->ring, 0, size);
>> +            for (i = 0; i < pool_bitmap_len; i++) {
>> +                    old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
>> +                    if (old_bitmap)
>> +                            goto acquire;
>> +            }
>>  
>> -    buffer->read_pos  = 0;
>> -    buffer->write_pos = 0;
>> +            goto not_found;
>>  
>> -    buffer->size = size;
>> +      acquire:
>> +            do {
>> +                    bitmap = old_bitmap;
>> +                    j = __builtin_ffsl(bitmap) - 1;
>> +                    old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
>> +                                                       bitmap,
>> +                                                       bitmap & ~(1UL << j));
>> +            } while (old_bitmap != bitmap && old_bitmap);
>> +            j += i * BITS_PER_LONG;
>> +    } while (!old_bitmap);
>>  
>> -    set_buffer_name(buffer, buffer_name);
>> +    buffer = (struct print_buffer *)(pool_start + j * pool_buf_size);
>>  
>> -    buffer->prev = NULL;
>> +  not_found:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>>  
>> -    pthread_mutex_lock(&buffer_lock);
>> +    if (!buffer) {
>> +            assert_nrt();
>>  
>> -    buffer->next = first_buffer;
>> -    if (first_buffer)
>> -            first_buffer->prev = buffer;
>> -    first_buffer = buffer;
>> +            buffer = malloc(sizeof(*buffer));
>> +            if (!buffer)
>> +                    return ENOMEM;
>>  
>> -    buffers++;
>> -    pthread_cond_signal(&printer_wakeup);
>> +            buffer->ring = malloc(size);
>> +            if (!buffer->ring) {
>> +                    free(buffer);
>> +                    return ENOMEM;
>> +            }
>>  
>> -    pthread_mutex_unlock(&buffer_lock);
>> +            rt_print_init_inner(buffer, size);
>> +    }
>> +
>> +    set_buffer_name(buffer, buffer_name);
>>  
>>      pthread_setspecific(buffer_key, buffer);
>>  
>> @@ -346,12 +400,44 @@ static void cleanup_buffer(struct print_buffer *buffer)
>>  {
>>      struct print_buffer *prev, *next;
>>  
>> +    assert_nrt();
>> +
>>      pthread_setspecific(buffer_key, NULL);
>>  
>>      pthread_mutex_lock(&buffer_lock);
>>  
>>      print_buffers();
>>  
>> +    pthread_mutex_unlock(&buffer_lock);
>> +
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +    /* Return the buffer to the pool */
>> +    {
>> +            unsigned long old_bitmap, bitmap;
>> +            unsigned i, j;
>> +
>> +            if ((unsigned long)buffer - pool_start >= pool_len)
>> +                    goto dofree;
>> +
>> +            j = ((unsigned long)buffer - pool_start) / pool_buf_size;
>> +            i = j / BITS_PER_LONG;
>> +            j = j % BITS_PER_LONG;
>> +
>> +            old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
>> +            do {
>> +                    bitmap = old_bitmap;
>> +                    old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
>> +                                                       bitmap,
>> +                                                       bitmap | (1UL << j));
>> +            } while (old_bitmap != bitmap);
>> +
>> +            return;
>> +    }
>> +  dofree:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>> +
>> +    pthread_mutex_lock(&buffer_lock);
>> +
>>      prev = buffer->prev;
>>      next = buffer->next;
>>  
>> @@ -523,6 +609,64 @@ void __rt_print_init(void)
>>      print_period.tv_sec  = period / 1000;
>>      print_period.tv_nsec = (period % 1000) * 1000000;
>>  
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +    /* Fill the buffer pool */
>> +    {
>> +            unsigned buffers_count, i;
>> +
>> +            buffers_count = RT_PRINT_DEFAULT_BUFFERS_COUNT;
>> +
>> +            value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
>> +            if (value_str) {
>> +                    errno = 0;
>> +                    buffers_count = strtoul(value_str, NULL, 0);
>> +                    if (errno) {
>> +                            fprintf(stderr, "Invalid %s\n",
>> +                                    RT_PRINT_BUFFERS_COUNT_ENV);
>> +                            exit(1);
>> +                    }
>> +            }
>> +
>> +            pool_bitmap_len = (buffers_count + BITS_PER_LONG - 1)
>> +                    / BITS_PER_LONG;
>> +            if (!pool_bitmap_len)
>> +                    goto done;
>> +
>> +            pool_bitmap = malloc(pool_bitmap_len * sizeof(*pool_bitmap));
>> +            if (!pool_bitmap) {
>> +                    fprintf(stderr, "Error allocating rt_printf "
>> +                            "buffers\n");
>> +                    exit(1);
>> +            }
>> +
>> +            pool_buf_size = sizeof(struct print_buffer) + default_buffer_size;
>> +            pool_len = buffers_count * pool_buf_size;
>> +            pool_start = (unsigned long)malloc(pool_len);
>> +            if (!pool_start) {
>> +                    fprintf(stderr, "Error allocating rt_printf "
>> +                            "buffers\n");
>> +                    exit(1);
>> +            }
>> +
>> +            for (i = 0; i < buffers_count / BITS_PER_LONG; i++)
>> +                    xnarch_atomic_set(&pool_bitmap[i], ~0UL);
>> +            if (buffers_count % BITS_PER_LONG)
>> +                    xnarch_atomic_set(&pool_bitmap[i],
>> +                                      (1UL << (buffers_count % BITS_PER_LONG)) - 1);
>> +
>> +            for (i = 0; i < buffers_count; i++) {
>> +                    struct print_buffer *buffer =
>> +                            (struct print_buffer *)
>> +                            (pool_start + i * pool_buf_size);
>> +
>> +                    buffer->ring = (char *)(buffer + 1);
>> +
>> +                    rt_print_init_inner(buffer, default_buffer_size);
>> +            }
>> +    }
>> +  done:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>> +
>>      pthread_mutex_init(&buffer_lock, NULL);
>>      pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
>>  
>>
> 
> Looks good to me.
> 
> [ As you now ensure that buffers taken from the pool are always
> returned to it, so the objects remain valid, you could theoretically
> also use a linked list again. ]

In the meantime, I have read a few things about LIFO implementations
using cmpxchg, and learned about the ABA problem. So, it is not going to
be as simple as I imagined; I prefer to stick to the bitmap approach, if
you have no objection.

-- 
                                                                Gilles.

_______________________________________________
Xenomai-core mailing list
Xenomai-core@gna.org
https://mail.gna.org/listinfo/xenomai-core

Reply via email to