http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist.h new file mode 100644 index 0000000..af85be6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist.h @@ -0,0 +1,379 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Thread safety +// ------------- +// +// Writes require external synchronization, most likely a mutex. +// Reads require a guarantee that the SkipList will not be destroyed +// while the read is in progress. Apart from that, reads progress +// without any internal locking or synchronization. +// +// Invariants: +// +// (1) Allocated nodes are never deleted until the SkipList is +// destroyed. This is trivially guaranteed by the code since we +// never delete any skip list nodes. +// +// (2) The contents of a Node except for the next/prev pointers are +// immutable after the Node has been linked into the SkipList. +// Only Insert() modifies the list, and it is careful to initialize +// a node and use release-stores to publish the nodes in one or +// more lists. +// +// ... prev vs. next pointer ordering ... + +#include <assert.h> +#include <stdlib.h> +#include "port/port.h" +#include "util/arena.h" +#include "util/random.h" + +namespace leveldb { + +class Arena; + +template<typename Key, class Comparator> +class SkipList { + private: + struct Node; + + public: + // Create a new SkipList object that will use "cmp" for comparing keys, + // and will allocate memory using "*arena". Objects allocated in the arena + // must remain allocated for the lifetime of the skiplist object. + explicit SkipList(Comparator cmp, Arena* arena); + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + void Insert(const Key& key); + + // Returns true iff an entry that compares equal to key is in the list. + bool Contains(const Key& key) const; + + // Iteration over the contents of a skip list + class Iterator { + public: + // Initialize an iterator over the specified list. + // The returned iterator is not valid. + explicit Iterator(const SkipList* list); + + // Returns true iff the iterator is positioned at a valid node. + bool Valid() const; + + // Returns the key at the current position. + // REQUIRES: Valid() + const Key& key() const; + + // Advances to the next position. + // REQUIRES: Valid() + void Next(); + + // Advances to the previous position. + // REQUIRES: Valid() + void Prev(); + + // Advance to the first entry with a key >= target + void Seek(const Key& target); + + // Position at the first entry in list. + // Final state of iterator is Valid() iff list is not empty. + void SeekToFirst(); + + // Position at the last entry in list. + // Final state of iterator is Valid() iff list is not empty. + void SeekToLast(); + + private: + const SkipList* list_; + Node* node_; + // Intentionally copyable + }; + + private: + enum { kMaxHeight = 12 }; + + // Immutable after construction + Comparator const compare_; + Arena* const arena_; // Arena used for allocations of nodes + + Node* const head_; + + // Modified only by Insert(). Read racily by readers, but stale + // values are ok. + port::AtomicPointer max_height_; // Height of the entire list + + inline int GetMaxHeight() const { + return static_cast<int>( + reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load())); + } + + // Read/written only by Insert(). + Random rnd_; + + Node* NewNode(const Key& key, int height); + int RandomHeight(); + bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } + + // Return true if key is greater than the data stored in "n" + bool KeyIsAfterNode(const Key& key, Node* n) const; + + // Return the earliest node that comes at or after key. + // Return NULL if there is no such node. + // + // If prev is non-NULL, fills prev[level] with pointer to previous + // node at "level" for every level in [0..max_height_-1]. + Node* FindGreaterOrEqual(const Key& key, Node** prev) const; + + // Return the latest node with a key < key. + // Return head_ if there is no such node. + Node* FindLessThan(const Key& key) const; + + // Return the last node in the list. + // Return head_ if list is empty. + Node* FindLast() const; + + // No copying allowed + SkipList(const SkipList&); + void operator=(const SkipList&); +}; + +// Implementation details follow +template<typename Key, class Comparator> +struct SkipList<Key,Comparator>::Node { + explicit Node(const Key& k) : key(k) { } + + Key const key; + + // Accessors/mutators for links. Wrapped in methods so we can + // add the appropriate barriers as necessary. + Node* Next(int n) { + assert(n >= 0); + // Use an 'acquire load' so that we observe a fully initialized + // version of the returned Node. + return reinterpret_cast<Node*>(next_[n].Acquire_Load()); + } + void SetNext(int n, Node* x) { + assert(n >= 0); + // Use a 'release store' so that anybody who reads through this + // pointer observes a fully initialized version of the inserted node. + next_[n].Release_Store(x); + } + + // No-barrier variants that can be safely used in a few locations. + Node* NoBarrier_Next(int n) { + assert(n >= 0); + return reinterpret_cast<Node*>(next_[n].NoBarrier_Load()); + } + void NoBarrier_SetNext(int n, Node* x) { + assert(n >= 0); + next_[n].NoBarrier_Store(x); + } + + private: + // Array of length equal to the node height. next_[0] is lowest level link. + port::AtomicPointer next_[1]; +}; + +template<typename Key, class Comparator> +typename SkipList<Key,Comparator>::Node* +SkipList<Key,Comparator>::NewNode(const Key& key, int height) { + char* mem = arena_->AllocateAligned( + sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); + return new (mem) Node(key); +} + +template<typename Key, class Comparator> +inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) { + list_ = list; + node_ = NULL; +} + +template<typename Key, class Comparator> +inline bool SkipList<Key,Comparator>::Iterator::Valid() const { + return node_ != NULL; +} + +template<typename Key, class Comparator> +inline const Key& SkipList<Key,Comparator>::Iterator::key() const { + assert(Valid()); + return node_->key; +} + +template<typename Key, class Comparator> +inline void SkipList<Key,Comparator>::Iterator::Next() { + assert(Valid()); + node_ = node_->Next(0); +} + +template<typename Key, class Comparator> +inline void SkipList<Key,Comparator>::Iterator::Prev() { + // Instead of using explicit "prev" links, we just search for the + // last node that falls before key. + assert(Valid()); + node_ = list_->FindLessThan(node_->key); + if (node_ == list_->head_) { + node_ = NULL; + } +} + +template<typename Key, class Comparator> +inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) { + node_ = list_->FindGreaterOrEqual(target, NULL); +} + +template<typename Key, class Comparator> +inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() { + node_ = list_->head_->Next(0); +} + +template<typename Key, class Comparator> +inline void SkipList<Key,Comparator>::Iterator::SeekToLast() { + node_ = list_->FindLast(); + if (node_ == list_->head_) { + node_ = NULL; + } +} + +template<typename Key, class Comparator> +int SkipList<Key,Comparator>::RandomHeight() { + // Increase height with probability 1 in kBranching + static const unsigned int kBranching = 4; + int height = 1; + while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) { + height++; + } + assert(height > 0); + assert(height <= kMaxHeight); + return height; +} + +template<typename Key, class Comparator> +bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const { + // NULL n is considered infinite + return (n != NULL) && (compare_(n->key, key) < 0); +} + +template<typename Key, class Comparator> +typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev) + const { + Node* x = head_; + int level = GetMaxHeight() - 1; + while (true) { + Node* next = x->Next(level); + if (KeyIsAfterNode(key, next)) { + // Keep searching in this list + x = next; + } else { + if (prev != NULL) prev[level] = x; + if (level == 0) { + return next; + } else { + // Switch to next list + level--; + } + } + } +} + +template<typename Key, class Comparator> +typename SkipList<Key,Comparator>::Node* +SkipList<Key,Comparator>::FindLessThan(const Key& key) const { + Node* x = head_; + int level = GetMaxHeight() - 1; + while (true) { + assert(x == head_ || compare_(x->key, key) < 0); + Node* next = x->Next(level); + if (next == NULL || compare_(next->key, key) >= 0) { + if (level == 0) { + return x; + } else { + // Switch to next list + level--; + } + } else { + x = next; + } + } +} + +template<typename Key, class Comparator> +typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast() + const { + Node* x = head_; + int level = GetMaxHeight() - 1; + while (true) { + Node* next = x->Next(level); + if (next == NULL) { + if (level == 0) { + return x; + } else { + // Switch to next list + level--; + } + } else { + x = next; + } + } +} + +template<typename Key, class Comparator> +SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena) + : compare_(cmp), + arena_(arena), + head_(NewNode(0 /* any key will do */, kMaxHeight)), + max_height_(reinterpret_cast<void*>(1)), + rnd_(0xdeadbeef) { + for (int i = 0; i < kMaxHeight; i++) { + head_->SetNext(i, NULL); + } +} + +template<typename Key, class Comparator> +void SkipList<Key,Comparator>::Insert(const Key& key) { + // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() + // here since Insert() is externally synchronized. + Node* prev[kMaxHeight]; + Node* x = FindGreaterOrEqual(key, prev); + + // Our data structure does not allow duplicate insertion + assert(x == NULL || !Equal(key, x->key)); + + int height = RandomHeight(); + if (height > GetMaxHeight()) { + for (int i = GetMaxHeight(); i < height; i++) { + prev[i] = head_; + } + //fprintf(stderr, "Change height from %d to %d\n", max_height_, height); + + // It is ok to mutate max_height_ without any synchronization + // with concurrent readers. A concurrent reader that observes + // the new value of max_height_ will see either the old value of + // new level pointers from head_ (NULL), or a new value set in + // the loop below. In the former case the reader will + // immediately drop to the next level since NULL sorts after all + // keys. In the latter case the reader will use the new node. + max_height_.NoBarrier_Store(reinterpret_cast<void*>(height)); + } + + x = NewNode(key, height); + for (int i = 0; i < height; i++) { + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. + x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); + prev[i]->SetNext(i, x); + } +} + +template<typename Key, class Comparator> +bool SkipList<Key,Comparator>::Contains(const Key& key) const { + Node* x = FindGreaterOrEqual(key, NULL); + if (x != NULL && Equal(key, x->key)) { + return true; + } else { + return false; + } +} + +} // namespace leveldb
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist_test.cc new file mode 100644 index 0000000..c78f4b4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/skiplist_test.cc @@ -0,0 +1,378 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/skiplist.h" +#include <set> +#include "leveldb/env.h" +#include "util/arena.h" +#include "util/hash.h" +#include "util/random.h" +#include "util/testharness.h" + +namespace leveldb { + +typedef uint64_t Key; + +struct Comparator { + int operator()(const Key& a, const Key& b) const { + if (a < b) { + return -1; + } else if (a > b) { + return +1; + } else { + return 0; + } + } +}; + +class SkipTest { }; + +TEST(SkipTest, Empty) { + Arena arena; + Comparator cmp; + SkipList<Key, Comparator> list(cmp, &arena); + ASSERT_TRUE(!list.Contains(10)); + + SkipList<Key, Comparator>::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToFirst(); + ASSERT_TRUE(!iter.Valid()); + iter.Seek(100); + ASSERT_TRUE(!iter.Valid()); + iter.SeekToLast(); + ASSERT_TRUE(!iter.Valid()); +} + +TEST(SkipTest, InsertAndLookup) { + const int N = 2000; + const int R = 5000; + Random rnd(1000); + std::set<Key> keys; + Arena arena; + Comparator cmp; + SkipList<Key, Comparator> list(cmp, &arena); + for (int i = 0; i < N; i++) { + Key key = rnd.Next() % R; + if (keys.insert(key).second) { + list.Insert(key); + } + } + + for (int i = 0; i < R; i++) { + if (list.Contains(i)) { + ASSERT_EQ(keys.count(i), 1); + } else { + ASSERT_EQ(keys.count(i), 0); + } + } + + // Simple iterator tests + { + SkipList<Key, Comparator>::Iterator iter(&list); + ASSERT_TRUE(!iter.Valid()); + + iter.Seek(0); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), iter.key()); + + iter.SeekToFirst(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.begin()), iter.key()); + + iter.SeekToLast(); + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*(keys.rbegin()), iter.key()); + } + + // Forward iteration test + for (int i = 0; i < R; i++) { + SkipList<Key, Comparator>::Iterator iter(&list); + iter.Seek(i); + + // Compare against model iterator + std::set<Key>::iterator model_iter = keys.lower_bound(i); + for (int j = 0; j < 3; j++) { + if (model_iter == keys.end()) { + ASSERT_TRUE(!iter.Valid()); + break; + } else { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*model_iter, iter.key()); + ++model_iter; + iter.Next(); + } + } + } + + // Backward iteration test + { + SkipList<Key, Comparator>::Iterator iter(&list); + iter.SeekToLast(); + + // Compare against model iterator + for (std::set<Key>::reverse_iterator model_iter = keys.rbegin(); + model_iter != keys.rend(); + ++model_iter) { + ASSERT_TRUE(iter.Valid()); + ASSERT_EQ(*model_iter, iter.key()); + iter.Prev(); + } + ASSERT_TRUE(!iter.Valid()); + } +} + +// We want to make sure that with a single writer and multiple +// concurrent readers (with no synchronization other than when a +// reader's iterator is created), the reader always observes all the +// data that was present in the skip list when the iterator was +// constructor. Because insertions are happening concurrently, we may +// also observe new values that were inserted since the iterator was +// constructed, but we should never miss any values that were present +// at iterator construction time. +// +// We generate multi-part keys: +// <key,gen,hash> +// where: +// key is in range [0..K-1] +// gen is a generation number for key +// hash is hash(key,gen) +// +// The insertion code picks a random key, sets gen to be 1 + the last +// generation number inserted for that key, and sets hash to Hash(key,gen). +// +// At the beginning of a read, we snapshot the last inserted +// generation number for each key. We then iterate, including random +// calls to Next() and Seek(). For every key we encounter, we +// check that it is either expected given the initial snapshot or has +// been concurrently added since the iterator started. +class ConcurrentTest { + private: + static const uint32_t K = 4; + + static uint64_t key(Key key) { return (key >> 40); } + static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } + static uint64_t hash(Key key) { return key & 0xff; } + + static uint64_t HashNumbers(uint64_t k, uint64_t g) { + uint64_t data[2] = { k, g }; + return Hash(reinterpret_cast<char*>(data), sizeof(data), 0); + } + + static Key MakeKey(uint64_t k, uint64_t g) { + assert(sizeof(Key) == sizeof(uint64_t)); + assert(k <= K); // We sometimes pass K to seek to the end of the skiplist + assert(g <= 0xffffffffu); + return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff)); + } + + static bool IsValidKey(Key k) { + return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff); + } + + static Key RandomTarget(Random* rnd) { + switch (rnd->Next() % 10) { + case 0: + // Seek to beginning + return MakeKey(0, 0); + case 1: + // Seek to end + return MakeKey(K, 0); + default: + // Seek to middle + return MakeKey(rnd->Next() % K, 0); + } + } + + // Per-key generation + struct State { + port::AtomicPointer generation[K]; + void Set(int k, intptr_t v) { + generation[k].Release_Store(reinterpret_cast<void*>(v)); + } + intptr_t Get(int k) { + return reinterpret_cast<intptr_t>(generation[k].Acquire_Load()); + } + + State() { + for (int k = 0; k < K; k++) { + Set(k, 0); + } + } + }; + + // Current state of the test + State current_; + + Arena arena_; + + // SkipList is not protected by mu_. We just use a single writer + // thread to modify it. + SkipList<Key, Comparator> list_; + + public: + ConcurrentTest() : list_(Comparator(), &arena_) { } + + // REQUIRES: External synchronization + void WriteStep(Random* rnd) { + const uint32_t k = rnd->Next() % K; + const intptr_t g = current_.Get(k) + 1; + const Key key = MakeKey(k, g); + list_.Insert(key); + current_.Set(k, g); + } + + void ReadStep(Random* rnd) { + // Remember the initial committed state of the skiplist. + State initial_state; + for (int k = 0; k < K; k++) { + initial_state.Set(k, current_.Get(k)); + } + + Key pos = RandomTarget(rnd); + SkipList<Key, Comparator>::Iterator iter(&list_); + iter.Seek(pos); + while (true) { + Key current; + if (!iter.Valid()) { + current = MakeKey(K, 0); + } else { + current = iter.key(); + ASSERT_TRUE(IsValidKey(current)) << current; + } + ASSERT_LE(pos, current) << "should not go backwards"; + + // Verify that everything in [pos,current) was not present in + // initial_state. + while (pos < current) { + ASSERT_LT(key(pos), K) << pos; + + // Note that generation 0 is never inserted, so it is ok if + // <*,0,*> is missing. + ASSERT_TRUE((gen(pos) == 0) || + (gen(pos) > initial_state.Get(key(pos))) + ) << "key: " << key(pos) + << "; gen: " << gen(pos) + << "; initgen: " + << initial_state.Get(key(pos)); + + // Advance to next key in the valid key space + if (key(pos) < key(current)) { + pos = MakeKey(key(pos) + 1, 0); + } else { + pos = MakeKey(key(pos), gen(pos) + 1); + } + } + + if (!iter.Valid()) { + break; + } + + if (rnd->Next() % 2) { + iter.Next(); + pos = MakeKey(key(pos), gen(pos) + 1); + } else { + Key new_target = RandomTarget(rnd); + if (new_target > pos) { + pos = new_target; + iter.Seek(new_target); + } + } + } + } +}; +const uint32_t ConcurrentTest::K; + +// Simple test that does single-threaded testing of the ConcurrentTest +// scaffolding. +TEST(SkipTest, ConcurrentWithoutThreads) { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 10000; i++) { + test.ReadStep(&rnd); + test.WriteStep(&rnd); + } +} + +class TestState { + public: + ConcurrentTest t_; + int seed_; + port::AtomicPointer quit_flag_; + + enum ReaderState { + STARTING, + RUNNING, + DONE + }; + + explicit TestState(int s) + : seed_(s), + quit_flag_(NULL), + state_(STARTING), + state_cv_(&mu_) {} + + void Wait(ReaderState s) { + mu_.Lock(); + while (state_ != s) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + + void Change(ReaderState s) { + mu_.Lock(); + state_ = s; + state_cv_.Signal(); + mu_.Unlock(); + } + + private: + port::Mutex mu_; + ReaderState state_; + port::CondVar state_cv_; +}; + +static void ConcurrentReader(void* arg) { + TestState* state = reinterpret_cast<TestState*>(arg); + Random rnd(state->seed_); + int64_t reads = 0; + state->Change(TestState::RUNNING); + while (!state->quit_flag_.Acquire_Load()) { + state->t_.ReadStep(&rnd); + ++reads; + } + state->Change(TestState::DONE); +} + +static void RunConcurrent(int run) { + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestState state(seed + 1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int i = 0; i < kSize; i++) { + state.t_.WriteStep(&rnd); + } + state.quit_flag_.Release_Store(&state); // Any non-NULL arg will do + state.Wait(TestState::DONE); + } +} + +TEST(SkipTest, Concurrent1) { RunConcurrent(1); } +TEST(SkipTest, Concurrent2) { RunConcurrent(2); } +TEST(SkipTest, Concurrent3) { RunConcurrent(3); } +TEST(SkipTest, Concurrent4) { RunConcurrent(4); } +TEST(SkipTest, Concurrent5) { RunConcurrent(5); } + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/snapshot.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/snapshot.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/snapshot.h new file mode 100644 index 0000000..e7f8fd2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/snapshot.h @@ -0,0 +1,66 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_SNAPSHOT_H_ +#define STORAGE_LEVELDB_DB_SNAPSHOT_H_ + +#include "leveldb/db.h" + +namespace leveldb { + +class SnapshotList; + +// Snapshots are kept in a doubly-linked list in the DB. +// Each SnapshotImpl corresponds to a particular sequence number. +class SnapshotImpl : public Snapshot { + public: + SequenceNumber number_; // const after creation + + private: + friend class SnapshotList; + + // SnapshotImpl is kept in a doubly-linked circular list + SnapshotImpl* prev_; + SnapshotImpl* next_; + + SnapshotList* list_; // just for sanity checks +}; + +class SnapshotList { + public: + SnapshotList() { + list_.prev_ = &list_; + list_.next_ = &list_; + } + + bool empty() const { return list_.next_ == &list_; } + SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } + SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } + + const SnapshotImpl* New(SequenceNumber seq) { + SnapshotImpl* s = new SnapshotImpl; + s->number_ = seq; + s->list_ = this; + s->next_ = &list_; + s->prev_ = list_.prev_; + s->prev_->next_ = s; + s->next_->prev_ = s; + return s; + } + + void Delete(const SnapshotImpl* s) { + assert(s->list_ == this); + s->prev_->next_ = s->next_; + s->next_->prev_ = s->prev_; + delete s; + } + + private: + // Dummy head of doubly-linked list of snapshots + SnapshotImpl list_; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_SNAPSHOT_H_ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.cc new file mode 100644 index 0000000..e3d82cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.cc @@ -0,0 +1,127 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/table_cache.h" + +#include "db/filename.h" +#include "leveldb/env.h" +#include "leveldb/table.h" +#include "util/coding.h" + +namespace leveldb { + +struct TableAndFile { + RandomAccessFile* file; + Table* table; +}; + +static void DeleteEntry(const Slice& key, void* value) { + TableAndFile* tf = reinterpret_cast<TableAndFile*>(value); + delete tf->table; + delete tf->file; + delete tf; +} + +static void UnrefEntry(void* arg1, void* arg2) { + Cache* cache = reinterpret_cast<Cache*>(arg1); + Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2); + cache->Release(h); +} + +TableCache::TableCache(const std::string& dbname, + const Options* options, + int entries) + : env_(options->env), + dbname_(dbname), + options_(options), + cache_(NewLRUCache(entries)) { +} + +TableCache::~TableCache() { + delete cache_; +} + +Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, + Cache::Handle** handle) { + Status s; + char buf[sizeof(file_number)]; + EncodeFixed64(buf, file_number); + Slice key(buf, sizeof(buf)); + *handle = cache_->Lookup(key); + if (*handle == NULL) { + std::string fname = TableFileName(dbname_, file_number); + RandomAccessFile* file = NULL; + Table* table = NULL; + s = env_->NewRandomAccessFile(fname, &file); + if (!s.ok()) { + std::string old_fname = SSTTableFileName(dbname_, file_number); + if (env_->NewRandomAccessFile(old_fname, &file).ok()) { + s = Status::OK(); + } + } + if (s.ok()) { + s = Table::Open(*options_, file, file_size, &table); + } + + if (!s.ok()) { + assert(table == NULL); + delete file; + // We do not cache error results so that if the error is transient, + // or somebody repairs the file, we recover automatically. + } else { + TableAndFile* tf = new TableAndFile; + tf->file = file; + tf->table = table; + *handle = cache_->Insert(key, tf, 1, &DeleteEntry); + } + } + return s; +} + +Iterator* TableCache::NewIterator(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + Table** tableptr) { + if (tableptr != NULL) { + *tableptr = NULL; + } + + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (!s.ok()) { + return NewErrorIterator(s); + } + + Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; + Iterator* result = table->NewIterator(options); + result->RegisterCleanup(&UnrefEntry, cache_, handle); + if (tableptr != NULL) { + *tableptr = table; + } + return result; +} + +Status TableCache::Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*saver)(void*, const Slice&, const Slice&)) { + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (s.ok()) { + Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; + s = t->InternalGet(options, k, arg, saver); + cache_->Release(handle); + } + return s; +} + +void TableCache::Evict(uint64_t file_number) { + char buf[sizeof(file_number)]; + EncodeFixed64(buf, file_number); + cache_->Erase(Slice(buf, sizeof(buf))); +} + +} // namespace leveldb http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.h new file mode 100644 index 0000000..8cf4aaf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/table_cache.h @@ -0,0 +1,61 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Thread-safe (provides internal synchronization) + +#ifndef STORAGE_LEVELDB_DB_TABLE_CACHE_H_ +#define STORAGE_LEVELDB_DB_TABLE_CACHE_H_ + +#include <string> +#include <stdint.h> +#include "db/dbformat.h" +#include "leveldb/cache.h" +#include "leveldb/table.h" +#include "port/port.h" + +namespace leveldb { + +class Env; + +class TableCache { + public: + TableCache(const std::string& dbname, const Options* options, int entries); + ~TableCache(); + + // Return an iterator for the specified file number (the corresponding + // file length must be exactly "file_size" bytes). If "tableptr" is + // non-NULL, also sets "*tableptr" to point to the Table object + // underlying the returned iterator, or NULL if no Table object underlies + // the returned iterator. The returned "*tableptr" object is owned by + // the cache and should not be deleted, and is valid for as long as the + // returned iterator is live. + Iterator* NewIterator(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + Table** tableptr = NULL); + + // If a seek to internal key "k" in specified file finds an entry, + // call (*handle_result)(arg, found_key, found_value). + Status Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*handle_result)(void*, const Slice&, const Slice&)); + + // Evict any entry for the specified file number + void Evict(uint64_t file_number); + + private: + Env* const env_; + const std::string dbname_; + const Options* options_; + Cache* cache_; + + Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_TABLE_CACHE_H_ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.cc new file mode 100644 index 0000000..f10a2d5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.cc @@ -0,0 +1,266 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_edit.h" + +#include "db/version_set.h" +#include "util/coding.h" + +namespace leveldb { + +// Tag numbers for serialized VersionEdit. These numbers are written to +// disk and should not be changed. +enum Tag { + kComparator = 1, + kLogNumber = 2, + kNextFileNumber = 3, + kLastSequence = 4, + kCompactPointer = 5, + kDeletedFile = 6, + kNewFile = 7, + // 8 was used for large value refs + kPrevLogNumber = 9 +}; + +void VersionEdit::Clear() { + comparator_.clear(); + log_number_ = 0; + prev_log_number_ = 0; + last_sequence_ = 0; + next_file_number_ = 0; + has_comparator_ = false; + has_log_number_ = false; + has_prev_log_number_ = false; + has_next_file_number_ = false; + has_last_sequence_ = false; + deleted_files_.clear(); + new_files_.clear(); +} + +void VersionEdit::EncodeTo(std::string* dst) const { + if (has_comparator_) { + PutVarint32(dst, kComparator); + PutLengthPrefixedSlice(dst, comparator_); + } + if (has_log_number_) { + PutVarint32(dst, kLogNumber); + PutVarint64(dst, log_number_); + } + if (has_prev_log_number_) { + PutVarint32(dst, kPrevLogNumber); + PutVarint64(dst, prev_log_number_); + } + if (has_next_file_number_) { + PutVarint32(dst, kNextFileNumber); + PutVarint64(dst, next_file_number_); + } + if (has_last_sequence_) { + PutVarint32(dst, kLastSequence); + PutVarint64(dst, last_sequence_); + } + + for (size_t i = 0; i < compact_pointers_.size(); i++) { + PutVarint32(dst, kCompactPointer); + PutVarint32(dst, compact_pointers_[i].first); // level + PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode()); + } + + for (DeletedFileSet::const_iterator iter = deleted_files_.begin(); + iter != deleted_files_.end(); + ++iter) { + PutVarint32(dst, kDeletedFile); + PutVarint32(dst, iter->first); // level + PutVarint64(dst, iter->second); // file number + } + + for (size_t i = 0; i < new_files_.size(); i++) { + const FileMetaData& f = new_files_[i].second; + PutVarint32(dst, kNewFile); + PutVarint32(dst, new_files_[i].first); // level + PutVarint64(dst, f.number); + PutVarint64(dst, f.file_size); + PutLengthPrefixedSlice(dst, f.smallest.Encode()); + PutLengthPrefixedSlice(dst, f.largest.Encode()); + } +} + +static bool GetInternalKey(Slice* input, InternalKey* dst) { + Slice str; + if (GetLengthPrefixedSlice(input, &str)) { + dst->DecodeFrom(str); + return true; + } else { + return false; + } +} + +static bool GetLevel(Slice* input, int* level) { + uint32_t v; + if (GetVarint32(input, &v) && + v < config::kNumLevels) { + *level = v; + return true; + } else { + return false; + } +} + +Status VersionEdit::DecodeFrom(const Slice& src) { + Clear(); + Slice input = src; + const char* msg = NULL; + uint32_t tag; + + // Temporary storage for parsing + int level; + uint64_t number; + FileMetaData f; + Slice str; + InternalKey key; + + while (msg == NULL && GetVarint32(&input, &tag)) { + switch (tag) { + case kComparator: + if (GetLengthPrefixedSlice(&input, &str)) { + comparator_ = str.ToString(); + has_comparator_ = true; + } else { + msg = "comparator name"; + } + break; + + case kLogNumber: + if (GetVarint64(&input, &log_number_)) { + has_log_number_ = true; + } else { + msg = "log number"; + } + break; + + case kPrevLogNumber: + if (GetVarint64(&input, &prev_log_number_)) { + has_prev_log_number_ = true; + } else { + msg = "previous log number"; + } + break; + + case kNextFileNumber: + if (GetVarint64(&input, &next_file_number_)) { + has_next_file_number_ = true; + } else { + msg = "next file number"; + } + break; + + case kLastSequence: + if (GetVarint64(&input, &last_sequence_)) { + has_last_sequence_ = true; + } else { + msg = "last sequence number"; + } + break; + + case kCompactPointer: + if (GetLevel(&input, &level) && + GetInternalKey(&input, &key)) { + compact_pointers_.push_back(std::make_pair(level, key)); + } else { + msg = "compaction pointer"; + } + break; + + case kDeletedFile: + if (GetLevel(&input, &level) && + GetVarint64(&input, &number)) { + deleted_files_.insert(std::make_pair(level, number)); + } else { + msg = "deleted file"; + } + break; + + case kNewFile: + if (GetLevel(&input, &level) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest)) { + new_files_.push_back(std::make_pair(level, f)); + } else { + msg = "new-file entry"; + } + break; + + default: + msg = "unknown tag"; + break; + } + } + + if (msg == NULL && !input.empty()) { + msg = "invalid tag"; + } + + Status result; + if (msg != NULL) { + result = Status::Corruption("VersionEdit", msg); + } + return result; +} + +std::string VersionEdit::DebugString() const { + std::string r; + r.append("VersionEdit {"); + if (has_comparator_) { + r.append("\n Comparator: "); + r.append(comparator_); + } + if (has_log_number_) { + r.append("\n LogNumber: "); + AppendNumberTo(&r, log_number_); + } + if (has_prev_log_number_) { + r.append("\n PrevLogNumber: "); + AppendNumberTo(&r, prev_log_number_); + } + if (has_next_file_number_) { + r.append("\n NextFile: "); + AppendNumberTo(&r, next_file_number_); + } + if (has_last_sequence_) { + r.append("\n LastSeq: "); + AppendNumberTo(&r, last_sequence_); + } + for (size_t i = 0; i < compact_pointers_.size(); i++) { + r.append("\n CompactPointer: "); + AppendNumberTo(&r, compact_pointers_[i].first); + r.append(" "); + r.append(compact_pointers_[i].second.DebugString()); + } + for (DeletedFileSet::const_iterator iter = deleted_files_.begin(); + iter != deleted_files_.end(); + ++iter) { + r.append("\n DeleteFile: "); + AppendNumberTo(&r, iter->first); + r.append(" "); + AppendNumberTo(&r, iter->second); + } + for (size_t i = 0; i < new_files_.size(); i++) { + const FileMetaData& f = new_files_[i].second; + r.append("\n AddFile: "); + AppendNumberTo(&r, new_files_[i].first); + r.append(" "); + AppendNumberTo(&r, f.number); + r.append(" "); + AppendNumberTo(&r, f.file_size); + r.append(" "); + r.append(f.smallest.DebugString()); + r.append(" .. "); + r.append(f.largest.DebugString()); + } + r.append("\n}\n"); + return r; +} + +} // namespace leveldb http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.h new file mode 100644 index 0000000..eaef77b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit.h @@ -0,0 +1,107 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_VERSION_EDIT_H_ +#define STORAGE_LEVELDB_DB_VERSION_EDIT_H_ + +#include <set> +#include <utility> +#include <vector> +#include "db/dbformat.h" + +namespace leveldb { + +class VersionSet; + +struct FileMetaData { + int refs; + int allowed_seeks; // Seeks allowed until compaction + uint64_t number; + uint64_t file_size; // File size in bytes + InternalKey smallest; // Smallest internal key served by table + InternalKey largest; // Largest internal key served by table + + FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } +}; + +class VersionEdit { + public: + VersionEdit() { Clear(); } + ~VersionEdit() { } + + void Clear(); + + void SetComparatorName(const Slice& name) { + has_comparator_ = true; + comparator_ = name.ToString(); + } + void SetLogNumber(uint64_t num) { + has_log_number_ = true; + log_number_ = num; + } + void SetPrevLogNumber(uint64_t num) { + has_prev_log_number_ = true; + prev_log_number_ = num; + } + void SetNextFile(uint64_t num) { + has_next_file_number_ = true; + next_file_number_ = num; + } + void SetLastSequence(SequenceNumber seq) { + has_last_sequence_ = true; + last_sequence_ = seq; + } + void SetCompactPointer(int level, const InternalKey& key) { + compact_pointers_.push_back(std::make_pair(level, key)); + } + + // Add the specified file at the specified number. + // REQUIRES: This version has not been saved (see VersionSet::SaveTo) + // REQUIRES: "smallest" and "largest" are smallest and largest keys in file + void AddFile(int level, uint64_t file, + uint64_t file_size, + const InternalKey& smallest, + const InternalKey& largest) { + FileMetaData f; + f.number = file; + f.file_size = file_size; + f.smallest = smallest; + f.largest = largest; + new_files_.push_back(std::make_pair(level, f)); + } + + // Delete the specified "file" from the specified "level". + void DeleteFile(int level, uint64_t file) { + deleted_files_.insert(std::make_pair(level, file)); + } + + void EncodeTo(std::string* dst) const; + Status DecodeFrom(const Slice& src); + + std::string DebugString() const; + + private: + friend class VersionSet; + + typedef std::set< std::pair<int, uint64_t> > DeletedFileSet; + + std::string comparator_; + uint64_t log_number_; + uint64_t prev_log_number_; + uint64_t next_file_number_; + SequenceNumber last_sequence_; + bool has_comparator_; + bool has_log_number_; + bool has_prev_log_number_; + bool has_next_file_number_; + bool has_last_sequence_; + + std::vector< std::pair<int, InternalKey> > compact_pointers_; + DeletedFileSet deleted_files_; + std::vector< std::pair<int, FileMetaData> > new_files_; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_VERSION_EDIT_H_ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6419f4/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit_test.cc new file mode 100644 index 0000000..280310b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/version_edit_test.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_edit.h" +#include "util/testharness.h" + +namespace leveldb { + +static void TestEncodeDecode(const VersionEdit& edit) { + std::string encoded, encoded2; + edit.EncodeTo(&encoded); + VersionEdit parsed; + Status s = parsed.DecodeFrom(encoded); + ASSERT_TRUE(s.ok()) << s.ToString(); + parsed.EncodeTo(&encoded2); + ASSERT_EQ(encoded, encoded2); +} + +class VersionEditTest { }; + +TEST(VersionEditTest, EncodeDecode) { + static const uint64_t kBig = 1ull << 50; + + VersionEdit edit; + for (int i = 0; i < 4; i++) { + TestEncodeDecode(edit); + edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, + InternalKey("foo", kBig + 500 + i, kTypeValue), + InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + edit.DeleteFile(4, kBig + 700 + i); + edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); + } + + edit.SetComparatorName("foo"); + edit.SetLogNumber(kBig + 100); + edit.SetNextFile(kBig + 200); + edit.SetLastSequence(kBig + 1000); + TestEncodeDecode(edit); +} + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +}