Reader-writer concurrency issue, caused by moving the keys
to their alternative locations during key insert, is solved
by introducing a global counter(tbl_chng_cnt) indicating a
change in table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Gavin Hu <gavin...@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
Reviewed-by: Steve Capper <steve.cap...@arm.com>
Reviewed-by: Yipeng Wang <yipeng1.w...@intel.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 306 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   3 +
 2 files changed, 209 insertions(+), 100 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c 
b/lib/librte_hash/rte_cuckoo_hash.c
index e2b0260..dfd5f2a 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -96,6 +96,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
        unsigned int readwrite_concur_support = 0;
        unsigned int writer_takes_lock = 0;
        unsigned int recycle_on_del = 1;
+       uint32_t *tbl_chng_cnt = NULL;
 
        rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -210,6 +211,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
                goto err_unlock;
        }
 
+       tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+                       RTE_CACHE_LINE_SIZE, params->socket_id);
+
+       if (tbl_chng_cnt == NULL) {
+               RTE_LOG(ERR, HASH, "memory allocation failed\n");
+               goto err_unlock;
+       }
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -276,6 +285,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
                default_hash_func : params->hash_func;
        h->key_store = k;
        h->free_slots = r;
+       h->tbl_chng_cnt = tbl_chng_cnt;
+       *h->tbl_chng_cnt = 0;
        h->hw_trans_mem_support = hw_trans_mem_support;
        h->multi_writer_support = multi_writer_support;
        h->readwrite_concur_support = readwrite_concur_support;
@@ -321,6 +332,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
        rte_free(h);
        rte_free(buckets);
        rte_free(k);
+       rte_free(tbl_chng_cnt);
        return NULL;
 }
 
@@ -359,6 +371,7 @@ rte_hash_free(struct rte_hash *h)
        rte_ring_free(h->free_slots);
        rte_free(h->key_store);
        rte_free(h->buckets);
+       rte_free(h->tbl_chng_cnt);
        rte_free(h);
        rte_free(te);
 }
@@ -456,6 +469,7 @@ rte_hash_reset(struct rte_hash *h)
        __hash_rw_writer_lock(h);
        memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
        memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+       *h->tbl_chng_cnt = 0;
 
        /* clear the free ring */
        while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -650,11 +664,27 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                if (unlikely(&h->buckets[prev_alt_bkt_idx]
                                != curr_bkt)) {
                        /* revert it to empty, otherwise duplicated keys */
-                       curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+                       __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                               EMPTY_SLOT,
+                               __ATOMIC_RELEASE);
                        __hash_rw_writer_unlock(h);
                        return -1;
                }
 
+               /* Inform the previous move. The current move need
+                * not be informed now as the current bucket entry
+                * is present in both primary and secondary.
+                * Since there is one writer, load acquires on
+                * tbl_chng_cnt are not required.
+                */
+               __atomic_store_n(h->tbl_chng_cnt,
+                                *h->tbl_chng_cnt + 1,
+                                __ATOMIC_RELEASE);
+               /* The stores to sig_alt and sig_current should not
+                * move above the store to tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_RELEASE);
+
                /* Need to swap current/alt sig to allow later
                 * Cuckoo insert to move elements back to its
                 * primary bucket if available
@@ -673,6 +703,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                curr_bkt = curr_node->bkt;
        }
 
+       /* Inform the previous move. The current move need
+        * not be informed now as the current bucket entry
+        * is present in both primary and secondary.
+        * Since there is one writer, load acquires on
+        * tbl_chng_cnt are not required.
+        */
+       __atomic_store_n(h->tbl_chng_cnt,
+                        *h->tbl_chng_cnt + 1,
+                        __ATOMIC_RELEASE);
+       /* The stores to sig_alt and sig_current should not
+        * move above the store to tbl_chng_cnt.
+        */
+       __atomic_thread_fence(__ATOMIC_RELEASE);
+
        curr_bkt->sig_current[curr_slot] = sig;
        curr_bkt->sig_alt[curr_slot] = alt_hash;
        /* Release the new bucket entry */
@@ -937,30 +981,56 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, 
const void *key,
        uint32_t bucket_idx;
        hash_sig_t alt_hash;
        struct rte_hash_bucket *bkt;
+       uint32_t cnt_b, cnt_a;
        int ret;
 
-       bucket_idx = sig & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
-
        __hash_rw_reader_lock(h);
 
-       /* Check if key is in primary location */
-       ret = search_one_bucket(h, key, sig, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
-       /* Calculate secondary hash */
-       alt_hash = rte_hash_secondary_hash(sig);
-       bucket_idx = alt_hash & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in search_one_bucket are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                               __ATOMIC_ACQUIRE);
+
+               bucket_idx = sig & h->bucket_bitmask;
+               bkt = &h->buckets[bucket_idx];
+
+               /* Check if key is in primary location */
+               ret = search_one_bucket(h, key, sig, data, bkt);
+               if (ret != -1) {
+                       __hash_rw_reader_unlock(h);
+                       return ret;
+               }
+               /* Calculate secondary hash */
+               alt_hash = rte_hash_secondary_hash(sig);
+               bucket_idx = alt_hash & h->bucket_bitmask;
+               bkt = &h->buckets[bucket_idx];
+
+               /* Check if key is in secondary location */
+               ret = search_one_bucket(h, key, alt_hash, data, bkt);
+               if (ret != -1) {
+                       __hash_rw_reader_unlock(h);
+                       return ret;
+               }
+
+               /* The loads of sig_current in search_one_bucket
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, key index in primary bucket
+                * and key index in secondary bucket will make sure
+                * that it does not get hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
-       /* Check if key is in secondary location */
-       ret = search_one_bucket(h, key, alt_hash, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
        __hash_rw_reader_unlock(h);
        return -ENOENT;
 }
@@ -1242,6 +1312,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const 
void **keys,
        uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+       uint32_t cnt_b, cnt_a;
 
        /* Prefetch first keys */
        for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1277,102 +1348,137 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, 
const void **keys,
        }
 
        __hash_rw_reader_lock(h);
-       /* Compare signatures and prefetch key slot of first hit */
-       for (i = 0; i < num_keys; i++) {
-               compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in compare_signatures are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+
+               /* Compare signatures and prefetch key slot of first hit */
+               for (i = 0; i < num_keys; i++) {
+                       compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
                                primary_bkt[i], secondary_bkt[i],
                                prim_hash[i], sec_hash[i], h->sig_cmp_fn);
 
-               if (prim_hitmask[i]) {
-                       uint32_t first_hit = __builtin_ctzl(prim_hitmask[i]);
-                       uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
-                       continue;
-               }
+                       if (prim_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(prim_hitmask[i]);
+                               uint32_t key_idx =
+                                       primary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                               continue;
+                       }
 
-               if (sec_hitmask[i]) {
-                       uint32_t first_hit = __builtin_ctzl(sec_hitmask[i]);
-                       uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
+                       if (sec_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(sec_hitmask[i]);
+                               uint32_t key_idx =
+                                       secondary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                       }
                }
-       }
 
-       /* Compare keys, first hits in primary first */
-       for (i = 0; i < num_keys; i++) {
-               positions[i] = -ENOENT;
-               while (prim_hitmask[i]) {
-                       uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]);
+               /* Compare keys, first hits in primary first */
+               for (i = 0; i < num_keys; i++) {
+                       positions[i] = -ENOENT;
+                       while (prim_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(prim_hitmask[i]);
 
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &primary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &primary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               prim_hitmask[i] &= ~(1 << (hit_index));
                        }
-                       prim_hitmask[i] &= ~(1 << (hit_index));
-               }
 
-               while (sec_hitmask[i]) {
-                       uint32_t hit_index = __builtin_ctzl(sec_hitmask[i]);
+                       while (sec_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(sec_hitmask[i]);
 
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &secondary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &secondary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               sec_hitmask[i] &= ~(1 << (hit_index));
                        }
-                       sec_hitmask[i] &= ~(1 << (hit_index));
-               }
 
 next_key:
-               continue;
-       }
+                       continue;
+               }
+
+               /* The loads of sig_current in compare_signatures
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, primary key index and secondary
+                * key index will make sure that it does not get
+                * hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
        __hash_rw_reader_unlock(h);
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h 
b/lib/librte_hash/rte_cuckoo_hash.h
index a44c6be..cf50ada 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -196,6 +197,8 @@ struct rte_hash {
         * to the key table.
         */
        rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
+       uint32_t *tbl_chng_cnt;
+       /**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
-- 
2.7.4

Reply via email to