This is an automated email from the ASF dual-hosted git repository. pcmoritz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 0c55b25 ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store 0c55b25 is described below commit 0c55b25c84119af59320eab0b0625da9ce987294 Author: Anurag Khandelwal <anur...@berkeley.edu> AuthorDate: Mon Feb 4 18:24:13 2019 -0800 ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store https://issues.apache.org/jira/browse/ARROW-4294 Note: this PR was previously at https://github.com/apache/arrow/pull/3432, which was closed since its commit history was broken. Currently, when Plasma needs storage space for additional objects, it evicts objects by deleting them from the Plasma store. This is a problem when it isn't possible to reconstruct the object or reconstructing it is expensive. This patch adds support for a pluggable external store that Plasma can evict objects to when it runs out of memory. Author: Anurag Khandelwal <anur...@berkeley.edu> Author: Philipp Moritz <pcmor...@gmail.com> Closes #3482 from anuragkh/plasma_evict_to_external_store and squashes the following commits: 631671561 <Philipp Moritz> remove external store worker, simplify interface 6fbc55b08 <Anurag Khandelwal> Revert "Add an eviction buffer to allow asynchronous evictions" 4f2c02ce3 <Anurag Khandelwal> Revert "Minor fix" 1bc1dbed4 <Anurag Khandelwal> Revert "format fix" 7b662bee4 <Anurag Khandelwal> Revert "Remove timeout for external store test tearDown" 25663df30 <Anurag Khandelwal> Remove timeout for external store test tearDown 7945cc951 <Anurag Khandelwal> format fix 0d7263936 <Anurag Khandelwal> Minor fix 957efb5f0 <Anurag Khandelwal> Add an eviction buffer to allow asynchronous evictions 896d895bd <Anurag Khandelwal> Fixes 7ae486794 <Anurag Khandelwal> Merge branch 'master' into plasma_evict_to_external_store 1af2f8bce <Anurag Khandelwal> Fix cpplint issues 04e173085 <Anurag Khandelwal> Merge branch 'master' into plasma_evict_to_external_store 301e575ea 
<Anurag Khandelwal> Fix uses of ARROW_CHECK_OK/ARROW_CHECK 69a56abcc <Anurag Khandelwal> Fix documentation errrors c19c5767d <Anurag Khandelwal> Add documentation for notify flag f3fad8086 <Anurag Khandelwal> Fix external store worker intialization 9081596c4 <Anurag Khandelwal> Clean up formatting issues f5cc95c72 <Anurag Khandelwal> Add lint exclusion for external_store_worker, since it uses mutex ffd1f0e6c <Anurag Khandelwal> Extend plasma eviction changes to python module 8afc9fb2f <Anurag Khandelwal> Kill only the plasma_store_server that we started be315677b <Anurag Khandelwal> Add test for testing evictions/unevictions a43445aee <Anurag Khandelwal> Update serialization test 58a995318 <Anurag Khandelwal> Add support for evicting/un-evicting Plasma objects to/from external store --- cpp/src/plasma/CMakeLists.txt | 7 +- cpp/src/plasma/common.h | 4 +- cpp/src/plasma/external_store.cc | 63 +++++++++ cpp/src/plasma/external_store.h | 123 ++++++++++++++++ cpp/src/plasma/hash_table_store.cc | 58 ++++++++ cpp/src/plasma/hash_table_store.h | 53 +++++++ cpp/src/plasma/store.cc | 210 +++++++++++++++++++++------- cpp/src/plasma/store.h | 23 ++- cpp/src/plasma/test/client_tests.cc | 15 +- cpp/src/plasma/test/external_store_tests.cc | 139 ++++++++++++++++++ python/pyarrow/_plasma.pyx | 1 + python/pyarrow/plasma.py | 6 +- python/pyarrow/tests/test_plasma.py | 58 ++++++++ 13 files changed, 692 insertions(+), 68 deletions(-) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 53af8c5..fd25aef 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -125,9 +125,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "gcc") " -Wno-conversion") endif() +list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" "hash_table_store.cc") + # We use static libraries for the plasma_store_server executable so that it can # be copied around and used in different locations. 
-add_executable(plasma_store_server store.cc) +add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc) target_link_libraries(plasma_store_server plasma_static ${PLASMA_STATIC_LINK_LIBS}) add_dependencies(plasma plasma_store_server) @@ -214,3 +216,6 @@ ADD_PLASMA_TEST(test/serialization_tests ADD_PLASMA_TEST(test/client_tests EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS} EXTRA_DEPENDENCIES plasma_store_server) +ADD_PLASMA_TEST(test/external_store_tests + EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS} + EXTRA_DEPENDENCIES plasma_store_server) diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index dfbd90c..6f4cef5 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -69,7 +69,9 @@ enum class ObjectState : int { /// Object was created but not sealed in the local Plasma Store. PLASMA_CREATED = 1, /// Object is sealed and stored in the local Plasma Store. - PLASMA_SEALED + PLASMA_SEALED = 2, + /// Object is evicted to external store. + PLASMA_EVICTED = 3, }; namespace internal { diff --git a/cpp/src/plasma/external_store.cc b/cpp/src/plasma/external_store.cc new file mode 100644 index 0000000..8cfbad1 --- /dev/null +++ b/cpp/src/plasma/external_store.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <iostream> +#include <sstream> + +#include "arrow/util/memory.h" + +#include "plasma/external_store.h" + +namespace plasma { + +Status ExternalStores::ExtractStoreName(const std::string& endpoint, + std::string* store_name) { + size_t off = endpoint.find_first_of(':'); + if (off == std::string::npos) { + return Status::Invalid("Malformed endpoint " + endpoint); + } + *store_name = endpoint.substr(0, off); + return Status::OK(); +} + +void ExternalStores::RegisterStore(const std::string& store_name, + std::shared_ptr<ExternalStore> store) { + Stores().insert({store_name, store}); +} + +void ExternalStores::DeregisterStore(const std::string& store_name) { + auto it = Stores().find(store_name); + if (it == Stores().end()) { + return; + } + Stores().erase(it); +} + +std::shared_ptr<ExternalStore> ExternalStores::GetStore(const std::string& store_name) { + auto it = Stores().find(store_name); + if (it == Stores().end()) { + return nullptr; + } + return it->second; +} + +ExternalStores::StoreMap& ExternalStores::Stores() { + static auto* external_stores = new StoreMap(); + return *external_stores; +} + +} // namespace plasma diff --git a/cpp/src/plasma/external_store.h b/cpp/src/plasma/external_store.h new file mode 100644 index 0000000..feca466 --- /dev/null +++ b/cpp/src/plasma/external_store.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef EXTERNAL_STORE_H +#define EXTERNAL_STORE_H + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "plasma/client.h" + +namespace plasma { + +// ==== The external store ==== +// +// This file contains declaration for all functions that need to be implemented +// for an external storage service so that objects evicted from Plasma store +// can be written to it. + +class ExternalStore { + public: + /// Default constructor. + ExternalStore() = default; + + /// Virtual destructor. + virtual ~ExternalStore() = default; + + /// Connect to the local plasma store. Return the resulting connection. + /// + /// \param endpoint The name of the endpoint to connect to the external + /// storage service. While the formatting of the endpoint name is + /// specific to the implementation of the external store, it always + /// starts with {store-name}://, where {store-name} is the name of the + /// external store. + /// + /// \return The return status. + virtual Status Connect(const std::string& endpoint) = 0; + + /// This method will be called whenever an object in the Plasma store needs + /// to be evicted to the external store. + /// + /// This API is experimental and might change in the future. + /// + /// \param ids The IDs of the objects to put. + /// \param data The object data to put. + /// \return The return status. 
+ virtual Status Put(const std::vector<ObjectID>& ids, + const std::vector<std::shared_ptr<Buffer>>& data) = 0; + + /// This method will be called whenever an evicted object in the external + /// store store needs to be accessed. + /// + /// This API is experimental and might change in the future. + /// + /// \param ids The IDs of the objects to get. + /// \param buffers List of buffers the data should be written to. + /// \return The return status. + virtual Status Get(const std::vector<ObjectID>& ids, + std::vector<std::shared_ptr<Buffer>> buffers) = 0; +}; + +class ExternalStores { + public: + typedef std::unordered_map<std::string, std::shared_ptr<ExternalStore>> StoreMap; + /// Extracts the external store name from the external store endpoint. + /// + /// \param endpoint The endpoint for the external store. + /// \param[out] store_name The name of the external store. + /// \return The return status. + static Status ExtractStoreName(const std::string& endpoint, std::string* store_name); + + /// Register a new external store. + /// + /// \param store_name Name of the new external store. + /// \param store The new external store object. + static void RegisterStore(const std::string& store_name, + std::shared_ptr<ExternalStore> store); + + /// Remove an external store from the registry. + /// + /// \param store_name Name of the external store to remove. + static void DeregisterStore(const std::string& store_name); + + /// Obtain the external store given its name. + /// + /// \param store_name Name of the external store. + /// \return The external store object. + static std::shared_ptr<ExternalStore> GetStore(const std::string& store_name); + + private: + /// Obtain mapping between external store names and store instances. + /// + /// \return Mapping between external store names and store instances. 
+ static StoreMap& Stores(); +}; + +#define REGISTER_EXTERNAL_STORE(name, store) \ + class store##Class { \ + public: \ + store##Class() { ExternalStores::RegisterStore(name, std::make_shared<store>()); } \ + ~store##Class() { ExternalStores::DeregisterStore(name); } \ + }; \ + store##Class singleton_##store = store##Class() + +} // namespace plasma + +#endif // EXTERNAL_STORE_H diff --git a/cpp/src/plasma/hash_table_store.cc b/cpp/src/plasma/hash_table_store.cc new file mode 100644 index 0000000..b77d369 --- /dev/null +++ b/cpp/src/plasma/hash_table_store.cc @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <memory> +#include <string> + +#include "arrow/util/logging.h" + +#include "plasma/hash_table_store.h" + +namespace plasma { + +Status HashTableStore::Connect(const std::string& endpoint) { return Status::OK(); } + +Status HashTableStore::Put(const std::vector<ObjectID>& ids, + const std::vector<std::shared_ptr<Buffer>>& data) { + for (size_t i = 0; i < ids.size(); ++i) { + table_[ids[i]] = data[i]->ToString(); + } + return Status::OK(); +} + +Status HashTableStore::Get(const std::vector<ObjectID>& ids, + std::vector<std::shared_ptr<Buffer>> buffers) { + ARROW_CHECK(ids.size() == buffers.size()); + for (size_t i = 0; i < ids.size(); ++i) { + bool valid; + HashTable::iterator result; + { + result = table_.find(ids[i]); + valid = result != table_.end(); + } + if (valid) { + ARROW_CHECK(buffers[i]->size() == static_cast<int64_t>(result->second.size())); + std::memcpy(buffers[i]->mutable_data(), result->second.data(), + result->second.size()); + } + } + return Status::OK(); +} + +REGISTER_EXTERNAL_STORE("hashtable", HashTableStore); + +} // namespace plasma diff --git a/cpp/src/plasma/hash_table_store.h b/cpp/src/plasma/hash_table_store.h new file mode 100644 index 0000000..766088b --- /dev/null +++ b/cpp/src/plasma/hash_table_store.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef HASH_TABLE_STORE_H +#define HASH_TABLE_STORE_H + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "plasma/external_store.h" + +namespace plasma { + +// This is a sample implementation for an external store, for illustration +// purposes only. + +class HashTableStore : public ExternalStore { + public: + HashTableStore() = default; + + Status Connect(const std::string& endpoint) override; + + Status Get(const std::vector<ObjectID>& ids, + std::vector<std::shared_ptr<Buffer>> buffers) override; + + Status Put(const std::vector<ObjectID>& ids, + const std::vector<std::shared_ptr<Buffer>>& data) override; + + private: + typedef std::unordered_map<ObjectID, std::string> HashTable; + + HashTable table_; +}; + +} // namespace plasma + +#endif // HASH_TABLE_STORE_H diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 745e336..05495b7 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -108,8 +108,10 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids) Client::Client(int fd) : fd(fd), notification_fd(-1) {} -PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled) - : loop_(loop), eviction_policy_(&store_info_) { +PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, + const std::string& socket_name, + std::shared_ptr<ExternalStore> external_store) + : loop_(loop), eviction_policy_(&store_info_), external_store_(external_store) { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; #ifdef PLASMA_CUDA @@ -136,7 +138,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt // Tell the eviction policy that this object is being used. 
std::vector<ObjectID> objects_to_evict; eviction_policy_.BeginObjectAccess(object_id, &objects_to_evict); - DeleteObjects(objects_to_evict); + EvictObjects(objects_to_evict); } // Increase reference count. entry->ref_count++; @@ -145,16 +147,9 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt client->object_ids.insert(object_id); } -// Create a new object buffer in the hash table. -PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size, - int64_t metadata_size, int device_num, - Client* client, PlasmaObject* result) { - ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); - if (store_info_.objects.count(object_id) != 0) { - // There is already an object with the same ID in the Plasma Store, so - // ignore this requst. - return PlasmaError::ObjectExists; - } +// Allocate memory +uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd, + int64_t* map_size, ptrdiff_t* offset) { // Try to evict objects until there is enough space. uint8_t* pointer = nullptr; #ifdef PLASMA_CUDA @@ -173,18 +168,16 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // it is not guaranteed that the corresponding pointer in the client will be // 64-byte aligned, but in practice it often will be. if (device_num == 0) { - pointer = reinterpret_cast<uint8_t*>( - PlasmaAllocator::Memalign(kBlockSize, data_size + metadata_size)); - if (pointer == nullptr) { + pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size)); + if (!pointer) { // Tell the eviction policy how much space we need to create this object. 
std::vector<ObjectID> objects_to_evict; - bool success = - eviction_policy_.RequireSpace(data_size + metadata_size, &objects_to_evict); - DeleteObjects(objects_to_evict); + bool success = eviction_policy_.RequireSpace(size, &objects_to_evict); + EvictObjects(objects_to_evict); // Return an error to the client if not enough space could be freed to // create the object. if (!success) { - return PlasmaError::OutOfMemory; + return nullptr; } } else { break; @@ -196,16 +189,39 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si #endif } } + if (device_num == 0) { + GetMallocMapinfo(pointer, fd, map_size, offset); + ARROW_CHECK(*fd != -1); + } + return pointer; +} + +// Create a new object buffer in the hash table. +PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size, + int64_t metadata_size, int device_num, + Client* client, PlasmaObject* result) { + ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); + auto entry = GetObjectTableEntry(&store_info_, object_id); + if (entry != nullptr) { + // There is already an object with the same ID in the Plasma Store, so + // ignore this requst. + return PlasmaError::ObjectExists; + } + int fd = -1; int64_t map_size = 0; ptrdiff_t offset = 0; - if (device_num == 0) { - GetMallocMapinfo(pointer, &fd, &map_size, &offset); - assert(fd != -1); + uint8_t* pointer = + AllocateMemory(device_num, data_size + metadata_size, &fd, &map_size, &offset); + if (!pointer) { + return PlasmaError::OutOfMemory; + } + if (!entry) { + auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); + entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); + entry->data_size = data_size; + entry->metadata_size = metadata_size; } - auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); - entry->data_size = data_size; - entry->metadata_size = metadata_size; entry->pointer = pointer; // TODO(pcm): Set the other fields. 
entry->fd = fd; @@ -221,7 +237,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si result->ipc_handle = entry->ipc_handle; } #endif - store_info_.objects[object_id] = std::move(entry); + result->store_fd = fd; result->data_offset = offset; result->metadata_offset = offset + data_size; @@ -386,7 +402,8 @@ void PlasmaStore::ProcessGetRequest(Client* client, int64_t timeout_ms) { // Create a get request for this object. auto get_req = new GetRequest(client, object_ids); - + std::vector<ObjectID> evicted_ids; + std::vector<ObjectTableEntry*> evicted_entries; for (auto object_id : object_ids) { // Check if this object is already present locally. If so, record that the // object is being used and mark it as accounted for. @@ -398,6 +415,26 @@ void PlasmaStore::ProcessGetRequest(Client* client, // If necessary, record that this client is using this object. In the case // where entry == NULL, this will be called from SealObject. AddToClientObjectIds(object_id, entry, client); + } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) { + // Make sure the object pointer is not already allocated + ARROW_CHECK(!entry->pointer); + + entry->pointer = AllocateMemory(0, /* Only support device_num = 0 */ + entry->data_size + entry->metadata_size, &entry->fd, + &entry->map_size, &entry->offset); + if (entry->pointer) { + entry->state = ObjectState::PLASMA_CREATED; + entry->create_time = std::time(nullptr); + eviction_policy_.ObjectCreated(object_id); + AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); + evicted_ids.push_back(object_id); + evicted_entries.push_back(entry); + } else { + // We are out of memory an cannot allocate memory for this object. + // Change the state of the object back to PLASMA_EVICTED so some + // other request can try again. + entry->state = ObjectState::PLASMA_EVICTED; + } } else { // Add a placeholder plasma object to the get request to indicate that the // object is not present. 
This will be parsed by the client. We set the @@ -408,6 +445,33 @@ void PlasmaStore::ProcessGetRequest(Client* client, } } + if (!evicted_ids.empty()) { + unsigned char digest[kDigestSize]; + std::vector<std::shared_ptr<Buffer>> buffers; + for (size_t i = 0; i < evicted_ids.size(); ++i) { + ARROW_CHECK(evicted_entries[i]->pointer != nullptr); + buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer, + evicted_entries[i]->data_size)); + } + if (external_store_->Get(evicted_ids, buffers).ok()) { + for (size_t i = 0; i < evicted_ids.size(); ++i) { + evicted_entries[i]->state = ObjectState::PLASMA_SEALED; + std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize); + evicted_entries[i]->construct_duration = + std::time(nullptr) - evicted_entries[i]->create_time; + PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]); + get_req->num_satisfied += 1; + } + } else { + // We tried to get the objects from the external store, but could not get them. + // Set the state of these objects back to PLASMA_EVICTED so some other request + // can try again. + for (size_t i = 0; i < evicted_ids.size(); ++i) { + evicted_entries[i]->state = ObjectState::PLASMA_EVICTED; + } + } + } + // If all of the objects are present already or if the timeout is 0, return to // the client. if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) { @@ -437,12 +501,12 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, // Tell the eviction policy that this object is no longer being used. std::vector<ObjectID> objects_to_evict; eviction_policy_.EndObjectAccess(object_id, &objects_to_evict); - DeleteObjects(objects_to_evict); + EvictObjects(objects_to_evict); } else { // Above code does not really delete an object. Instead, it just put an // object to LRU cache which will be cleaned when the memory is not enough. 
deletion_cache_.erase(object_id); - DeleteObjects({object_id}); + EvictObjects({object_id}); } } // Return 1 to indicate that the client was removed. @@ -463,7 +527,8 @@ void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) { // Check if an object is present. ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) { auto entry = GetObjectTableEntry(&store_info_, object_id); - return entry && (entry->state == ObjectState::PLASMA_SEALED) + return entry && (entry->state == ObjectState::PLASMA_SEALED || + entry->state == ObjectState::PLASMA_EVICTED) ? ObjectStatus::OBJECT_FOUND : ObjectStatus::OBJECT_NOT_FOUND; } @@ -480,6 +545,7 @@ void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[]) std::memcpy(&entry->digest[0], &digest[0], kDigestSize); // Set object construction duration. entry->construct_duration = std::time(nullptr) - entry->create_time; + // Inform all subscribers that a new object has been sealed. ObjectInfoT info; info.object_id = object_id.binary(); @@ -545,25 +611,47 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { return PlasmaError::OK; } -void PlasmaStore::DeleteObjects(const std::vector<ObjectID>& object_ids) { +void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) { + std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data; + std::vector<ObjectTableEntry*> evicted_entries; for (const auto& object_id : object_ids) { - ARROW_LOG(DEBUG) << "deleting object " << object_id.hex(); + ARROW_LOG(DEBUG) << "evicting object " << object_id.hex(); auto entry = GetObjectTableEntry(&store_info_, object_id); // TODO(rkn): This should probably not fail, but should instead throw an // error. Maybe we should also support deleting objects that have been // created but not sealed. 
- ARROW_CHECK(entry != nullptr) - << "To delete an object it must be in the object table."; + ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table."; ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED) - << "To delete an object it must have been sealed."; + << "To evict an object it must have been sealed."; ARROW_CHECK(entry->ref_count == 0) - << "To delete an object, there must be no clients currently using it."; - store_info_.objects.erase(object_id); - // Inform all subscribers that the object has been deleted. - fb::ObjectInfoT notification; - notification.object_id = object_id.binary(); - notification.is_deletion = true; - PushNotification(&notification); + << "To evict an object, there must be no clients currently using it."; + + // If there is a backing external store, then mark object for eviction to + // external store, free the object data pointer and keep a placeholder + // entry in ObjectTable + if (external_store_) { + evicted_object_data.push_back(std::make_shared<arrow::Buffer>( + entry->pointer, entry->data_size + entry->metadata_size)); + evicted_entries.push_back(entry); + } else { + // If there is no backing external store, just erase the object entry + // and send a deletion notification. + store_info_.objects.erase(object_id); + // Inform all subscribers that the object has been deleted. 
+ fb::ObjectInfoT notification; + notification.object_id = object_id.binary(); + notification.is_deletion = true; + PushNotification(&notification); + } + } + + if (external_store_ && !object_ids.empty()) { + ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data)); + for (auto entry : evicted_entries) { + PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size); + entry->pointer = nullptr; + entry->state = ObjectState::PLASMA_EVICTED; + } } } @@ -869,7 +957,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { std::vector<ObjectID> objects_to_evict; int64_t num_bytes_evicted = eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict); - DeleteObjects(objects_to_evict); + EvictObjects(objects_to_evict); HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd); } break; case fb::MessageType::PlasmaSubscribeRequest: @@ -894,10 +982,12 @@ class PlasmaStoreRunner { public: PlasmaStoreRunner() {} - void Start(char* socket_name, std::string directory, bool hugepages_enabled) { + void Start(char* socket_name, std::string directory, bool hugepages_enabled, + std::shared_ptr<ExternalStore> external_store) { // Create the event loop. loop_.reset(new EventLoop); - store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled)); + store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name, + external_store)); plasma_config = store_->GetPlasmaStoreInfo(); // We are using a single memory-mapped file by mallocing and freeing a single @@ -945,15 +1035,15 @@ void HandleSignal(int signal) { } } -void StartServer(char* socket_name, std::string plasma_directory, - bool hugepages_enabled) { +void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled, + std::shared_ptr<ExternalStore> external_store) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. 
signal(SIGPIPE, SIG_IGN); g_runner.reset(new PlasmaStoreRunner()); signal(SIGTERM, HandleSignal); - g_runner->Start(socket_name, plasma_directory, hugepages_enabled); + g_runner->Start(socket_name, plasma_directory, hugepages_enabled, external_store); } } // namespace plasma @@ -964,14 +1054,18 @@ int main(int argc, char* argv[]) { char* socket_name = nullptr; // Directory where plasma memory mapped files are stored. std::string plasma_directory; + std::string external_store_endpoint; bool hugepages_enabled = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:e:h")) != -1) { switch (c) { case 'd': plasma_directory = std::string(optarg); break; + case 'e': + external_store_endpoint = std::string(optarg); + break; case 'h': hugepages_enabled = true; break; @@ -1038,8 +1132,22 @@ int main(int argc, char* argv[]) { SetMallocGranularity(1024 * 1024 * 1024); // 1 GB } #endif + // Get external store + std::shared_ptr<plasma::ExternalStore> external_store{nullptr}; + if (!external_store_endpoint.empty()) { + std::string name; + ARROW_CHECK_OK( + plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name)); + external_store = plasma::ExternalStores::GetStore(name); + if (external_store == nullptr) { + ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; + return -1; + } + ARROW_LOG(DEBUG) << "connecting to external store..."; + ARROW_CHECK_OK(external_store->Connect(external_store_endpoint)); + } ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::StartServer(socket_name, plasma_directory, hugepages_enabled); + plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store); plasma::g_runner->Shutdown(); plasma::g_runner = nullptr; diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index a5c586b..7105c51 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -28,7 +28,9 @@ #include 
"plasma/common.h" #include "plasma/events.h" #include "plasma/eviction_policy.h" +#include "plasma/external_store.h" #include "plasma/plasma.h" +#include "plasma/protocol.h" namespace arrow { class Status; @@ -75,7 +77,9 @@ class PlasmaStore { using NotificationMap = std::unordered_map<int, NotificationQueue>; // TODO: PascalCase PlasmaStore methods. - PlasmaStore(EventLoop* loop, std::string directory, bool hugetlbfs_enabled); + PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, + const std::string& socket_name, + std::shared_ptr<ExternalStore> external_store); ~PlasmaStore(); @@ -125,11 +129,10 @@ class PlasmaStore { /// - PlasmaError::ObjectInUse, if the object is in use. PlasmaError DeleteObject(ObjectID& object_id); - /// Delete objects that have been created in the hash table. This should only - /// be called on objects that are returned by the eviction policy to evict. + /// Evict objects returned by the eviction policy. /// - /// @param object_ids Object IDs of the objects to be deleted. - void DeleteObjects(const std::vector<ObjectID>& object_ids); + /// @param object_ids Object IDs of the objects to be evicted. + void EvictObjects(const std::vector<ObjectID>& object_ids); /// Process a get request from a client. This method assumes that we will /// eventually have these objects sealed. If one of the objects has not yet @@ -149,8 +152,7 @@ class PlasmaStore { /// /// @param object_id Object ID of the object to be sealed. /// @param digest The digest of the object. This is used to tell if two - /// objects - /// with the same object ID are the same. + /// objects with the same object ID are the same. 
void SealObject(const ObjectID& object_id, unsigned char digest[]); /// Check if the plasma store contains an object: @@ -210,6 +212,9 @@ class PlasmaStore { int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry, Client* client); + uint8_t* AllocateMemory(int device_num, size_t size, int* fd, int64_t* map_size, + ptrdiff_t* offset); + /// Event loop of the plasma store. EventLoop* loop_; /// The plasma store information, including the object tables, that is exposed @@ -233,6 +238,10 @@ class PlasmaStore { std::unordered_map<int, std::unique_ptr<Client>> connected_clients_; std::unordered_set<ObjectID> deletion_cache_; + + /// External store that evicted objects are written to and read back from; + /// null when the plasma store was started without an external store. + std::shared_ptr<ExternalStore> external_store_; #ifdef PLASMA_CUDA arrow::cuda::CudaDeviceManager* manager_; #endif diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 1678e27..90ab70f 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -59,9 +59,9 @@ class TestPlasmaStore : public ::testing::Test { std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); - std::string plasma_command = plasma_directory + - "/plasma_store_server -m 10000000 -s " + - store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + std::string plasma_command = + plasma_directory + "/plasma_store_server -m 10000000 -s " + store_socket_name_ + + " 1> /dev/null 2> /dev/null & " + "echo $! 
> " + store_socket_name_ + ".pid"; system(plasma_command.c_str()); ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); @@ -69,15 +69,16 @@ class TestPlasmaStore : public ::testing::Test { virtual void TearDown() { ARROW_CHECK_OK(client_.Disconnect()); ARROW_CHECK_OK(client2_.Disconnect()); - // Kill all plasma_store processes - // TODO should only kill the processes we launched + // Kill plasma_store process that we started #ifdef COVERAGE_BUILD // Ask plasma_store to exit gracefully and give it time to write out // coverage files - system("killall -TERM plasma_store_server"); + std::string plasma_term_command = "kill -TERM `cat " + store_socket_name_ + ".pid`"; + system(plasma_term_command.c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(200)); #endif - system("killall -KILL plasma_store_server"); + std::string plasma_kill_command = "kill -KILL `cat " + store_socket_name_ + ".pid`"; + system(plasma_kill_command.c_str()); } void CreateObject(PlasmaClient& client, const ObjectID& object_id, diff --git a/cpp/src/plasma/test/external_store_tests.cc b/cpp/src/plasma/test/external_store_tests.cc new file mode 100644 index 0000000..33d3bd1 --- /dev/null +++ b/cpp/src/plasma/test/external_store_tests.cc @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <assert.h> +#include <signal.h> +#include <stdlib.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> + +#include <chrono> +#include <random> +#include <thread> + +#include <gtest/gtest.h> + +#include "arrow/test-util.h" + +#include "plasma/client.h" +#include "plasma/common.h" +#include "plasma/external_store.h" +#include "plasma/plasma.h" +#include "plasma/protocol.h" +#include "plasma/test-util.h" + +namespace plasma { + +std::string external_test_executable; // NOLINT + +void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, + const std::string& metadata, const std::string& data) { + arrow::AssertBufferEqual(*object_buffer.metadata, metadata); + arrow::AssertBufferEqual(*object_buffer.data, data); +} + +class TestPlasmaStoreWithExternal : public ::testing::Test { + public: + // TODO(pcm): At the moment, stdout of the test gets mixed up with + // stdout of the object store. Consider changing that. + void SetUp() override { + uint64_t seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + std::mt19937 rng(static_cast<uint32_t>(seed)); + std::string store_index = std::to_string(rng()); + store_socket_name_ = "/tmp/store_with_external" + store_index; + + std::string plasma_directory = + external_test_executable.substr(0, external_test_executable.find_last_of('/')); + std::string plasma_command = plasma_directory + + "/plasma_store_server -m 1024000 -e " + + "hashtable://test -s " + store_socket_name_ + + " 1> /tmp/log.stdout 2> /tmp/log.stderr & " + + "echo $! 
> " + store_socket_name_ + ".pid"; + system(plasma_command.c_str()); + ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); + } + void TearDown() override { + ARROW_CHECK_OK(client_.Disconnect()); + // Kill plasma_store process that we started +#ifdef COVERAGE_BUILD + // Ask plasma_store to exit gracefully and give it time to write out + // coverage files + std::string plasma_term_command = "kill -TERM `cat " + store_socket_name_ + ".pid`"; + system(plasma_term_command.c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); +#endif + std::string plasma_kill_command = "kill -KILL `cat " + store_socket_name_ + ".pid`"; + system(plasma_kill_command.c_str()); + } + + protected: + PlasmaClient client_; + std::string store_socket_name_; +}; + +TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { + std::vector<ObjectID> object_ids; + std::string data(100 * 1024, 'x'); + std::string metadata; + for (int i = 0; i < 20; i++) { + ObjectID object_id = random_object_id(); + object_ids.push_back(object_id); + + // Test for object non-existence. + bool has_object; + ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_FALSE(has_object); + + // Test for the object being in local Plasma store. + // Create and seal the object. + ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata)); + // Test that the client can get the object. + ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_TRUE(has_object); + } + + for (int i = 0; i < 20; i++) { + // Since we are accessing objects sequentially, every object we + // access would be a cache "miss" owing to LRU eviction. + // Try and access the object from the plasma store first, and then try + // external store on failure. This should succeed to fetch the object. + // However, it may evict the next few objects. 
+ std::vector<ObjectBuffer> object_buffers; + ARROW_CHECK_OK(client_.Get({object_ids[i]}, -1, &object_buffers)); + ASSERT_EQ(object_buffers.size(), 1); + ASSERT_EQ(object_buffers[0].device_num, 0); + ASSERT_TRUE(object_buffers[0].data); + AssertObjectBufferEqual(object_buffers[0], metadata, data); + } + + // Make sure we still cannot fetch objects that do not exist + std::vector<ObjectBuffer> object_buffers; + ARROW_CHECK_OK(client_.Get({random_object_id()}, 100, &object_buffers)); + ASSERT_EQ(object_buffers.size(), 1); + ASSERT_EQ(object_buffers[0].device_num, 0); + ASSERT_EQ(object_buffers[0].data, nullptr); + ASSERT_EQ(object_buffers[0].metadata, nullptr); +} + +} // namespace plasma + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + plasma::external_test_executable = std::string(argv[0]); + return RUN_ALL_TESTS(); +} diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 4f64f20..b3868dc 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -373,6 +373,7 @@ cdef class PlasmaClient: The number of milliseconds that the get call should block before timing out and returning. Pass -1 if the call should block and 0 if the call should return immediately. + with_meta : bool Returns ------- diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py index a6ab362..13b3eec 100644 --- a/python/pyarrow/plasma.py +++ b/python/pyarrow/plasma.py @@ -78,7 +78,8 @@ def build_plasma_tensorflow_op(): @contextlib.contextmanager def start_plasma_store(plasma_store_memory, use_valgrind=False, use_profiler=False, - plasma_directory=None, use_hugepages=False): + plasma_directory=None, use_hugepages=False, + external_store=None): """Start a plasma store process. Args: plasma_store_memory (int): Capacity of the plasma store in bytes. @@ -89,6 +90,7 @@ def start_plasma_store(plasma_store_memory, plasma_directory (str): Directory where plasma memory mapped files will be stored. 
use_hugepages (bool): True if the plasma store should use huge pages. + external_store (str): External store to use for evicted objects. Return: A tuple of the name of the plasma store socket and the process ID of the plasma store process. @@ -108,6 +110,8 @@ def start_plasma_store(plasma_store_memory, command += ["-d", plasma_directory] if use_hugepages: command += ["-h"] + if external_store is not None: + command += ["-e", external_store] stdout_file = None stderr_file = None if use_valgrind: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index bcb467a..ef53bab 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -37,6 +37,7 @@ import pandas as pd DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8 USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1" +EXTERNAL_STORE = "hashtable://test" SMALL_OBJECT_SIZE = 9000 @@ -920,6 +921,63 @@ class TestPlasmaClient(object): @pytest.mark.plasma +class TestEvictionToExternalStore(object): + + def setup_method(self, test_method): + import pyarrow.plasma as plasma + # Start Plasma store. + self.plasma_store_ctx = plasma.start_plasma_store( + plasma_store_memory=1000 * 1024, + use_valgrind=USE_VALGRIND, + external_store=EXTERNAL_STORE) + self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__() + # Connect to Plasma. + self.plasma_client = plasma.connect(self.plasma_store_name) + + def teardown_method(self, test_method): + try: + # Check that the Plasma store is still alive. + assert self.p.poll() is None + self.p.send_signal(signal.SIGTERM) + if sys.version_info >= (3, 3): + self.p.wait(timeout=5) + else: + self.p.wait() + finally: + self.plasma_store_ctx.__exit__(None, None, None) + + def test_eviction(self): + client = self.plasma_client + + object_ids = [random_object_id() for _ in range(0, 20)] + data = b'x' * 100 * 1024 + metadata = b'' + + for i in range(0, 20): + # Test for object non-existence. 
+ assert not client.contains(object_ids[i]) + + # Create and seal the object. + client.create_and_seal(object_ids[i], data, metadata) + + # Test that the store now contains the object. + assert client.contains(object_ids[i]) + + for i in range(0, 20): + # Since we are accessing objects sequentially, every object we + # access would be a cache "miss" owing to LRU eviction. + # Try to access the object from the plasma store first, and then + # try the external store on failure. The fetch should succeed, + # but it may evict the next few objects. + [result] = client.get_buffers([object_ids[i]]) + assert result.to_pybytes() == data + + # Make sure we still cannot fetch objects that do not exist + [result] = client.get_buffers([random_object_id()], timeout_ms=100) + assert result is None + + +@pytest.mark.plasma def test_object_id_size(): import pyarrow.plasma as plasma with pytest.raises(ValueError):