On 12/19/2017 12:41 PM, Jesper Dangaard Brouer wrote:
On Tue, 19 Dec 2017 09:52:27 -0800 rao.sho...@oracle.com wrote:

+/* Main RCU function that is called to free RCU structures */
+static void
+__rcu_bulk_free(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
+{
+       unsigned long offset;
+       void *ptr;
+       struct rcu_bulk_free *rbf;
+       struct rcu_bulk_free_container *rbfc = NULL;
+
+       rbf = this_cpu_ptr(&cpu_rbf);
+
+       if (unlikely(!rbf->rbf_init)) {
+               spin_lock_init(&rbf->rbf_lock);
+               rbf->rbf_cpu = smp_processor_id();
+               rbf->rbf_init = true;
+       }
+
+       /* hold lock to protect against other cpu's */
+       spin_lock_bh(&rbf->rbf_lock);
I'm not sure this will be faster.  Having to take a cross CPU lock here
(+ BH-disable) could cause scaling issues.   Hopefully this lock will
not be used intensively by other CPUs, right?


The current cost of __call_rcu() is a local_irq_save/restore (which is
quite expensive, but doesn't cause cross CPU chatter).

Later in __rcu_process_callbacks() we have a local_irq_save/restore for
the entire list, plus a per object cost doing local_bh_disable/enable.
And for each object we call __rcu_reclaim(), which in some cases
directly calls kfree().

As Paul has pointed out, the lock is a per-cpu lock. The only reason for another CPU to access this lock is if the RCU callbacks run on a different CPU. There is nothing the code can do to avoid that, but it should be rare anyway.



If I had to implement this: I would choose to do the optimization in
__rcu_process_callbacks() and create a small on-call-stack ptr-array for
kfree_bulk().  I would only optimize the case that calls kfree()
directly.  In the while(list) loop I would defer calling
__rcu_reclaim() for __is_kfree_rcu_offset(head->func), and instead add
them to the ptr-array (flushing whenever the array fills up inside the
loop, and doing a final kfree_bulk flush after the loop).
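
To make the proposal above concrete, here is a minimal userspace sketch of
the deferred-flush loop. It is not kernel code: struct cb_head, struct obj,
fake_kfree_bulk(), PTR_ARRAY_SIZE and demo_bulk_reclaim() are hypothetical
stand-ins invented for illustration, free() stands in for the slab
allocator, and the array size of 16 is an assumption.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Userspace stand-in for struct rcu_head (hypothetical, not kernel code). */
struct cb_head {
	struct cb_head *next;
	void (*func)(struct cb_head *head);
};

/* Hypothetical object embedding the callback head, as kfree_rcu() users do. */
struct obj {
	int payload;
	struct cb_head rcu;
};

#define PTR_ARRAY_SIZE 16       /* assumed size of the on-stack array */
#define KFREE_OFFSET_LIMIT 4096 /* bound used to tell offsets from callbacks */

static size_t bulk_calls;       /* how many batched frees were issued */

/* Stand-in for kfree_bulk(): releases nr pointers in one call. */
static void fake_kfree_bulk(size_t nr, void **p)
{
	for (size_t i = 0; i < nr; i++)
		free(p[i]);
	bulk_calls++;
}

/* Walk the callback list, deferring kfree-style entries into ptrs[]. */
static void process_callbacks(struct cb_head *list)
{
	void *ptrs[PTR_ARRAY_SIZE];
	size_t nr = 0;

	while (list) {
		struct cb_head *next = list->next; /* read before the object is freed */
		uintptr_t offset = (uintptr_t)list->func;

		if (offset < KFREE_OFFSET_LIMIT) {
			/* func encodes an offset: recover the object and defer the free */
			ptrs[nr++] = (char *)list - offset;
			if (nr == PTR_ARRAY_SIZE) {
				fake_kfree_bulk(nr, ptrs); /* array full: flush inside the loop */
				nr = 0;
			}
		} else {
			list->func(list);                  /* ordinary callback: run it now */
		}
		list = next;
	}
	if (nr)
		fake_kfree_bulk(nr, ptrs);                 /* final flush after the loop */
}

/* Queue n heap objects kfree_rcu()-style; return the number of bulk calls. */
static size_t demo_bulk_reclaim(size_t n)
{
	struct cb_head *list = NULL;

	for (size_t i = 0; i < n; i++) {
		struct obj *o = malloc(sizeof(*o));

		assert(o != NULL);
		o->payload = (int)i;
		/* Store the member offset where a callback pointer would normally go. */
		o->rcu.func = (void (*)(struct cb_head *))(uintptr_t)offsetof(struct obj, rcu);
		o->rcu.next = list;
		list = &o->rcu;
	}
	bulk_calls = 0;
	process_callbacks(list);
	return bulk_calls;
}
```

With 40 queued objects and a 16-entry array, the sketch flushes twice inside
the loop and once after it, so demo_bulk_reclaim(40) reports three bulk calls
instead of 40 individual frees.
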
This is exactly what the current code is doing. It accumulates only the calls made to
__kfree_rcu(head, offset) ==> kfree_call_rcu() ==> __rcu_bulk_free

__kfree_rcu has a check to make sure that an offset is being passed.
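
As a rough illustration of that check, the sketch below mimics it in
userspace C11. The names cb_head, foo, IS_KFREE_OFFSET, object_from_head()
and foo_roundtrip_ok() are hypothetical. The assumption is that the check
amounts to a compile-time test that the member offset is below 4096, which
is what lets a small offset be told apart from a real function pointer
later on.

```c
#include <assert.h>
#include <stddef.h>

/* Userspace stand-in for struct rcu_head (hypothetical). */
struct cb_head {
	struct cb_head *next;
	void *func;
};

/* Hypothetical structure that would be freed kfree_rcu()-style. */
struct foo {
	long data[4];
	struct cb_head rcu;
};

/* Assumed form of the test: small values are offsets, not callbacks. */
#define IS_KFREE_OFFSET(offset) ((offset) < 4096)

/* Fails the build if the member sits too far into the structure. */
_Static_assert(IS_KFREE_OFFSET(offsetof(struct foo, rcu)),
	       "rcu member must lie within the first 4096 bytes");

/* Same arithmetic as the patch: ptr = (void *)head - offset. */
static void *object_from_head(struct cb_head *head, unsigned long offset)
{
	return (char *)head - offset;
}

/* Returns 1 if the enclosing object is recovered from its embedded head. */
static int foo_roundtrip_ok(void)
{
	struct foo f;

	return object_from_head(&f.rcu, offsetof(struct foo, rcu)) == (void *)&f;
}
```
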

When a function pointer is passed, the caller has to call call_rcu/call_rcu_sched.

Accumulating early avoids the individual cost of calling __call_rcu.

Perhaps I do not understand your point.

Shoaib

The real advantage of kfree_bulk() comes from amortizing the per-kfree
(behind-the-scenes) sync cost.  There is an additional benefit, because
objects that come from RCU will hit a slower path in SLUB.   The SLUB
allocator is very fast for objects that get recycled quickly (short
lifetime): a non-locked (cpu-local) double-cmpxchg.  But it is slower for
longer-lived/more-outstanding objects, as these hit a slower code-path:
a fully locked (cross-cpu) double-cmpxchg.
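
The amortization argument can be illustrated with a toy model. This is not
how SLUB is implemented: struct toy_cache, toy_free(), toy_free_bulk() and
toy_sync_cost() are hypothetical, and the sync_ops counter simply stands in
for one locked (cross-cpu) operation. The simplifying assumption is that a
bulk free can chain its objects privately and publish the whole chain with
a single synchronized step.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_OBJS 64

/* Toy allocator state: a shared freelist plus a counter of sync steps. */
struct toy_cache {
	void *freelist;
	unsigned long sync_ops; /* simulated locked/cmpxchg operations */
};

/* Per-object free: pays one simulated sync step for every object. */
static void toy_free(struct toy_cache *c, void *obj)
{
	c->sync_ops++;
	*(void **)obj = c->freelist;
	c->freelist = obj;
}

/* Bulk free: build a private chain first, then publish it in one step. */
static void toy_free_bulk(struct toy_cache *c, size_t nr, void **objs)
{
	void *head = NULL, *tail = NULL;

	for (size_t i = 0; i < nr; i++) {
		*(void **)objs[i] = head; /* link into the private chain, no sync */
		if (!tail)
			tail = objs[i];
		head = objs[i];
	}
	if (!head)
		return;
	c->sync_ops++;                    /* single sync step for the whole chain */
	*(void **)tail = c->freelist;
	c->freelist = head;
}

/* Free nr toy objects either one by one or in bulk; return the sync cost. */
static unsigned long toy_sync_cost(size_t nr, int bulk)
{
	static void *storage[TOY_MAX_OBJS]; /* each slot doubles as one object */
	void *objs[TOY_MAX_OBJS];
	struct toy_cache c = { NULL, 0 };

	assert(nr <= TOY_MAX_OBJS);
	for (size_t i = 0; i < nr; i++)
		objs[i] = &storage[i];

	if (bulk) {
		toy_free_bulk(&c, nr, objs);
	} else {
		for (size_t i = 0; i < nr; i++)
			toy_free(&c, objs[i]);
	}
	return c.sync_ops;
}
```

In this model, freeing 32 objects one at a time costs 32 sync steps, while
the bulk path costs one. The real saving depends on how many of the objects
in a batch can actually be handed back together.
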

+
+       rbfc = rbf->rbf_container;
+
+       if (rbfc == NULL) {
+               if (rbf->rbf_cached_container == NULL) {
+                       rbf->rbf_container =
+                           kmalloc(sizeof(struct rcu_bulk_free_container),
+                           GFP_ATOMIC);
+                       rbf->rbf_container->rbfc_rbf = rbf;
+               } else {
+                       rbf->rbf_container = rbf->rbf_cached_container;
+                       rbf->rbf_container->rbfc_rbf = rbf;
+                       cmpxchg(&rbf->rbf_cached_container,
+                           rbf->rbf_cached_container, NULL);
+               }
+
+               if (unlikely(rbf->rbf_container == NULL)) {
+
+                       /* Memory allocation failed maintain a list */
+
+                       head->func = (void *)func;
+                       head->next = rbf->rbf_list_head;
+                       rbf->rbf_list_head = head;
+                       rbf->rbf_list_size++;
+                       if (rbf->rbf_list_size == RCU_MAX_ACCUMULATE_SIZE)
+                               __rcu_bulk_schedule_list(rbf);
+
+                       goto done;
+               }
+
+               rbfc = rbf->rbf_container;
+               rbfc->rbfc_entries = 0;
+
+               if (rbf->rbf_list_head != NULL)
+                       __rcu_bulk_schedule_list(rbf);
+       }
+
+       offset = (unsigned long)func;
+       ptr = (void *)head - offset;
+
+       rbfc->rbfc_data[rbfc->rbfc_entries++] = ptr;
+       if (rbfc->rbfc_entries == RCU_MAX_ACCUMULATE_SIZE) {
+
+               WRITE_ONCE(rbf->rbf_container, NULL);
+               spin_unlock_bh(&rbf->rbf_lock);
+               call_rcu(&rbfc->rbfc_rcu, __rcu_bulk_free_impl);
+               return;
+       }
+
+done:
+       if (!rbf->rbf_monitor) {
+
+               call_rcu(&rbf->rbf_rcu, __rcu_bulk_free_monitor);
+               rbf->rbf_monitor = true;
+       }
+
+       spin_unlock_bh(&rbf->rbf_lock);
+}

