[ https://issues.apache.org/jira/browse/TS-4331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351288#comment-15351288 ]
ASF GitHub Bot commented on TS-4331: ------------------------------------ Github user JamesPeach commented on a diff in the pull request: https://github.com/apache/trafficserver/pull/653#discussion_r68601646 --- Diff: iocore/hostdb/P_RefCountCache.h --- @@ -0,0 +1,896 @@ +/** @file + + A cache (with map-esque interface) for RefCountObjs + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef _P_RefCountCache_h_ +#define _P_RefCountCache_h_ + +#include "I_EventSystem.h" +#include "P_EventSystem.h" // TODO: less? 
just need ET_TASK + +#include <ts/Map.h> +#include "ts/PriorityQueue.h" + +#include <ts/List.h> +#include <ts/ink_hrtime.h> + +#include "ts/Vec.h" +#include "ts/I_Version.h" +#include <unistd.h> + +#define REFCOUNT_CACHE_EVENT_SYNC REFCOUNT_CACHE_EVENT_EVENTS_START + +#define REFCOUNTCACHE_MAGIC_NUMBER 0x0BAD2D9 +#define REFCOUNTCACHE_MAJOR_VERSION 1 +#define REFCOUNTCACHE_MINOR_VERSION 0 + +// Stats +enum RefCountCache_Stats { + refcountcache_current_items_stat, // current number of items + refcountcache_current_size_stat, // current size of cache + refcountcache_total_inserts_stat, // total items inserted + refcountcache_total_failed_inserts_stat, // total items unable to insert + refcountcache_total_lookups_stat, // total get() calls + refcountcache_total_hits_stat, // total hits + + // Persistence metrics + refcountcache_last_sync_time, // seconds since epoch of last successful sync + refcountcache_last_total_items, // number of items sync last time + refcountcache_last_total_size, // total size at last sync + + RefCountCache_Stat_Count +}; + +struct RefCountCacheItemMeta { + uint64_t key; + unsigned int size; + ink_time_t expiry_time; // expire time as seconds since epoch + RefCountCacheItemMeta(uint64_t key, unsigned int size, int expire_time = -1) : key(key), size(size), expiry_time(expire_time) {} +}; + +// Layer of indirection for the hashmap-- since it needs lots of things inside of it +// We'll also use this as the item header, for persisting objects to disk +class RefCountCacheHashEntry +{ +public: + Ptr<RefCountObj> item; + LINK(RefCountCacheHashEntry, item_link); + + PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry; + + RefCountCacheItemMeta meta; + void + set(RefCountObj *i, uint64_t key, unsigned int size, int expire_time) + { + this->item = make_ptr(i); + this->meta = RefCountCacheItemMeta(key, size, expire_time); + }; + // Need a no-argument constructor to use the classAllocator + RefCountCacheHashEntry() : expiry_entry(NULL), meta(0, 
0){}; + + // make these values comparable -- so we can sort them + bool + operator<(const RefCountCacheHashEntry &v2) const + { + return this->meta.expiry_time < v2.meta.expiry_time; + }; +}; +// Since the hashing values are all fixed size, we can simply use a classAllocator to avoid mallocs +extern ClassAllocator<RefCountCacheHashEntry> refCountCacheHashingValueAllocator; +extern ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> expiryQueueEntry; + +struct RefCountCacheHashing { + typedef uint64_t ID; + typedef uint64_t const Key; + typedef RefCountCacheHashEntry Value; + typedef DList(RefCountCacheHashEntry, item_link) ListHead; + + static ID + hash(Key key) + { + return key; + } + static Key + key(Value const *value) + { + return value->meta.key; + } + static bool + equal(Key lhs, Key rhs) + { + return lhs == rhs; + } +}; + +// The RefCountCachePartition is simply a map of key -> Ptr<YourClass> +// We partition the cache to reduce lock contention +template <class C> class RefCountCachePartition +{ +public: + RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, RecRawStatBlock *rsb = NULL); + Ptr<C> get(uint64_t key); + void put(uint64_t key, C *item, int size = 0, int expire_time = 0); + void erase(uint64_t key, int expiry_time = -1); + + void clear(); + bool is_full(); + bool reserve(unsigned int); + + const size_t count(); + void copy(Vec<RefCountCacheHashEntry *> &items); + + typedef typename TSHashTable<RefCountCacheHashing>::iterator iterator_type; + typedef typename TSHashTable<RefCountCacheHashing>::self hash_type; + typedef typename TSHashTable<RefCountCacheHashing>::Location location_type; + TSHashTable<RefCountCacheHashing> *get_map(); + + Ptr<ProxyMutex> lock; // Lock + +private: + void metric_inc(RefCountCache_Stats metric_enum, int64_t data); + + unsigned int part_num; + + uint64_t max_size; + unsigned int max_items; + + uint64_t size; + unsigned int items; + + hash_type item_map; + + 
PriorityQueue<RefCountCacheHashEntry *> expiry_queue; + RecRawStatBlock *rsb; +}; + +template <class C> +RefCountCachePartition<C>::RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, + RecRawStatBlock *rsb) +{ + this->part_num = part_num; + this->max_size = max_size; + this->max_items = max_items; + this->size = 0; + this->items = 0; + this->rsb = rsb; + + // Initialize lock + this->lock = new_ProxyMutex(); +} + +template <class C> +Ptr<C> +RefCountCachePartition<C>::get(uint64_t key) +{ + this->metric_inc(refcountcache_total_lookups_stat, 1); + location_type l = this->item_map.find(key); + if (l.isValid()) { + // found + this->metric_inc(refcountcache_total_hits_stat, 1); + return make_ptr((C *)l.m_value->item.get()); + } else { + return Ptr<C>(); + } +} + +template <class C> +void +RefCountCachePartition<C>::put(uint64_t key, C *item, int size, int expire_time) +{ + this->metric_inc(refcountcache_total_inserts_stat, 1); + size += sizeof(C); + // Remove any colliding entries + this->erase(key); + + // if we are full, and can't make space-- then don't store the item + if (this->is_full() && !this->reserve(size)) { + Debug("refcountcache", "partition %d is full-- not storing item key=%ld", this->part_num, key); + this->metric_inc(refcountcache_total_failed_inserts_stat, 1); + return; + } + + // Create our value-- which has a ref to the `item` + RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc(); + val->set(item, key, size, expire_time); + + // add expiry_entry to expiry queue, if the expire time is positive (otherwise it means don't expire) + if (expire_time >= 0) { + Debug("refcountcache", "partition %d adding entry with expire_time=%d\n", this->part_num, expire_time); + PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry = expiryQueueEntry.alloc(); + new ((void *)expiry_entry) PriorityQueueEntry<RefCountCacheHashEntry *>(val); + expiry_queue.push(expiry_entry); + val->expiry_entry = expiry_entry; + 
} + + // add the item to the map + this->item_map.insert(val); + this->size += val->meta.size; + this->items++; + this->metric_inc(refcountcache_current_size_stat, (int64_t)val->meta.size); + this->metric_inc(refcountcache_current_items_stat, 1); +} + +template <class C> +void +RefCountCachePartition<C>::erase(uint64_t key, int expiry_time) +{ + location_type l = this->item_map.find(key); + if (l.isValid()) { + if (expiry_time >= 0 && l.m_value->meta.expiry_time != expiry_time) { + return; + } + // found + this->item_map.remove(l); + + // decrement usage counters + this->size -= l.m_value->meta.size; + this->items--; + + this->metric_inc(refcountcache_current_size_stat, -((int64_t)l.m_value->meta.size)); + this->metric_inc(refcountcache_current_items_stat, -1); + + // remove from expiry queue + if (l.m_value->expiry_entry != NULL) { + Debug("refcountcache", "partition %d deleting item from expiry_queue idx=%d\n", this->part_num, + l.m_value->expiry_entry->index); + this->expiry_queue.erase(l.m_value->expiry_entry); + expiryQueueEntry.free(l.m_value->expiry_entry); + l.m_value->expiry_entry = NULL; // To avoid the destruction of `l` calling the destructor again-- and causing issues + } + // Since the Value is actually RefCountObj-- when this gets deleted normally it calls the wrong + // `free` method, this forces the delete/decr to happen with the right type + Ptr<C> *tmp = (Ptr<C> *)&l.m_value->item; + tmp->clear(); + refCountCacheHashingValueAllocator.free(l.m_value); + } +} + +template <class C> +void +RefCountCachePartition<C>::clear() +{ + this->metric_inc(refcountcache_current_size_stat, -this->size); + this->metric_inc(refcountcache_current_items_stat, -this->items); + // Clear the in memory hashmap + // TODO: delete all items (not sure if clear calls delete on all items) + this->item_map.clear(); + // clear the queue + PriorityQueueEntry<RefCountCacheHashEntry *> *tmp_entry; + while (!this->expiry_queue.empty()) { + tmp_entry = this->expiry_queue.top(); + 
this->expiry_queue.pop(); + expiryQueueEntry.free(tmp_entry); + } + + this->items = 0; + this->size = 0; +} + +// Are we full? +template <class C> +bool +RefCountCachePartition<C>::is_full() +{ + Debug("refcountcache", "partition %d is full? items %d/%d size %ld/%ld\n\n", this->part_num, this->items, this->max_items, + this->size, this->max_size); + return (this->max_items > 0 && this->items >= this->max_items) || (this->max_size > 0 && this->size >= this->max_size); +} + +// Attempt to make space for item of `size` +template <class C> +bool +RefCountCachePartition<C>::reserve(unsigned int size) +{ + int curr_time = Thread::get_hrtime() / HRTIME_SECOND; + while (this->is_full() || (size > 0 && this->size + size > this->max_size)) { + PriorityQueueEntry<RefCountCacheHashEntry *> *top_item = expiry_queue.top(); + // if there is nothing in the expiry queue, then we can't make space + if (top_item == NULL) { + return false; + } + + // If the first item has expired, lets evict it, and then go around again + if (top_item->node->meta.expiry_time < curr_time) { + this->erase(top_item->node->meta.key); + expiry_queue.pop(); + } else { // if the first item isn't expired-- the rest won't be either (queue is sorted) + return false; + } + } + return true; +} + +template <class C> +const size_t +RefCountCachePartition<C>::count() +{ + return this->items; +} + +template <class C> +void +RefCountCachePartition<C>::copy(Vec<RefCountCacheHashEntry *> &items) +{ + for (RefCountCachePartition<C>::iterator_type i = this->item_map.begin(); i != this->item_map.end(); ++i) { + RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc(); + val->set(i.m_value->item.get(), i.m_value->meta.key, i.m_value->meta.size, i.m_value->meta.expiry_time); + items.push_back(val); + } +} + +template <class C> +void +RefCountCachePartition<C>::metric_inc(RefCountCache_Stats metric_enum, int64_t data) +{ + if (this->rsb) { + RecIncrGlobalRawStatCount(this->rsb, metric_enum, data); + } +} + 
+template <class C> +TSHashTable<RefCountCacheHashing> * +RefCountCachePartition<C>::get_map() +{ + return &this->item_map; +} + +// The header for the cache, this is used to check if the serialized cache is compatible +// The implementation of this class must be in here, as this is used by template +// classes, and c++ rocks that way +class RefCountCacheHeader +{ +public: + unsigned int magic; + VersionNumber version; + VersionNumber object_version; // version passed in of whatever it is we are caching + + RefCountCacheHeader(VersionNumber object_version = VersionNumber()) + : magic(REFCOUNTCACHE_MAGIC_NUMBER), object_version(object_version) + { + this->version.ink_major = REFCOUNTCACHE_MAJOR_VERSION; + this->version.ink_minor = REFCOUNTCACHE_MINOR_VERSION; + }; + + bool + operator==(const RefCountCacheHeader other) const + { + return (this->magic == other.magic && this->version.ink_major == other.version.ink_major && + this->version.ink_minor == other.version.ink_minor && + this->object_version.ink_major == other.object_version.ink_major && + this->object_version.ink_minor == other.object_version.ink_minor); + } + + bool + compatible(RefCountCacheHeader *other) + { + return (this->magic == other->magic && this->version.ink_major == other->version.ink_major && + this->object_version.ink_major == other->version.ink_major); + }; +}; + +// This continuation is responsible for persisting RefCountCache to disk +// To avoid locking the partitions for a long time we'll do the following per-partition: +// - lock +// - copy ptrs (bump refcount) +// - unlock +// - persist +// - remove ptrs (drop refcount) +// This way we only have to hold the lock on the partition for the time it takes to get Ptr<>s to all items in the partition +template <class C> class RefCountCacheSync : public Continuation +{ +public: + typedef int (RefCountCacheSync::*CacheSyncHandler)(int, void *); + + size_t partition; // Current partition + C *cc; // Pointer to the entire cache + Continuation *cont; 
+ + int copy_partition(int event, Event *e); + int write_partition(int event, Event *e); + int pause_event(int event, Event *e); + + // Create the tmp file on disk we'll be writing to + int initialize_storage(int event, Event *e); + // do the final mv and close of file handle + int finalize_sync(); + + // helper method to spin on writes to disk + int write_to_disk(char *i, int size); + + RefCountCacheSync(Continuation *acont, C *cc, int frequency, std::string dirname, std::string filename); + +private: + Vec<RefCountCacheHashEntry *> partition_items; + + int fd; // fd for the file we are writing to + + std::string dirname; + std::string filename; + std::string tmp_filename; + + ink_hrtime time_per_partition; + ink_hrtime start; + + int total_items; + int64_t total_size; + + RecRawStatBlock *rsb; + + SocketManager socket_manager; +}; + +template <class C> +int +RefCountCacheSync<C>::copy_partition(int event, Event *e) +{ + (void)event; + if (partition >= cc->partition_count()) { + int sync_ret = this->finalize_sync(); + if (sync_ret != 0) { + Warning("Unable to finalize sync of cache to disk %s: %d", this->filename.c_str(), sync_ret); + } + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + Debug("refcountcache", "RefCountCacheSync done"); + delete this; + return EVENT_DONE; + } + Debug("refcountcache", "sync partition=%ld/%ld", partition, cc->partition_count()); + // copy the partition into our buffer, then we'll let `pauseEvent` write it out + this->partition_items.reserve(cc->get_partition(partition).count()); + cc->get_partition(partition).copy(this->partition_items); + partition++; + SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::write_partition); + mutex = e->ethread->mutex; + e->schedule_imm(ET_TASK); + + return EVENT_CONT; +} + +template <class C> +int +RefCountCacheSync<C>::write_partition(int event, Event *e) +{ + (void)event; // unused + int curr_time = Thread::get_hrtime() / HRTIME_SECOND; + // write the partition to disk + // for item in 
this->partitionItems + // write to disk with headers per item + RefCountCacheHashEntry *it; + for (unsigned int i = 0; i < this->partition_items.length(); i++) { + it = this->partition_items[i]; + + // check if the item has expired, if so don't persist it to disk + if (it->meta.expiry_time < curr_time) { + continue; + } + + // Write the RefCountCacheItemMeta (as our header) + int ret = this->write_to_disk((char *)&it->meta, sizeof(it->meta)); + if (ret < 0) { + Warning("Error writing cache item header to %s: %d", this->tmp_filename.c_str(), ret); + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + delete this; + return EVENT_DONE; + } + // write the actual object now + ret = this->write_to_disk((char *)it->item.get(), it->meta.size); + if (ret < 0) { + Warning("Error writing cache item to %s: %d", this->tmp_filename.c_str(), ret); + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + delete this; + return EVENT_DONE; + } + + this->total_items++; + this->total_size += it->meta.size; + refCountCacheHashingValueAllocator.free(it); + } + + // Clear partition-- for the next user + this->partition_items.clear(); + + SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::pause_event); + + // Figure out how much time we spent + ink_hrtime elapsed = Thread::get_hrtime() - this->start; + ink_hrtime expected_elapsed = (this->partition * this->time_per_partition); + + // If we were quicker than our pace-- lets reschedule in the future + if (elapsed < expected_elapsed) { + e->schedule_in(expected_elapsed - elapsed, ET_TASK); + } else { // Otherwise we were too slow-- and need to go now! 
+ e->schedule_imm(ET_TASK); + } + return EVENT_CONT; +} + +template <class C> +int +RefCountCacheSync<C>::pause_event(int event, Event *e) +{ + (void)event; + (void)e; + + // Schedule up the next partition + if (partition < cc->partition_count()) + mutex = cc->get_partition(partition).lock.get(); + else + mutex = cont->mutex; + SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::copy_partition); + e->schedule_imm(ET_TASK); + return EVENT_CONT; +} + +// Open the tmp file, etc. +template <class C> +int +RefCountCacheSync<C>::initialize_storage(int event, Event *e) +{ + (void)event; // unused + this->fd = this->socket_manager.open(this->tmp_filename.c_str(), O_TRUNC | O_RDWR | O_CREAT, 0644); // TODO: configurable perms + + if (this->fd <= 0) { + Warning("Unable to create temporary file %s, unable to persist hostdb: %d\n", this->tmp_filename.c_str(), this->fd); + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + delete this; + return EVENT_DONE; + } + + // Write out the header + int ret = this->write_to_disk((char *)&this->cc->get_header(), sizeof(RefCountCacheHeader)); + if (ret < 0) { + Warning("Error writing cache header to %s: %d", this->tmp_filename.c_str(), ret); + cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0); + delete this; + return EVENT_DONE; + } + + SET_HANDLER((CacheSyncHandler)&RefCountCacheSync::copy_partition); + e->schedule_imm(ET_TASK); + return EVENT_CONT; +} + +// do the final mv and close of file handle +template <class C> +int +RefCountCacheSync<C>::finalize_sync() +{ + // fsync the fd we have + int fsync_ret = this->socket_manager.fsync(this->fd); + if (fsync_ret != 0) { + return fsync_ret; + } + + int dir_fd = this->socket_manager.open(this->dirname.c_str(), O_DIRECTORY); // Correct permissions? 
+ if (dir_fd <= 0) { + return dir_fd; + } + // move the file + int ret = rename(this->tmp_filename.c_str(), this->filename.c_str()); + + if (ret != 0) { + return ret; + } + + // fsync the dir + int fsync_dir_ret = this->socket_manager.fsync(dir_fd); + if (fsync_dir_ret != 0) { + return fsync_dir_ret; + } + + int dir_close_ret = this->socket_manager.close(dir_fd); + if (dir_close_ret != 0) { + return dir_close_ret; + } + + int close_ret = this->socket_manager.close(this->fd); + if (close_ret != 0) { + return close_ret; + } + + if (this->rsb) { + RecSetRawStatCount(this->rsb, refcountcache_last_sync_time, Thread::get_hrtime() / HRTIME_SECOND); + RecSetRawStatCount(this->rsb, refcountcache_last_total_items, this->total_items); + RecSetRawStatCount(this->rsb, refcountcache_last_total_size, this->total_size); + } + + return 0; +} + +// Write *i to this->fd, if there is an error we'll just stop this continuation +// TODO: reschedule the continuation if the disk was busy? +template <class C> +int +RefCountCacheSync<C>::write_to_disk(char *i, int size) +{ + int written = 0; + while (written < size) { + int ret = this->socket_manager.write(this->fd, i + written, size - written); --- End diff -- Because the man page also deals with writing to sockets which is where this applies. > Hostdb consistency problems due to MultiCache > --------------------------------------------- > > Key: TS-4331 > URL: https://issues.apache.org/jira/browse/TS-4331 > Project: Traffic Server > Issue Type: Bug > Components: HostDB > Reporter: Thomas Jackson > Assignee: Thomas Jackson > Fix For: 7.0.0 > > > This ticket is for the correct long term fix to TS-4207 > pulled from a comment, which wraps up the issue > {quote} > Leif Hedstrom I have spent a decent amount of time on this while I was OOO on > vacation the last couple of weeks. 
It seems that the root cause of this issue > has always existed, and that with the addition of always storing hostnames > (https://github.com/apache/trafficserver/commit/0e703e1e) we are just causing > the issue to happen all the time. > To understand the issue I'll give a little background on how hostdb is > currently working. Basically hostdb is just a wrapper around a templated > struct called MultiCache. MultiCache is "multi" not because it is templated, > but because it has two types of storage (static: blocks, and dynamic: > alloc). The static side of the cache can hold N HostDBInfo structs (the > results of DNS queries). The dynamic side is used to store the round-robin > records and various strings associated with each record. The size of this > dynamic space is defined as (N x estimated_heap_bytes_per_entry). The basic > problem we are running into is that we are putting too much pressure on the > dynamic heap, such that heap space is getting reused while callers still hold > references to items in that space. > So, I've actually been working on rewriting MultiCache to allocate the > entire required block at once (so we don't have this problem where the parent > exists but not the children), but I'm not certain whether we want such a change to > go into the 6.x branch (I'm willing to discuss it). If we aren't > comfortable with such a large change, I suggest just accounting for the > hostname size in estimated_heap_bytes_per_entry as a stopgap solution. > The maximum allowable hostname length is 253 (so 254 with the null terminator), but we could > pick a smaller number (~120 or so seems more reasonable). Alternatively, > you can increase the number of records in hostdb (and the size accordingly) > to increase the dynamic heap size.
> TL;DR: almost done with the long-term solution, but I'm not sure if we want to > merge that into 6.x; alternatively, we can do a simple workaround in 6.x > (https://github.com/apache/trafficserver/pull/553) > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)