This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c55b25  ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store
0c55b25 is described below

commit 0c55b25c84119af59320eab0b0625da9ce987294
Author: Anurag Khandelwal <anur...@berkeley.edu>
AuthorDate: Mon Feb 4 18:24:13 2019 -0800

    ARROW-4294: [C++] [Plasma] Add support for evicting Plasma objects to external store
    
    https://issues.apache.org/jira/browse/ARROW-4294
    
    Note: this PR was previously at https://github.com/apache/arrow/pull/3432,
    which was closed because its commit history was broken.
    
    Currently, when Plasma needs storage space for additional objects, it
    evicts objects by deleting them from the Plasma store. This is a problem
    when the object cannot be reconstructed, or when reconstructing it is
    expensive. This patch adds support for a pluggable external store to which
    Plasma can evict objects when it runs out of memory.
    
    Author: Anurag Khandelwal <anur...@berkeley.edu>
    Author: Philipp Moritz <pcmor...@gmail.com>
    
    Closes #3482 from anuragkh/plasma_evict_to_external_store and squashes the 
following commits:
    
    631671561 <Philipp Moritz> remove external store worker, simplify interface
    6fbc55b08 <Anurag Khandelwal> Revert "Add an eviction buffer to allow 
asynchronous evictions"
    4f2c02ce3 <Anurag Khandelwal> Revert "Minor fix"
    1bc1dbed4 <Anurag Khandelwal> Revert "format fix"
    7b662bee4 <Anurag Khandelwal> Revert "Remove timeout for external store 
test tearDown"
    25663df30 <Anurag Khandelwal> Remove timeout for external store test 
tearDown
    7945cc951 <Anurag Khandelwal> format fix
    0d7263936 <Anurag Khandelwal> Minor fix
    957efb5f0 <Anurag Khandelwal> Add an eviction buffer to allow asynchronous 
evictions
    896d895bd <Anurag Khandelwal> Fixes
    7ae486794 <Anurag Khandelwal> Merge branch 'master' into 
plasma_evict_to_external_store
    1af2f8bce <Anurag Khandelwal> Fix cpplint issues
    04e173085 <Anurag Khandelwal> Merge branch 'master' into 
plasma_evict_to_external_store
    301e575ea <Anurag Khandelwal> Fix uses of ARROW_CHECK_OK/ARROW_CHECK
    69a56abcc <Anurag Khandelwal> Fix documentation errrors
    c19c5767d <Anurag Khandelwal> Add documentation for notify flag
    f3fad8086 <Anurag Khandelwal> Fix external store worker intialization
    9081596c4 <Anurag Khandelwal> Clean up formatting issues
    f5cc95c72 <Anurag Khandelwal> Add lint exclusion for external_store_worker, 
since it uses mutex
    ffd1f0e6c <Anurag Khandelwal> Extend plasma eviction changes to python 
module
    8afc9fb2f <Anurag Khandelwal> Kill only the plasma_store_server that we 
started
    be315677b <Anurag Khandelwal> Add test for testing evictions/unevictions
    a43445aee <Anurag Khandelwal> Update serialization test
    58a995318 <Anurag Khandelwal> Add support for evicting/un-evicting Plasma 
objects to/from external store
---
 cpp/src/plasma/CMakeLists.txt               |   7 +-
 cpp/src/plasma/common.h                     |   4 +-
 cpp/src/plasma/external_store.cc            |  63 +++++++++
 cpp/src/plasma/external_store.h             | 123 ++++++++++++++++
 cpp/src/plasma/hash_table_store.cc          |  58 ++++++++
 cpp/src/plasma/hash_table_store.h           |  53 +++++++
 cpp/src/plasma/store.cc                     | 210 +++++++++++++++++++++-------
 cpp/src/plasma/store.h                      |  23 ++-
 cpp/src/plasma/test/client_tests.cc         |  15 +-
 cpp/src/plasma/test/external_store_tests.cc | 139 ++++++++++++++++++
 python/pyarrow/_plasma.pyx                  |   1 +
 python/pyarrow/plasma.py                    |   6 +-
 python/pyarrow/tests/test_plasma.py         |  58 ++++++++
 13 files changed, 692 insertions(+), 68 deletions(-)

diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index 53af8c5..fd25aef 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -125,9 +125,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "gcc")
     " -Wno-conversion")
 endif()
 
+list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" 
"hash_table_store.cc")
+
 # We use static libraries for the plasma_store_server executable so that it can
 # be copied around and used in different locations.
-add_executable(plasma_store_server store.cc)
+add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc)
 target_link_libraries(plasma_store_server plasma_static 
${PLASMA_STATIC_LINK_LIBS})
 add_dependencies(plasma plasma_store_server)
 
@@ -214,3 +216,6 @@ ADD_PLASMA_TEST(test/serialization_tests
 ADD_PLASMA_TEST(test/client_tests
   EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS}
   EXTRA_DEPENDENCIES plasma_store_server)
+ADD_PLASMA_TEST(test/external_store_tests
+  EXTRA_LINK_LIBS plasma_shared ${PLASMA_LINK_LIBS}
+  EXTRA_DEPENDENCIES plasma_store_server)
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index dfbd90c..6f4cef5 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -69,7 +69,9 @@ enum class ObjectState : int {
   /// Object was created but not sealed in the local Plasma Store.
   PLASMA_CREATED = 1,
   /// Object is sealed and stored in the local Plasma Store.
-  PLASMA_SEALED
+  PLASMA_SEALED = 2,
+  /// Object is evicted to external store.
+  PLASMA_EVICTED = 3,
 };
 
 namespace internal {
diff --git a/cpp/src/plasma/external_store.cc b/cpp/src/plasma/external_store.cc
new file mode 100644
index 0000000..8cfbad1
--- /dev/null
+++ b/cpp/src/plasma/external_store.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <sstream>
+
+#include "arrow/util/memory.h"
+
+#include "plasma/external_store.h"
+
+namespace plasma {
+
+Status ExternalStores::ExtractStoreName(const std::string& endpoint,
+                                        std::string* store_name) {
+  size_t off = endpoint.find_first_of(':');
+  if (off == std::string::npos) {
+    return Status::Invalid("Malformed endpoint " + endpoint);
+  }
+  *store_name = endpoint.substr(0, off);
+  return Status::OK();
+}
+
+void ExternalStores::RegisterStore(const std::string& store_name,
+                                   std::shared_ptr<ExternalStore> store) {
+  Stores().insert({store_name, store});
+}
+
+void ExternalStores::DeregisterStore(const std::string& store_name) {
+  auto it = Stores().find(store_name);
+  if (it == Stores().end()) {
+    return;
+  }
+  Stores().erase(it);
+}
+
+std::shared_ptr<ExternalStore> ExternalStores::GetStore(const std::string& 
store_name) {
+  auto it = Stores().find(store_name);
+  if (it == Stores().end()) {
+    return nullptr;
+  }
+  return it->second;
+}
+
+ExternalStores::StoreMap& ExternalStores::Stores() {
+  static auto* external_stores = new StoreMap();
+  return *external_stores;
+}
+
+}  // namespace plasma
diff --git a/cpp/src/plasma/external_store.h b/cpp/src/plasma/external_store.h
new file mode 100644
index 0000000..feca466
--- /dev/null
+++ b/cpp/src/plasma/external_store.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef EXTERNAL_STORE_H
+#define EXTERNAL_STORE_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "plasma/client.h"
+
+namespace plasma {
+
+// ==== The external store ====
+//
+// This file contains declarations for all functions that need to be implemented
+// for an external storage service so that objects evicted from Plasma store
+// can be written to it.
+
+class ExternalStore {
+ public:
+  /// Default constructor.
+  ExternalStore() = default;
+
+  /// Virtual destructor.
+  virtual ~ExternalStore() = default;
+
+  /// Connect to the external storage service. Return the resulting status.
+  ///
+  /// \param endpoint The name of the endpoint to connect to the external
+  ///        storage service. While the formatting of the endpoint name is
+  ///        specific to the implementation of the external store, it always
+  ///        starts with {store-name}://, where {store-name} is the name of the
+  ///        external store.
+  ///
+  /// \return The return status.
+  virtual Status Connect(const std::string& endpoint) = 0;
+
+  /// This method will be called whenever an object in the Plasma store needs
+  /// to be evicted to the external store.
+  ///
+  /// This API is experimental and might change in the future.
+  ///
+  /// \param ids The IDs of the objects to put.
+  /// \param data The object data to put.
+  /// \return The return status.
+  virtual Status Put(const std::vector<ObjectID>& ids,
+                     const std::vector<std::shared_ptr<Buffer>>& data) = 0;
+
+  /// This method will be called whenever an evicted object in the external
+  /// store needs to be accessed.
+  ///
+  /// This API is experimental and might change in the future.
+  ///
+  /// \param ids The IDs of the objects to get.
+  /// \param buffers List of buffers the data should be written to.
+  /// \return The return status.
+  virtual Status Get(const std::vector<ObjectID>& ids,
+                     std::vector<std::shared_ptr<Buffer>> buffers) = 0;
+};
+
+class ExternalStores {
+ public:
+  typedef std::unordered_map<std::string, std::shared_ptr<ExternalStore>> 
StoreMap;
+  /// Extracts the external store name from the external store endpoint.
+  ///
+  /// \param endpoint The endpoint for the external store.
+  /// \param[out] store_name The name of the external store.
+  /// \return The return status.
+  static Status ExtractStoreName(const std::string& endpoint, std::string* 
store_name);
+
+  /// Register a new external store.
+  ///
+  /// \param store_name Name of the new external store.
+  /// \param store The new external store object.
+  static void RegisterStore(const std::string& store_name,
+                            std::shared_ptr<ExternalStore> store);
+
+  /// Remove an external store from the registry.
+  ///
+  /// \param store_name Name of the external store to remove.
+  static void DeregisterStore(const std::string& store_name);
+
+  /// Obtain the external store given its name.
+  ///
+  /// \param store_name Name of the external store.
+  /// \return The external store object.
+  static std::shared_ptr<ExternalStore> GetStore(const std::string& 
store_name);
+
+ private:
+  /// Obtain mapping between external store names and store instances.
+  ///
+  /// \return Mapping between external store names and store instances.
+  static StoreMap& Stores();
+};
+
+#define REGISTER_EXTERNAL_STORE(name, store)                                   
        \
+  class store##Class {                                                         
        \
+   public:                                                                     
        \
+    store##Class() { ExternalStores::RegisterStore(name, 
std::make_shared<store>()); } \
+    ~store##Class() { ExternalStores::DeregisterStore(name); }                 
        \
+  };                                                                           
        \
+  store##Class singleton_##store = store##Class()
+
+}  // namespace plasma
+
+#endif  // EXTERNAL_STORE_H
diff --git a/cpp/src/plasma/hash_table_store.cc 
b/cpp/src/plasma/hash_table_store.cc
new file mode 100644
index 0000000..b77d369
--- /dev/null
+++ b/cpp/src/plasma/hash_table_store.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include "arrow/util/logging.h"
+
+#include "plasma/hash_table_store.h"
+
+namespace plasma {
+
+Status HashTableStore::Connect(const std::string& endpoint) { return 
Status::OK(); }
+
+Status HashTableStore::Put(const std::vector<ObjectID>& ids,
+                           const std::vector<std::shared_ptr<Buffer>>& data) {
+  for (size_t i = 0; i < ids.size(); ++i) {
+    table_[ids[i]] = data[i]->ToString();
+  }
+  return Status::OK();
+}
+
+Status HashTableStore::Get(const std::vector<ObjectID>& ids,
+                           std::vector<std::shared_ptr<Buffer>> buffers) {
+  ARROW_CHECK(ids.size() == buffers.size());
+  for (size_t i = 0; i < ids.size(); ++i) {
+    bool valid;
+    HashTable::iterator result;
+    {
+      result = table_.find(ids[i]);
+      valid = result != table_.end();
+    }
+    if (valid) {
+      ARROW_CHECK(buffers[i]->size() == 
static_cast<int64_t>(result->second.size()));
+      std::memcpy(buffers[i]->mutable_data(), result->second.data(),
+                  result->second.size());
+    }
+  }
+  return Status::OK();
+}
+
+REGISTER_EXTERNAL_STORE("hashtable", HashTableStore);
+
+}  // namespace plasma
diff --git a/cpp/src/plasma/hash_table_store.h 
b/cpp/src/plasma/hash_table_store.h
new file mode 100644
index 0000000..766088b
--- /dev/null
+++ b/cpp/src/plasma/hash_table_store.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef HASH_TABLE_STORE_H
+#define HASH_TABLE_STORE_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "plasma/external_store.h"
+
+namespace plasma {
+
+// This is a sample implementation for an external store, for illustration
+// purposes only.
+
+class HashTableStore : public ExternalStore {
+ public:
+  HashTableStore() = default;
+
+  Status Connect(const std::string& endpoint) override;
+
+  Status Get(const std::vector<ObjectID>& ids,
+             std::vector<std::shared_ptr<Buffer>> buffers) override;
+
+  Status Put(const std::vector<ObjectID>& ids,
+             const std::vector<std::shared_ptr<Buffer>>& data) override;
+
+ private:
+  typedef std::unordered_map<ObjectID, std::string> HashTable;
+
+  HashTable table_;
+};
+
+}  // namespace plasma
+
+#endif  // HASH_TABLE_STORE_H
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 745e336..05495b7 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -108,8 +108,10 @@ GetRequest::GetRequest(Client* client, const 
std::vector<ObjectID>& object_ids)
 
 Client::Client(int fd) : fd(fd), notification_fd(-1) {}
 
-PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool 
hugepages_enabled)
-    : loop_(loop), eviction_policy_(&store_info_) {
+PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool 
hugepages_enabled,
+                         const std::string& socket_name,
+                         std::shared_ptr<ExternalStore> external_store)
+    : loop_(loop), eviction_policy_(&store_info_), 
external_store_(external_store) {
   store_info_.directory = directory;
   store_info_.hugepages_enabled = hugepages_enabled;
 #ifdef PLASMA_CUDA
@@ -136,7 +138,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& 
object_id, ObjectTableEnt
     // Tell the eviction policy that this object is being used.
     std::vector<ObjectID> objects_to_evict;
     eviction_policy_.BeginObjectAccess(object_id, &objects_to_evict);
-    DeleteObjects(objects_to_evict);
+    EvictObjects(objects_to_evict);
   }
   // Increase reference count.
   entry->ref_count++;
@@ -145,16 +147,9 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& 
object_id, ObjectTableEnt
   client->object_ids.insert(object_id);
 }
 
-// Create a new object buffer in the hash table.
-PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t 
data_size,
-                                      int64_t metadata_size, int device_num,
-                                      Client* client, PlasmaObject* result) {
-  ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
-  if (store_info_.objects.count(object_id) != 0) {
-    // There is already an object with the same ID in the Plasma Store, so
-    // ignore this requst.
-    return PlasmaError::ObjectExists;
-  }
+// Allocate memory for an object, evicting other objects as needed; nullptr if space can't be freed.
+uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd,
+                                     int64_t* map_size, ptrdiff_t* offset) {
   // Try to evict objects until there is enough space.
   uint8_t* pointer = nullptr;
 #ifdef PLASMA_CUDA
@@ -173,18 +168,16 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
     // it is not guaranteed that the corresponding pointer in the client will 
be
     // 64-byte aligned, but in practice it often will be.
     if (device_num == 0) {
-      pointer = reinterpret_cast<uint8_t*>(
-          PlasmaAllocator::Memalign(kBlockSize, data_size + metadata_size));
-      if (pointer == nullptr) {
+      pointer = 
reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
+      if (!pointer) {
         // Tell the eviction policy how much space we need to create this 
object.
         std::vector<ObjectID> objects_to_evict;
-        bool success =
-            eviction_policy_.RequireSpace(data_size + metadata_size, 
&objects_to_evict);
-        DeleteObjects(objects_to_evict);
+        bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
+        EvictObjects(objects_to_evict);
         // Return an error to the client if not enough space could be freed to
         // create the object.
         if (!success) {
-          return PlasmaError::OutOfMemory;
+          return nullptr;
         }
       } else {
         break;
@@ -196,16 +189,39 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
 #endif
     }
   }
+  if (device_num == 0) {
+    GetMallocMapinfo(pointer, fd, map_size, offset);
+    ARROW_CHECK(*fd != -1);
+  }
+  return pointer;
+}
+
+// Create a new object buffer in the hash table.
+PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t 
data_size,
+                                      int64_t metadata_size, int device_num,
+                                      Client* client, PlasmaObject* result) {
+  ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  if (entry != nullptr) {
+    // There is already an object with the same ID in the Plasma Store, so
+    // ignore this request.
+    return PlasmaError::ObjectExists;
+  }
+
   int fd = -1;
   int64_t map_size = 0;
   ptrdiff_t offset = 0;
-  if (device_num == 0) {
-    GetMallocMapinfo(pointer, &fd, &map_size, &offset);
-    assert(fd != -1);
+  uint8_t* pointer =
+      AllocateMemory(device_num, data_size + metadata_size, &fd, &map_size, 
&offset);
+  if (!pointer) {
+    return PlasmaError::OutOfMemory;
+  }
+  if (!entry) {
+    auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+    entry = store_info_.objects.emplace(object_id, 
std::move(ptr)).first->second.get();
+    entry->data_size = data_size;
+    entry->metadata_size = metadata_size;
   }
-  auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-  entry->data_size = data_size;
-  entry->metadata_size = metadata_size;
   entry->pointer = pointer;
   // TODO(pcm): Set the other fields.
   entry->fd = fd;
@@ -221,7 +237,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& 
object_id, int64_t data_si
     result->ipc_handle = entry->ipc_handle;
   }
 #endif
-  store_info_.objects[object_id] = std::move(entry);
+
   result->store_fd = fd;
   result->data_offset = offset;
   result->metadata_offset = offset + data_size;
@@ -386,7 +402,8 @@ void PlasmaStore::ProcessGetRequest(Client* client,
                                     int64_t timeout_ms) {
   // Create a get request for this object.
   auto get_req = new GetRequest(client, object_ids);
-
+  std::vector<ObjectID> evicted_ids;
+  std::vector<ObjectTableEntry*> evicted_entries;
   for (auto object_id : object_ids) {
     // Check if this object is already present locally. If so, record that the
     // object is being used and mark it as accounted for.
@@ -398,6 +415,26 @@ void PlasmaStore::ProcessGetRequest(Client* client,
       // If necessary, record that this client is using this object. In the 
case
       // where entry == NULL, this will be called from SealObject.
       AddToClientObjectIds(object_id, entry, client);
+    } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
+      // Make sure the object pointer is not already allocated
+      ARROW_CHECK(!entry->pointer);
+
+      entry->pointer = AllocateMemory(0, /* Only support device_num = 0 */
+                                      entry->data_size + entry->metadata_size, 
&entry->fd,
+                                      &entry->map_size, &entry->offset);
+      if (entry->pointer) {
+        entry->state = ObjectState::PLASMA_CREATED;
+        entry->create_time = std::time(nullptr);
+        eviction_policy_.ObjectCreated(object_id);
+        AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), 
client);
+        evicted_ids.push_back(object_id);
+        evicted_entries.push_back(entry);
+      } else {
+        // We are out of memory and cannot allocate memory for this object.
+        // Change the state of the object back to PLASMA_EVICTED so some
+        // other request can try again.
+        entry->state = ObjectState::PLASMA_EVICTED;
+      }
     } else {
       // Add a placeholder plasma object to the get request to indicate that 
the
       // object is not present. This will be parsed by the client. We set the
@@ -408,6 +445,33 @@ void PlasmaStore::ProcessGetRequest(Client* client,
     }
   }
 
+  if (!evicted_ids.empty()) {
+    unsigned char digest[kDigestSize];
+    std::vector<std::shared_ptr<Buffer>> buffers;
+    for (size_t i = 0; i < evicted_ids.size(); ++i) {
+      ARROW_CHECK(evicted_entries[i]->pointer != nullptr);
+      buffers.emplace_back(new 
arrow::MutableBuffer(evicted_entries[i]->pointer,
+                                                    
evicted_entries[i]->data_size));
+    }
+    if (external_store_->Get(evicted_ids, buffers).ok()) {
+      for (size_t i = 0; i < evicted_ids.size(); ++i) {
+        evicted_entries[i]->state = ObjectState::PLASMA_SEALED;
+        std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize);
+        evicted_entries[i]->construct_duration =
+            std::time(nullptr) - evicted_entries[i]->create_time;
+        PlasmaObject_init(&get_req->objects[evicted_ids[i]], 
evicted_entries[i]);
+        get_req->num_satisfied += 1;
+      }
+    } else {
+      // We tried to get the objects from the external store, but could not 
get them.
+      // Set the state of these objects back to PLASMA_EVICTED so some other 
request
+      // can try again.
+      for (size_t i = 0; i < evicted_ids.size(); ++i) {
+        evicted_entries[i]->state = ObjectState::PLASMA_EVICTED;
+      }
+    }
+  }
+
   // If all of the objects are present already or if the timeout is 0, return 
to
   // the client.
   if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms 
== 0) {
@@ -437,12 +501,12 @@ int PlasmaStore::RemoveFromClientObjectIds(const 
ObjectID& object_id,
         // Tell the eviction policy that this object is no longer being used.
         std::vector<ObjectID> objects_to_evict;
         eviction_policy_.EndObjectAccess(object_id, &objects_to_evict);
-        DeleteObjects(objects_to_evict);
+        EvictObjects(objects_to_evict);
       } else {
         // Above code does not really delete an object. Instead, it just put an
         // object to LRU cache which will be cleaned when the memory is not 
enough.
         deletion_cache_.erase(object_id);
-        DeleteObjects({object_id});
+        EvictObjects({object_id});
       }
     }
     // Return 1 to indicate that the client was removed.
@@ -463,7 +527,8 @@ void PlasmaStore::ReleaseObject(const ObjectID& object_id, 
Client* client) {
 // Check if an object is present.
 ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
   auto entry = GetObjectTableEntry(&store_info_, object_id);
-  return entry && (entry->state == ObjectState::PLASMA_SEALED)
+  return entry && (entry->state == ObjectState::PLASMA_SEALED ||
+                   entry->state == ObjectState::PLASMA_EVICTED)
              ? ObjectStatus::OBJECT_FOUND
              : ObjectStatus::OBJECT_NOT_FOUND;
 }
@@ -480,6 +545,7 @@ void PlasmaStore::SealObject(const ObjectID& object_id, 
unsigned char digest[])
   std::memcpy(&entry->digest[0], &digest[0], kDigestSize);
   // Set object construction duration.
   entry->construct_duration = std::time(nullptr) - entry->create_time;
+
   // Inform all subscribers that a new object has been sealed.
   ObjectInfoT info;
   info.object_id = object_id.binary();
@@ -545,25 +611,47 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& 
object_id) {
   return PlasmaError::OK;
 }
 
-void PlasmaStore::DeleteObjects(const std::vector<ObjectID>& object_ids) {
+void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
+  std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
+  std::vector<ObjectTableEntry*> evicted_entries;
   for (const auto& object_id : object_ids) {
-    ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
+    ARROW_LOG(DEBUG) << "evicting object " << object_id.hex();
     auto entry = GetObjectTableEntry(&store_info_, object_id);
     // TODO(rkn): This should probably not fail, but should instead throw an
     // error. Maybe we should also support deleting objects that have been
     // created but not sealed.
-    ARROW_CHECK(entry != nullptr)
-        << "To delete an object it must be in the object table.";
+    ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the 
object table.";
     ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED)
-        << "To delete an object it must have been sealed.";
+        << "To evict an object it must have been sealed.";
     ARROW_CHECK(entry->ref_count == 0)
-        << "To delete an object, there must be no clients currently using it.";
-    store_info_.objects.erase(object_id);
-    // Inform all subscribers that the object has been deleted.
-    fb::ObjectInfoT notification;
-    notification.object_id = object_id.binary();
-    notification.is_deletion = true;
-    PushNotification(&notification);
+        << "To evict an object, there must be no clients currently using it.";
+
+    // If there is a backing external store, then mark object for eviction to
+    // external store, free the object data pointer and keep a placeholder
+    // entry in ObjectTable
+    if (external_store_) {
+      evicted_object_data.push_back(std::make_shared<arrow::Buffer>(
+          entry->pointer, entry->data_size + entry->metadata_size));
+      evicted_entries.push_back(entry);
+    } else {
+      // If there is no backing external store, just erase the object entry
+      // and send a deletion notification.
+      store_info_.objects.erase(object_id);
+      // Inform all subscribers that the object has been deleted.
+      fb::ObjectInfoT notification;
+      notification.object_id = object_id.binary();
+      notification.is_deletion = true;
+      PushNotification(&notification);
+    }
+  }
+
+  if (external_store_ && !object_ids.empty()) {
+    ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
+    for (auto entry : evicted_entries) {
+      PlasmaAllocator::Free(entry->pointer, entry->data_size + 
entry->metadata_size);
+      entry->pointer = nullptr;
+      entry->state = ObjectState::PLASMA_EVICTED;
+    }
   }
 }
 
@@ -869,7 +957,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
       std::vector<ObjectID> objects_to_evict;
       int64_t num_bytes_evicted =
           eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict);
-      DeleteObjects(objects_to_evict);
+      EvictObjects(objects_to_evict);
       HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), 
client->fd);
     } break;
     case fb::MessageType::PlasmaSubscribeRequest:
@@ -894,10 +982,12 @@ class PlasmaStoreRunner {
  public:
   PlasmaStoreRunner() {}
 
-  void Start(char* socket_name, std::string directory, bool hugepages_enabled) 
{
+  void Start(char* socket_name, std::string directory, bool hugepages_enabled,
+             std::shared_ptr<ExternalStore> external_store) {
     // Create the event loop.
     loop_.reset(new EventLoop);
-    store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled));
+    store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, 
socket_name,
+                                 external_store));
     plasma_config = store_->GetPlasmaStoreInfo();
 
     // We are using a single memory-mapped file by mallocing and freeing a 
single
@@ -945,15 +1035,15 @@ void HandleSignal(int signal) {
   }
 }
 
+// Start the plasma store: ignore SIGPIPE, install the SIGTERM handler, create
+// the global PlasmaStoreRunner and start it with the given options.
+// `external_store` may be null (main() passes nullptr when no -e endpoint is
+// given); in that case evicted objects are erased instead of being written
+// to an external store.
-void StartServer(char* socket_name, std::string plasma_directory,
-                 bool hugepages_enabled) {
+void StartServer(char* socket_name, std::string plasma_directory, bool 
hugepages_enabled,
+                 std::shared_ptr<ExternalStore> external_store) {
   // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   // to a client that has already died, the store could die.
   signal(SIGPIPE, SIG_IGN);
 
   g_runner.reset(new PlasmaStoreRunner());
   signal(SIGTERM, HandleSignal);
-  g_runner->Start(socket_name, plasma_directory, hugepages_enabled);
+  g_runner->Start(socket_name, plasma_directory, hugepages_enabled, 
external_store);
 }
 
 }  // namespace plasma
@@ -964,14 +1054,18 @@ int main(int argc, char* argv[]) {
   char* socket_name = nullptr;
   // Directory where plasma memory mapped files are stored.
   std::string plasma_directory;
+  std::string external_store_endpoint;
   bool hugepages_enabled = false;
   int64_t system_memory = -1;
   int c;
-  while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
+  while ((c = getopt(argc, argv, "s:m:d:e:h")) != -1) {
     switch (c) {
       case 'd':
         plasma_directory = std::string(optarg);
         break;
+      case 'e':
+        external_store_endpoint = std::string(optarg);
+        break;
       case 'h':
         hugepages_enabled = true;
         break;
@@ -1038,8 +1132,22 @@ int main(int argc, char* argv[]) {
     SetMallocGranularity(1024 * 1024 * 1024);  // 1 GB
   }
 #endif
+  // Get external store
+  std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
+  if (!external_store_endpoint.empty()) {
+    std::string name;
+    ARROW_CHECK_OK(
+        plasma::ExternalStores::ExtractStoreName(external_store_endpoint, 
&name));
+    external_store = plasma::ExternalStores::GetStore(name);
+    if (external_store == nullptr) {
+      ARROW_LOG(FATAL) << "No such external store \"" << name << "\"";
+      return -1;
+    }
+    ARROW_LOG(DEBUG) << "connecting to external store...";
+    ARROW_CHECK_OK(external_store->Connect(external_store_endpoint));
+  }
   ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
-  plasma::StartServer(socket_name, plasma_directory, hugepages_enabled);
+  plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, 
external_store);
   plasma::g_runner->Shutdown();
   plasma::g_runner = nullptr;
 
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index a5c586b..7105c51 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -28,7 +28,9 @@
 #include "plasma/common.h"
 #include "plasma/events.h"
 #include "plasma/eviction_policy.h"
+#include "plasma/external_store.h"
 #include "plasma/plasma.h"
+#include "plasma/protocol.h"
 
 namespace arrow {
 class Status;
@@ -75,7 +77,9 @@ class PlasmaStore {
   using NotificationMap = std::unordered_map<int, NotificationQueue>;
 
   // TODO: PascalCase PlasmaStore methods.
-  PlasmaStore(EventLoop* loop, std::string directory, bool hugetlbfs_enabled);
+  PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
+              const std::string& socket_name,
+              std::shared_ptr<ExternalStore> external_store);
 
   ~PlasmaStore();
 
@@ -125,11 +129,10 @@ class PlasmaStore {
   ///  - PlasmaError::ObjectInUse, if the object is in use.
   PlasmaError DeleteObject(ObjectID& object_id);
 
-  /// Delete objects that have been created in the hash table. This should only
-  /// be called on objects that are returned by the eviction policy to evict.
+  /// Evict objects returned by the eviction policy.
   ///
-  /// @param object_ids Object IDs of the objects to be deleted.
-  void DeleteObjects(const std::vector<ObjectID>& object_ids);
+  /// @param object_ids Object IDs of the objects to be evicted.
+  void EvictObjects(const std::vector<ObjectID>& object_ids);
 
   /// Process a get request from a client. This method assumes that we will
   /// eventually have these objects sealed. If one of the objects has not yet
@@ -149,8 +152,7 @@ class PlasmaStore {
   ///
   /// @param object_id Object ID of the object to be sealed.
   /// @param digest The digest of the object. This is used to tell if two
-  /// objects
-  ///        with the same object ID are the same.
+  /// objects with the same object ID are the same.
   void SealObject(const ObjectID& object_id, unsigned char digest[]);
 
   /// Check if the plasma store contains an object:
@@ -210,6 +212,9 @@ class PlasmaStore {
   int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* 
entry,
                                 Client* client);
 
+  uint8_t* AllocateMemory(int device_num, size_t size, int* fd, int64_t* 
map_size,
+                          ptrdiff_t* offset);
+
   /// Event loop of the plasma store.
   EventLoop* loop_;
   /// The plasma store information, including the object tables, that is 
exposed
@@ -233,6 +238,10 @@ class PlasmaStore {
   std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
 
   std::unordered_set<ObjectID> deletion_cache_;
+
+  /// External store that evicted objects are written to. Null when the store
+  /// was started without an external store, in which case evicted objects are
+  /// erased instead.
+  std::shared_ptr<ExternalStore> external_store_;
 #ifdef PLASMA_CUDA
   arrow::cuda::CudaDeviceManager* manager_;
 #endif
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index 1678e27..90ab70f 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -59,9 +59,9 @@ class TestPlasmaStore : public ::testing::Test {
 
     std::string plasma_directory =
         test_executable.substr(0, test_executable.find_last_of("/"));
-    std::string plasma_command = plasma_directory +
-                                 "/plasma_store_server -m 10000000 -s " +
-                                 store_socket_name_ + " 1> /dev/null 2> 
/dev/null &";
+    std::string plasma_command =
+        plasma_directory + "/plasma_store_server -m 10000000 -s " + 
store_socket_name_ +
+        " 1> /dev/null 2> /dev/null & " + "echo $! > " + store_socket_name_ + 
".pid";
     system(plasma_command.c_str());
     ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
     ARROW_CHECK_OK(client2_.Connect(store_socket_name_, ""));
@@ -69,15 +69,16 @@ class TestPlasmaStore : public ::testing::Test {
   virtual void TearDown() {
     ARROW_CHECK_OK(client_.Disconnect());
     ARROW_CHECK_OK(client2_.Disconnect());
-    // Kill all plasma_store processes
-    // TODO should only kill the processes we launched
+    // Kill plasma_store process that we started
 #ifdef COVERAGE_BUILD
     // Ask plasma_store to exit gracefully and give it time to write out
     // coverage files
-    system("killall -TERM plasma_store_server");
+    std::string plasma_term_command = "kill -TERM `cat " + store_socket_name_ 
+ ".pid`";
+    system(plasma_term_command.c_str());
     std::this_thread::sleep_for(std::chrono::milliseconds(200));
 #endif
-    system("killall -KILL plasma_store_server");
+    std::string plasma_kill_command = "kill -KILL `cat " + store_socket_name_ 
+ ".pid`";
+    system(plasma_kill_command.c_str());
   }
 
   void CreateObject(PlasmaClient& client, const ObjectID& object_id,
diff --git a/cpp/src/plasma/test/external_store_tests.cc 
b/cpp/src/plasma/test/external_store_tests.cc
new file mode 100644
index 0000000..33d3bd1
--- /dev/null
+++ b/cpp/src/plasma/test/external_store_tests.cc
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <random>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "arrow/test-util.h"
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/external_store.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+#include "plasma/test-util.h"
+
+namespace plasma {
+
+// Path of this test binary (argv[0], set in main); SetUp() strips the file
+// name and looks for plasma_store_server in the same directory.
+std::string external_test_executable;  // NOLINT
+
+/// Assert that an ObjectBuffer holds exactly the expected metadata and data
+/// bytes. Missing (null) buffers are reported as test failures instead of
+/// being dereferenced, which would crash the whole test binary.
+void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
+                             const std::string& metadata, const std::string& data) {
+  ASSERT_NE(object_buffer.metadata, nullptr);
+  ASSERT_NE(object_buffer.data, nullptr);
+  arrow::AssertBufferEqual(*object_buffer.metadata, metadata);
+  arrow::AssertBufferEqual(*object_buffer.data, data);
+}
+
+class TestPlasmaStoreWithExternal : public ::testing::Test {
+ public:
+  /// Launch a plasma_store_server with 1024000 bytes of memory that evicts to
+  /// the "hashtable://test" external store, record its PID next to the
+  /// socket, and connect client_ to it.
+  void SetUp() override {
+    uint64_t seed =
+        std::chrono::high_resolution_clock::now().time_since_epoch().count();
+    std::mt19937 rng(static_cast<uint32_t>(seed));
+    std::string store_index = std::to_string(rng());
+    store_socket_name_ = "/tmp/store_with_external" + store_index;
+
+    std::string plasma_directory =
+        external_test_executable.substr(0, external_test_executable.find_last_of('/'));
+    // The store's stdout/stderr go to log files named after the (randomized)
+    // socket, so concurrent test runs do not overwrite each other's logs.
+    // The logs are left in place for post-mortem debugging.
+    std::string plasma_command = plasma_directory +
+                                 "/plasma_store_server -m 1024000 -e " +
+                                 "hashtable://test -s " + store_socket_name_ +
+                                 " 1> " + store_socket_name_ + ".stdout 2> " +
+                                 store_socket_name_ + ".stderr & " +
+                                 "echo $! > " + store_socket_name_ + ".pid";
+    system(plasma_command.c_str());
+    ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
+  }
+
+  /// Disconnect, kill the store process whose PID SetUp() recorded, and
+  /// remove the PID file so repeated runs do not litter /tmp.
+  void TearDown() override {
+    ARROW_CHECK_OK(client_.Disconnect());
+    const std::string pid_file = store_socket_name_ + ".pid";
+    // Kill plasma_store process that we started
+#ifdef COVERAGE_BUILD
+    // Ask plasma_store to exit gracefully and give it time to write out
+    // coverage files
+    std::string plasma_term_command = "kill -TERM `cat " + pid_file + "`";
+    system(plasma_term_command.c_str());
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+#endif
+    std::string plasma_kill_command =
+        "kill -KILL `cat " + pid_file + "`; rm -f " + pid_file;
+    system(plasma_kill_command.c_str());
+  }
+
+ protected:
+  PlasmaClient client_;
+  std::string store_socket_name_;
+};
+
+TEST_F(TestPlasmaStoreWithExternal, EvictionTest) {
+  // Twenty 100 KiB objects exceed the store's 1024000-byte memory (-m), so
+  // creating them forces earlier objects to be evicted to the external store.
+  // ASSERT_OK (not ARROW_CHECK_OK) is used so that a failing call fails this
+  // test and TearDown still kills the store, instead of aborting the binary
+  // and leaving plasma_store_server running.
+  std::vector<ObjectID> object_ids;
+  std::string data(100 * 1024, 'x');
+  std::string metadata;
+  for (int i = 0; i < 20; i++) {
+    ObjectID object_id = random_object_id();
+    object_ids.push_back(object_id);
+
+    // Test for object non-existence.
+    bool has_object = false;
+    ASSERT_OK(client_.Contains(object_id, &has_object));
+    ASSERT_FALSE(has_object);
+
+    // Create and seal the object, then check the store now reports it.
+    ASSERT_OK(client_.CreateAndSeal(object_id, data, metadata));
+    ASSERT_OK(client_.Contains(object_id, &has_object));
+    ASSERT_TRUE(has_object);
+  }
+
+  for (const ObjectID& object_id : object_ids) {
+    // Since we are accessing objects sequentially, every object we
+    // access would be a cache "miss" owing to LRU eviction.
+    // Try and access the object from the plasma store first, and then try
+    // external store on failure. This should succeed to fetch the object.
+    // However, it may evict the next few objects.
+    std::vector<ObjectBuffer> object_buffers;
+    ASSERT_OK(client_.Get({object_id}, -1, &object_buffers));
+    ASSERT_EQ(object_buffers.size(), 1u);
+    ASSERT_EQ(object_buffers[0].device_num, 0);
+    ASSERT_NE(object_buffers[0].data, nullptr);
+    AssertObjectBufferEqual(object_buffers[0], metadata, data);
+  }
+
+  // Make sure we still cannot fetch objects that do not exist.
+  std::vector<ObjectBuffer> object_buffers;
+  ASSERT_OK(client_.Get({random_object_id()}, 100, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1u);
+  ASSERT_EQ(object_buffers[0].device_num, 0);
+  ASSERT_EQ(object_buffers[0].data, nullptr);
+  ASSERT_EQ(object_buffers[0].metadata, nullptr);
+}
+
+}  // namespace plasma
+
+int main(int argc, char** argv) {
+  // gtest strips its own flags from argv first; argv[0] is left untouched.
+  ::testing::InitGoogleTest(&argc, argv);
+  // The fixture locates plasma_store_server relative to this binary's path.
+  plasma::external_test_executable.assign(argv[0]);
+  return RUN_ALL_TESTS();
+}
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 4f64f20..b3868dc 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -373,6 +373,7 @@ cdef class PlasmaClient:
             The number of milliseconds that the get call should block before
             timing out and returning. Pass -1 if the call should block and 0
             if the call should return immediately.
+        with_meta : bool
 
         Returns
         -------
diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py
index a6ab362..13b3eec 100644
--- a/python/pyarrow/plasma.py
+++ b/python/pyarrow/plasma.py
@@ -78,7 +78,8 @@ def build_plasma_tensorflow_op():
 @contextlib.contextmanager
 def start_plasma_store(plasma_store_memory,
                        use_valgrind=False, use_profiler=False,
-                       plasma_directory=None, use_hugepages=False):
+                       plasma_directory=None, use_hugepages=False,
+                       external_store=None):
     """Start a plasma store process.
     Args:
         plasma_store_memory (int): Capacity of the plasma store in bytes.
@@ -89,6 +90,7 @@ def start_plasma_store(plasma_store_memory,
         plasma_directory (str): Directory where plasma memory mapped files
             will be stored.
         use_hugepages (bool): True if the plasma store should use huge pages.
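+        external_store (str): Endpoint of the external store that evicted
+            objects are written to, e.g. "hashtable://test" (passed to the
+            store as ``-e``). If None, evicted objects are simply deleted.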
     Return:
         A tuple of the name of the plasma store socket and the process ID of
             the plasma store process.
@@ -108,6 +110,8 @@ def start_plasma_store(plasma_store_memory,
             command += ["-d", plasma_directory]
         if use_hugepages:
             command += ["-h"]
+        if external_store is not None:
+            command += ["-e", external_store]
         stdout_file = None
         stderr_file = None
         if use_valgrind:
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index bcb467a..ef53bab 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -37,6 +37,7 @@ import pandas as pd
 
 DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
 USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
+EXTERNAL_STORE = "hashtable://test"
 SMALL_OBJECT_SIZE = 9000
 
 
@@ -920,6 +921,63 @@ class TestPlasmaClient(object):
 
 
 @pytest.mark.plasma
+class TestEvictionToExternalStore(object):
+    """Plasma store that spills evicted objects to EXTERNAL_STORE.
+
+    The store gets 1000 KiB of memory, so storing twenty 100 KiB objects
+    forces some of them to be evicted to the external store.
+    """
+
+    def setup_method(self, test_method):
+        import pyarrow.plasma as plasma
+        # Launch a deliberately small store backed by the external store and
+        # enter its context manager by hand; teardown_method exits it.
+        self.plasma_store_ctx = plasma.start_plasma_store(
+            plasma_store_memory=1000 * 1024,
+            use_valgrind=USE_VALGRIND,
+            external_store=EXTERNAL_STORE)
+        store_name, store_process = self.plasma_store_ctx.__enter__()
+        self.plasma_store_name = store_name
+        self.p = store_process
+        # Connect a client to the freshly started store.
+        self.plasma_client = plasma.connect(self.plasma_store_name)
+
+    def teardown_method(self, test_method):
+        # Popen.wait() only accepts a timeout argument on Python >= 3.3.
+        wait_kwargs = {'timeout': 5} if sys.version_info >= (3, 3) else {}
+        try:
+            # The store must not have died while the test was running.
+            assert self.p.poll() is None
+            self.p.send_signal(signal.SIGTERM)
+            self.p.wait(**wait_kwargs)
+        finally:
+            self.plasma_store_ctx.__exit__(None, None, None)
+
+    def test_eviction(self):
+        client = self.plasma_client
+
+        object_ids = [random_object_id() for _ in range(20)]
+        data = b'x' * (100 * 1024)
+        metadata = b''
+
+        for object_id in object_ids:
+            # The object must not exist before it is created...
+            assert not client.contains(object_id)
+            client.create_and_seal(object_id, data, metadata)
+            # ...and must be visible as soon as it is sealed.
+            assert client.contains(object_id)
+
+        # Objects are read back in creation order, so with LRU eviction each
+        # access is a local "miss" that must be served from the external
+        # store; fetching one may in turn evict the next few objects.
+        for object_id in object_ids:
+            [buf] = client.get_buffers([object_id])
+            assert buf.to_pybytes() == data
+
+        # Objects that were never created must still be reported as missing.
+        [missing] = client.get_buffers([random_object_id()], timeout_ms=100)
+        assert missing is None
+
+
+@pytest.mark.plasma
 def test_object_id_size():
     import pyarrow.plasma as plasma
     with pytest.raises(ValueError):

Reply via email to