Hi Konstantin,

Thanks for the review,

On 23/04/2020 14:31, Ananyev, Konstantin wrote:
Hi Vladimir,

Apologies for the late review.
My comments below.

K32V64 hash is a hash table that supports 32-bit keys and 64-bit values.
This table is hash function agnostic, so the user must provide a
precalculated hash signature for add/delete/lookup operations.

Signed-off-by: Vladimir Medvedkin <vladimir.medved...@intel.com>
---

--- /dev/null
+++ b/lib/librte_hash/rte_k32v64_hash.c
@@ -0,0 +1,315 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#include <string.h>
+
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_memory.h>
+#include <rte_tailq.h>
+
+#include <rte_k32v64_hash.h>
+
+TAILQ_HEAD(rte_k32v64_hash_list, rte_tailq_entry);
+
+static struct rte_tailq_elem rte_k32v64_hash_tailq = {
+    .name = "RTE_K32V64_HASH",
+};
+
+EAL_REGISTER_TAILQ(rte_k32v64_hash_tailq);
+
+#define VALID_KEY_MSK           ((1 << RTE_K32V64_KEYS_PER_BUCKET) - 1)
+
+#ifdef CC_AVX512VL_SUPPORT
+int
+k32v64_hash_bulk_lookup_avx512vl(struct rte_k32v64_hash_table *table,
+    uint32_t *keys, uint32_t *hashes, uint64_t *values, unsigned int n);
+#endif
+
+static int
+k32v64_hash_bulk_lookup(struct rte_k32v64_hash_table *table, uint32_t *keys,
+    uint32_t *hashes, uint64_t *values, unsigned int n)
+{
+    int ret, cnt = 0;
+    unsigned int i;
+
+    if (unlikely((table == NULL) || (keys == NULL) || (hashes == NULL) ||
+            (values == NULL)))
+        return -EINVAL;
+
+    for (i = 0; i < n; i++) {
+        ret = rte_k32v64_hash_lookup(table, keys[i], hashes[i],
+            &values[i]);
+        if (ret == 0)
+            cnt++;
+    }
+    return cnt;
+}
+
+static rte_k32v64_hash_bulk_lookup_t
+get_lookup_bulk_fn(void)
+{
+#ifdef CC_AVX512VL_SUPPORT
+    if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512F))
+        return k32v64_hash_bulk_lookup_avx512vl;
+#endif
+    return k32v64_hash_bulk_lookup;
+}
+
+int
+rte_k32v64_hash_add(struct rte_k32v64_hash_table *table, uint32_t key,
+    uint32_t hash, uint64_t value)
+{
+    uint32_t bucket;
+    int i, idx, ret;
+    uint8_t msk;
+    struct rte_k32v64_ext_ent *tmp, *ent, *prev = NULL;
+
+    if (table == NULL)
+        return -EINVAL;
+
I think for add you also need to update bucket.cnt
at the start/end of updates (as you do for del).


Agree. We cannot guarantee an atomic update of a 64-bit value on a 32-bit arch.
But I think it is better to keep the transaction as small as possible, so I will update bucket.cnt not at the start/end of the whole function, but right before and after the key/value rewrite.
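Roughly like this for the update-in-place branch (just a sketch against this version, still using rte_atomic32_inc() for brevity; the actual increments will move to C11 atomics per your comment below):

    /* Search key in table. Update value if exists */
    for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
        if ((key == table->t[bucket].key[i]) &&
                (table->t[bucket].key_mask & (1 << i))) {
            /* make cnt odd: concurrent lookups retry until
             * the 64-bit value rewrite below is finished */
            rte_atomic32_inc(&table->t[bucket].cnt);
            table->t[bucket].val[i] = value;
            /* make cnt even again: write complete */
            rte_atomic32_inc(&table->t[bucket].cnt);
            return 0;
        }
    }

The same pair of increments would go around the in-place update of an extended entry.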



+    bucket = hash & table->bucket_msk;
+    /* Search key in table. Update value if exists */
+    for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
+        if ((key == table->t[bucket].key[i]) &&
+                (table->t[bucket].key_mask & (1 << i))) {
+            table->t[bucket].val[i] = value;
+            return 0;
+        }
+    }
+
+    if (!SLIST_EMPTY(&table->t[bucket].head)) {
+        SLIST_FOREACH(ent, &table->t[bucket].head, next) {
+            if (ent->key == key) {
+                ent->val = value;
+                return 0;
+            }
+        }
+    }
+
+    msk = ~table->t[bucket].key_mask & VALID_KEY_MSK;
+    if (msk) {
+        idx = __builtin_ctz(msk);
+        table->t[bucket].key[idx] = key;
+        table->t[bucket].val[idx] = value;
+        rte_smp_wmb();
+        table->t[bucket].key_mask |= 1 << idx;
+        table->nb_ent++;
+        return 0;
+    }
+
+    ret = rte_mempool_get(table->ext_ent_pool, (void **)&ent);
+    if (ret < 0)
+        return ret;
+
+    SLIST_NEXT(ent, next) = NULL;
+    ent->key = key;
+    ent->val = value;
+    rte_smp_wmb();
+    SLIST_FOREACH(tmp, &table->t[bucket].head, next)
+        prev = tmp;
+
+    if (prev == NULL)
+        SLIST_INSERT_HEAD(&table->t[bucket].head, ent, next);
+    else
+        SLIST_INSERT_AFTER(prev, ent, next);
+
+    table->nb_ent++;
+    table->nb_ext_ent++;
+    return 0;
+}
+
+int
+rte_k32v64_hash_delete(struct rte_k32v64_hash_table *table, uint32_t key,
+    uint32_t hash)
+{
+    uint32_t bucket;
+    int i;
+    struct rte_k32v64_ext_ent *ent;
+
+    if (table == NULL)
+        return -EINVAL;
+
+    bucket = hash & table->bucket_msk;
+
+    for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
+        if ((key == table->t[bucket].key[i]) &&
+                (table->t[bucket].key_mask & (1 << i))) {
+            ent = SLIST_FIRST(&table->t[bucket].head);
+            if (ent) {
+                rte_atomic32_inc(&table->t[bucket].cnt);
I know that right now rte_atomic32 uses _sync gcc builtins underneath,
so it should be safe.
But I think the proper way would be:
    table->t[bucket].cnt++;
    rte_smp_wmb();
or, as an alternative, use C11 atomics with ACQUIRE/RELEASE semantics.


Agree.
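E.g. something like this on the write side (sketch only; it assumes bucket.cnt becomes a plain uint32_t so the GCC __atomic builtins can be used on it directly):

    /* odd cnt: write in progress, concurrent lookups will retry */
    __atomic_fetch_add(&table->t[bucket].cnt, 1, __ATOMIC_ACQUIRE);
    table->t[bucket].key[i] = ent->key;
    table->t[bucket].val[i] = ent->val;
    SLIST_REMOVE_HEAD(&table->t[bucket].head, next);
    /* even cnt: update is published */
    __atomic_fetch_add(&table->t[bucket].cnt, 1, __ATOMIC_RELEASE);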



+                table->t[bucket].key[i] = ent->key;
+                table->t[bucket].val[i] = ent->val;
+                SLIST_REMOVE_HEAD(&table->t[bucket].head, next);
+                rte_atomic32_inc(&table->t[bucket].cnt);
+                table->nb_ext_ent--;
+            } else
+                table->t[bucket].key_mask &= ~(1 << i);
I think you need to protect that update with bucket.cnt too.
From my perspective, as a rule of thumb, any update to the bucket/list
should be within that transaction-start/transaction-end.


I think it is possible to update key_mask with a C11 atomic store instead.
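For example, a single release store of the new mask (a sketch; key_mask is a uint8_t, so the store itself cannot be torn):

    __atomic_store_n(&table->t[bucket].key_mask,
        table->t[bucket].key_mask & ~(1 << i), __ATOMIC_RELEASE);

A concurrent lookup would then see either the old mask (and read the still-intact key/value) or the new one.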


+            if (ent)
+                rte_mempool_put(table->ext_ent_pool, ent);
+            table->nb_ent--;
+            return 0;
+        }
+    }
+
+    SLIST_FOREACH(ent, &table->t[bucket].head, next)
+        if (ent->key == key)
+            break;
+
+    if (ent == NULL)
+        return -ENOENT;
+
+    rte_atomic32_inc(&table->t[bucket].cnt);
+    SLIST_REMOVE(&table->t[bucket].head, ent, rte_k32v64_ext_ent, next);
+    rte_atomic32_inc(&table->t[bucket].cnt);
+    rte_mempool_put(table->ext_ent_pool, ent);
+
+    table->nb_ext_ent--;
+    table->nb_ent--;
+
+    return 0;
+}
+
+struct rte_k32v64_hash_table *
+rte_k32v64_hash_find_existing(const char *name)
+{
+    struct rte_k32v64_hash_table *h = NULL;
+    struct rte_tailq_entry *te;
+    struct rte_k32v64_hash_list *k32v64_hash_list;
+
+    k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
+        rte_k32v64_hash_list);
+
+    rte_mcfg_tailq_read_lock();
+    TAILQ_FOREACH(te, k32v64_hash_list, next) {
+        h = (struct rte_k32v64_hash_table *) te->data;
+        if (strncmp(name, h->name, RTE_K32V64_HASH_NAMESIZE) == 0)
+            break;
+    }
+    rte_mcfg_tailq_read_unlock();
+    if (te == NULL) {
+        rte_errno = ENOENT;
+        return NULL;
+    }
+    return h;
+}
+
+struct rte_k32v64_hash_table *
+rte_k32v64_hash_create(const struct rte_k32v64_hash_params *params)
+{
+    char hash_name[RTE_K32V64_HASH_NAMESIZE];
+    struct rte_k32v64_hash_table *ht = NULL;
+    struct rte_tailq_entry *te;
+    struct rte_k32v64_hash_list *k32v64_hash_list;
+    uint32_t mem_size, nb_buckets, max_ent;
+    int ret;
+    struct rte_mempool *mp;
+
+    if ((params == NULL) || (params->name == NULL) ||
+            (params->entries == 0)) {
+        rte_errno = EINVAL;
+        return NULL;
+    }
+
+    k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
+        rte_k32v64_hash_list);
+
+    ret = snprintf(hash_name, sizeof(hash_name), "K32V64_%s", params->name);
+    if (ret < 0 || ret >= RTE_K32V64_HASH_NAMESIZE) {
+        rte_errno = ENAMETOOLONG;
+        return NULL;
+    }
+
+    max_ent = rte_align32pow2(params->entries);
+    nb_buckets = max_ent / RTE_K32V64_KEYS_PER_BUCKET;
+    mem_size = sizeof(struct rte_k32v64_hash_table) +
+        sizeof(struct rte_k32v64_hash_bucket) * nb_buckets;
+
+    mp = rte_mempool_create(hash_name, max_ent,
+        sizeof(struct rte_k32v64_ext_ent), 0, 0, NULL, NULL, NULL, NULL,
+        params->socket_id, 0);
+
+    if (mp == NULL)
+        return NULL;
+
+    rte_mcfg_tailq_write_lock();
+    TAILQ_FOREACH(te, k32v64_hash_list, next) {
+        ht = (struct rte_k32v64_hash_table *) te->data;
+        if (strncmp(params->name, ht->name,
+                RTE_K32V64_HASH_NAMESIZE) == 0)
+            break;
+    }
+    ht = NULL;
+    if (te != NULL) {
+        rte_errno = EEXIST;
+        rte_mempool_free(mp);
+        goto exit;
+    }
+
+    te = rte_zmalloc("K32V64_HASH_TAILQ_ENTRY", sizeof(*te), 0);
+    if (te == NULL) {
+        RTE_LOG(ERR, HASH, "Failed to allocate tailq entry\n");
+        rte_mempool_free(mp);
+        goto exit;
+    }
+
+    ht = rte_zmalloc_socket(hash_name, mem_size,
+        RTE_CACHE_LINE_SIZE, params->socket_id);
+    if (ht == NULL) {
+        RTE_LOG(ERR, HASH, "Failed to allocate k32v64 hash table\n");
+        rte_free(te);
+        rte_mempool_free(mp);
+        goto exit;
+    }
+
+    memcpy(ht->name, hash_name, sizeof(ht->name));
+    ht->max_ent = max_ent;
+    ht->bucket_msk = nb_buckets - 1;
+    ht->ext_ent_pool = mp;
+    ht->lookup = get_lookup_bulk_fn();
+
+    te->data = (void *)ht;
+    TAILQ_INSERT_TAIL(k32v64_hash_list, te, next);
+
+exit:
+    rte_mcfg_tailq_write_unlock();
+
+    return ht;
+}
+
+void
+rte_k32v64_hash_free(struct rte_k32v64_hash_table *ht)
+{
+    struct rte_tailq_entry *te;
+    struct rte_k32v64_hash_list *k32v64_hash_list;
+
+    if (ht == NULL)
+        return;
+
+    k32v64_hash_list = RTE_TAILQ_CAST(rte_k32v64_hash_tailq.head,
+        rte_k32v64_hash_list);
+
+    rte_mcfg_tailq_write_lock();
+
+    /* find out tailq entry */
+    TAILQ_FOREACH(te, k32v64_hash_list, next) {
+        if (te->data == (void *) ht)
+            break;
+    }
+
+    if (te == NULL) {
+        rte_mcfg_tailq_write_unlock();
+        return;
+    }
+
+    TAILQ_REMOVE(k32v64_hash_list, te, next);
+
+    rte_mcfg_tailq_write_unlock();
+
+    rte_mempool_free(ht->ext_ent_pool);
+    rte_free(ht);
+    rte_free(te);
+}
diff --git a/lib/librte_hash/rte_k32v64_hash.h b/lib/librte_hash/rte_k32v64_hash.h
new file mode 100644
index 0000000..b2c52e9
--- /dev/null
+++ b/lib/librte_hash/rte_k32v64_hash.h
@@ -0,0 +1,211 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2020 Intel Corporation
+ */
+
+#ifndef _RTE_K32V64_HASH_H_
+#define _RTE_K32V64_HASH_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_compat.h>
+#include <rte_atomic.h>
+#include <rte_mempool.h>
+
+#define RTE_K32V64_HASH_NAMESIZE       32
+#define RTE_K32V64_KEYS_PER_BUCKET     4
+#define RTE_K32V64_WRITE_IN_PROGRESS   1
+
+struct rte_k32v64_hash_params {
+    const char *name;
+    uint32_t entries;
+    int socket_id;
+};
+
+struct rte_k32v64_ext_ent {
+    SLIST_ENTRY(rte_k32v64_ext_ent) next;
+    uint32_t key;
+    uint64_t val;
+};
+
+struct rte_k32v64_hash_bucket {
+    uint32_t key[RTE_K32V64_KEYS_PER_BUCKET];
+    uint64_t val[RTE_K32V64_KEYS_PER_BUCKET];
+    uint8_t key_mask;
+    rte_atomic32_t cnt;
+    SLIST_HEAD(rte_k32v64_list_head, rte_k32v64_ext_ent) head;
+} __rte_cache_aligned;
+
+struct rte_k32v64_hash_table;
+
+typedef int (*rte_k32v64_hash_bulk_lookup_t)
+    (struct rte_k32v64_hash_table *table, uint32_t *keys, uint32_t *hashes,
+    uint64_t *values, unsigned int n);
+
+struct rte_k32v64_hash_table {
+    char name[RTE_K32V64_HASH_NAMESIZE];  /**< Name of the hash. */
+    uint32_t nb_ent;        /**< Number of entries in the table. */
+    uint32_t nb_ext_ent;    /**< Number of extended entries. */
+    uint32_t max_ent;       /**< Maximum number of entries. */
+    uint32_t bucket_msk;
+    struct rte_mempool *ext_ent_pool;
+    rte_k32v64_hash_bulk_lookup_t lookup;
+    __extension__ struct rte_k32v64_hash_bucket t[0];
+};
+
+typedef int (*rte_k32v64_cmp_fn_t)
+    (struct rte_k32v64_hash_bucket *bucket, uint32_t key, uint64_t *val);
+
+static inline int
+__k32v64_cmp_keys(struct rte_k32v64_hash_bucket *bucket, uint32_t key,
+    uint64_t *val)
+{
+    int i;
+
+    for (i = 0; i < RTE_K32V64_KEYS_PER_BUCKET; i++) {
+        if ((key == bucket->key[i]) &&
+                (bucket->key_mask & (1 << i))) {
+            *val = bucket->val[i];
+            return 1;
+        }
+    }
+
+    return 0;
+}
+
+static inline int
+__k32v64_hash_lookup(struct rte_k32v64_hash_table *table, uint32_t key,
+    uint32_t hash, uint64_t *value, rte_k32v64_cmp_fn_t cmp_f)
+{
+    uint64_t val = 0;
+    struct rte_k32v64_ext_ent *ent;
+    int32_t cnt;
+    int found = 0;
+    uint32_t bucket = hash & table->bucket_msk;
+
+    do {
+        do
+            cnt = rte_atomic32_read(&table->t[bucket].cnt);
+        while (unlikely(cnt & RTE_K32V64_WRITE_IN_PROGRESS));
+
+        found = cmp_f(&table->t[bucket], key, &val);
+        if (unlikely((found == 0) &&
+                (!SLIST_EMPTY(&table->t[bucket].head)))) {
+            SLIST_FOREACH(ent, &table->t[bucket].head, next) {
+                if (ent->key == key) {
+                    val = ent->val;
+                    found = 1;
+                    break;
+                }
+            }
+        }
+
+    } while (unlikely(cnt != rte_atomic32_read(&table->t[bucket].cnt)));
AFAIK atomic32_read is just a normal read op, so it can be reordered with other ops.
So this construction doesn't protect you from races.
What you probably need here:

do {
    cnt1 = table->t[bucket].cnt;
    rte_smp_rmb();
    ....
    rte_smp_rmb();
    cnt2 = table->t[bucket].cnt;
} while (cnt1 != cnt2 || (cnt1 & RTE_K32V64_WRITE_IN_PROGRESS) != 0);


Agree, these reads could be reordered. I will replace this with C11 atomics.
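Roughly the following on the read side then (a sketch, again assuming cnt becomes a plain uint32_t):

    do {
        do {
            /* pairs with the releasing cnt update on the write side */
            cnt = __atomic_load_n(&table->t[bucket].cnt, __ATOMIC_ACQUIRE);
        } while (unlikely(cnt & RTE_K32V64_WRITE_IN_PROGRESS));

        found = cmp_f(&table->t[bucket], key, &val);
        /* ... extended entry list scan as before ... */

        /* make sure the data reads above finish before re-reading cnt */
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
    } while (unlikely(cnt != __atomic_load_n(&table->t[bucket].cnt,
            __ATOMIC_RELAXED)));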



+
+    if (found == 1) {
+        *value = val;
+        return 0;
+    } else
+        return -ENOENT;
+}
+

--
Regards,
Vladimir
