On Wed, Aug 17, 2011 at 09:43:59AM +0800, Lai Jiangshan wrote:
> On 08/17/2011 03:20 AM, Paul E. McKenney wrote:
> > On Tue, Aug 16, 2011 at 03:58:07PM +0800, Lai Jiangshan wrote:
> >> synchronize_rcu() finds ongoing readers and waits for them
> > 
> > Cute!
> > 
> > But some questions and spelling nits below.
> > 
> >                                                     Thanx, Paul
> > 
> >> Signed-off-by: Lai Jiangshan <[email protected]>
> >> ---
> >>  urcu.c             |   49 +++++++++++++++++++++++++++++++++++++++++++++++++
> >>  urcu/static/urcu.h |   42 ++++++++++++++++++++++++++++++++++++++++++
> >>  2 files changed, 91 insertions(+), 0 deletions(-)
> >>
> >> diff --git a/urcu.c b/urcu.c
> >> index bd54467..c13be67 100644
> >> --- a/urcu.c
> >> +++ b/urcu.c
> >> @@ -419,6 +419,53 @@ static void urcu_sync_unlock_if_not_proxy_unlocked(int32_t *lock, int32_t self)
> >>            urcu_sync_slow_unlock(lock);
> >>  }
> >>
> >> +/* also implies mb() */
> >> +void __urcu_read_unlock_specail(void)
> > 
> > s/__urcu_read_unlock_specail/__urcu_read_unlock_special/, please.
> > 
> >> +{
> >> +  urcu_sync_unlock_if_not_proxy_unlocked(&rcu_reader.sync,
> >> +                  rcu_reader.tid);
> >> +}
> >> +
> >> +/*
> >> + * synchronize_rcu                rcu_read_unlock(outmost)
> >> + *
> >> + *
> >> + * sync = tid;                    ctr = 0;
> >> + * smp_mb_master();               smp_mb_slave();
> > 
> 
> Sorry, I used these comments to explain why synchronize_rcu()
> will not sleep endlessly when it finds ctr != 0.
> 
> > But smp_mb_slave() expands to 'asm volatile("" : : : "memory")'
> > for RCU_SIGNAL, which would open up a number of vulnerabilities.
> > 
> > And yes, smp_mb_master() does a memory barrier on all CPUs in
> > the RCU_SIGNAL case, but that only guarantees that if there was
> > a reader pre-dating the beginning of the grace period, and that
> > reader accessed any pre-grace-period state, we also see that
> > reader.  Misordering of the rcu_read_unlock() with the prior
> > critical section is still possible, and this is why there is an
> > smp_mb_master() prior to exit from synchronize_rcu().
> > 
> > So for this to work, I believe that you need an smp_mb_master()
> > between checking the index->ctr and doing the urcu_sync_proxy_unlock().
> 
> Calling urcu_sync_proxy_unlock() earlier is OK.
> __urcu_read_unlock_special() does nothing when it finds the lock is
> already proxy-unlocked.
> 
> If we find ctr == 0:
> I did add an smp_mb_master() after we find ctr == 0 (also after
> urcu_sync_proxy_unlock()).

So it is OK to have urcu_sync_proxy_unlock() execute concurrently
with urcu_sync_unlock_if_not_proxy_unlocked()?  If so, no problem.

> > Or am I missing something that restricts URCU priority boosting
> > to RCU_MB (where it looks like it might work)?  Or missing some
> > extra synchronization in there somewhere?
> 
> initial: sync=0,ctr>0
> 
> sync = tid;                   ctr = 0
> smp_mb_master();              smp_mb_slave();
> local_ctr = ctr;              local_sync = sync;
>       assert(local_ctr==0 || local_sync == tid);
> 
> So if we find ctr != 0 (local_ctr != 0):
> the reader will call __urcu_read_unlock_special(), so
> synchronize_rcu() can be woken up and will not sleep endlessly.

Yep, I was worried about synchronizing access to the boost mutex rather
than the effects on the grace period.

> >> + * test ctr and wait;             test sync and wakeup;
> >> + */
> >> +
> >> +void synchronize_rcu(void)
> >> +{
> >> +  struct rcu_reader *index;
> >> +  int32_t self = syscall(SYS_gettid);
> >> +
> >> +  mutex_lock(&rcu_gp_lock);
> >> +
> >> +  if (cds_list_empty(&registry))
> >> +          goto out;
> >> +
> >> +  cds_list_for_each_entry(index, &registry, node)
> >> +          urcu_sync_proxy_lock(&index->sync, index->tid);
> >> +
> >> +  smp_mb_master(RCU_MB_GROUP);
> >> +
> >> +  cds_list_for_each_entry(index, &registry, node) {
> >> +          if (_CMM_LOAD_SHARED(index->ctr) == 0) {
> >> +                  urcu_sync_proxy_unlock(&index->sync);
> >> +          } else {
> >> +                  /* reader still running, we need to wait for it */
> >> +                  urcu_sync_lock(&index->sync, self);
> >> +                  urcu_sync_unlock(&index->sync, self);
> >> +          }
> >> +  }
> >> +
> >> +  /* ensure rcu_read_unlock() has finished when we found ctr == 0 */
> >> +  smp_mb_master(RCU_MB_GROUP);
> 
> This smp_mb_master() is needed after we find ctr == 0.

Again, my concern was the manipulations of index->sync rather than
the grace period.

> >> +
> >> +out:
> >> +  mutex_unlock(&rcu_gp_lock);
> >> +}
> >>  #endif /* #else #ifndef URCU_WAIT_READER */
> >>
> >>  /*
> >> @@ -437,7 +484,9 @@ void rcu_read_unlock(void)
> >>
> >>  void rcu_register_thread(void)
> >>  {
> >> +  rcu_reader.tid = syscall(SYS_gettid);
> >>    rcu_reader.pthread = pthread_self();
> >> +  rcu_reader.sync = 0;
> >>    assert(rcu_reader.need_mb == 0);
> >>    assert(!(rcu_reader.ctr & RCU_GP_CTR_NEST_MASK));
> >>
> >> diff --git a/urcu/static/urcu.h b/urcu/static/urcu.h
> >> index 32d1af8..0941fd2 100644
> >> --- a/urcu/static/urcu.h
> >> +++ b/urcu/static/urcu.h
> >> @@ -221,9 +221,11 @@ static inline void smp_mb_slave(int group)
> >>  struct rcu_reader {
> >>    /* Data used by both reader and synchronize_rcu() */
> >>    unsigned long ctr;
> >> +  int32_t sync;
> >>    char need_mb;
> >>    /* Data used for registry */
> >>    struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> >> +  pid_t tid;
> >>    pthread_t pthread;
> >>  };
> >>
> >> @@ -313,6 +315,46 @@ static inline int32_t urcu_sync_lock_onwer(int32_t *lock)
> >>    return _CMM_LOAD_SHARED(*lock) & ~FUTEX_WAITERS;
> >>  }
> >>
> >> +void __urcu_read_unlock_specail(void);
> >> +
> >> +static inline void _rcu_read_lock(void)
> >> +{
> >> +  unsigned long tmp;
> >> +
> >> +  cmm_barrier();
> >> +  tmp = rcu_reader.ctr;
> >> +
> >> +  if (!tmp) {
> >> +          _CMM_STORE_SHARED(rcu_reader.ctr, 1);
> >> +          smp_mb_slave(RCU_MB_GROUP);
> >> +  } else {
> >> +          _CMM_STORE_SHARED(rcu_reader.ctr, tmp + 1);
> >> +          cmm_barrier();
> >> +  }
> >> +}
> >> +
> >> +static inline void _rcu_read_unlock(void)
> >> +{
> >> +  unsigned long tmp;
> >> +
> >> +  cmm_barrier();
> >> +  tmp = rcu_reader.ctr;
> >> +
> >> +  if (tmp == 1) {
> >> +          smp_mb_slave(RCU_MB_GROUP);
> >> +
> >> +          rcu_reader.ctr = 0;
> >> +          smp_mb_slave(RCU_MB_GROUP);
> >> +
> >> +          if (unlikely(urcu_sync_lock_onwer(&rcu_reader.sync)
> >> +                          == rcu_reader.tid))
> >> +                  __urcu_read_unlock_specail();
> >> +  } else {
> >> +          _CMM_STORE_SHARED(rcu_reader.ctr, tmp - 1);
> >> +          cmm_barrier();
> >> +  }
> >> +}
> >> +
> >>  #endif /* #else #ifndef URCU_WAIT_READER */
> >>
> >>  #ifdef __cplusplus
> >> -- 
> >> 1.7.4.4
> >>
> > 
> 

_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
