This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f38a95  ARROW-1947: [Plasma] Change Client Create and Get to use Buffers
0f38a95 is described below

commit 0f38a9503bcde872b705976bd314ccb6c0d0f8d5
Author: Philipp Moritz <[email protected]>
AuthorDate: Fri Dec 29 14:19:38 2017 -0800

    ARROW-1947: [Plasma] Change Client Create and Get to use Buffers
    
    - Create now takes a pointer to a shared_ptr<Buffer> as its output
      parameter and stores in it a MutableBuffer covering the object's data.
    - The data and metadata fields of ObjectBuffer are now shared pointers
      to Buffer.
    
    Author: Philipp Moritz <[email protected]>
    Author: William Paul <[email protected]>
    
    Closes #1444 from Wapaul1/plasma_buffer_api and squashes the following commits:
    
    7fe1cee [Philipp Moritz] fix size of MutableBuffer returned by plasma::Create
    aeed751 [Philipp Moritz] more linting
    b3274e0 [Philipp Moritz] fix
    463dbeb [Philipp Moritz] fix plasma python extension
    a055fa8 [Philipp Moritz] fix linting
    fc62dda [William Paul] Added metadata buffer
    4d8cbb8 [William Paul] Create and Get use Buffers now
---
 cpp/src/plasma/client.cc            | 44 +++++++++++++++++++++++--------------
 cpp/src/plasma/client.h             | 18 +++++++++------
 cpp/src/plasma/test/client_tests.cc | 36 ++++++++++++++++++------------
 python/pyarrow/plasma.pyx           | 14 +++++-------
 4 files changed, 67 insertions(+), 45 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 9bbafac..0dd1c44 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -40,6 +40,7 @@
 #include <thread>
 #include <vector>
 
+#include "arrow/buffer.h"
 #include "plasma/common.h"
 #include "plasma/fling.h"
 #include "plasma/io.h"
@@ -53,6 +54,8 @@
 
 namespace plasma {
 
+using arrow::MutableBuffer;
+
 // Number of threads used for memcopy and hash computations.
 constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
@@ -145,7 +148,8 @@ void PlasmaClient::increment_object_count(const ObjectID& 
object_id, PlasmaObjec
 }
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
-                            uint8_t* metadata, int64_t metadata_size, 
uint8_t** data) {
+                            uint8_t* metadata, int64_t metadata_size,
+                            std::shared_ptr<Buffer>* data) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with 
size "
                    << data_size << " and metadata size " << metadata_size;
   RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, 
metadata_size));
@@ -162,14 +166,16 @@ Status PlasmaClient::Create(const ObjectID& object_id, 
int64_t data_size,
   ARROW_CHECK(object.metadata_size == metadata_size);
   // The metadata should come right after the data.
   ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
-  *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
-          object.data_offset;
+  *data = std::make_shared<MutableBuffer>(
+      lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+          object.data_offset,
+      data_size);
   // If plasma_create is being called from a transfer, then we will not copy 
the
   // metadata here. The metadata will be written along with the data streamed
   // from the transfer.
   if (metadata != NULL) {
     // Copy the metadata to the buffer.
-    memcpy(*data + object.data_size, metadata, metadata_size);
+    memcpy((*data)->mutable_data() + object.data_size, metadata, 
metadata_size);
   }
   // Increment the count of the number of instances of this object that this
   // client is using. A call to PlasmaClient::Release is required to decrement
@@ -203,10 +209,12 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
-      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+      object_buffers[i].data =
+          std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
+      object_buffers[i].metadata = std::make_shared<Buffer>(
+          data + object->data_offset + object->data_size, 
object->metadata_size);
       object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
       object_buffers[i].metadata_size = object->metadata_size;
       // Increment the count of the number of instances of this object that 
this
       // client is using. A call to PlasmaClient::Release is required to
@@ -254,13 +262,15 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
       // The object was retrieved. The user will be responsible for releasing
       // this object.
       int fd = recv_fd(store_conn_);
-      ARROW_CHECK(fd >= 0);
-      object_buffers[i].data =
+      uint8_t* data =
           lookup_or_mmap(fd, object->handle.store_fd, 
object->handle.mmap_size);
+      ARROW_CHECK(fd >= 0);
       // Finish filling out the return values.
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data =
+          std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
+      object_buffers[i].metadata = std::make_shared<Buffer>(
+          data + object->data_offset + object->data_size, 
object->metadata_size);
       object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
       object_buffers[i].metadata_size = object->metadata_size;
       // Increment the count of the number of instances of this object that 
this
       // client is using. A call to PlasmaClient::Release is required to
@@ -438,14 +448,16 @@ static uint64_t compute_object_hash(const ObjectBuffer& 
obj_buffer) {
   XXH64_state_t hash_state;
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
   if (obj_buffer.data_size >= kBytesInMB) {
-    compute_object_hash_parallel(&hash_state,
-                                 reinterpret_cast<unsigned 
char*>(obj_buffer.data),
-                                 obj_buffer.data_size);
+    compute_object_hash_parallel(
+        &hash_state, reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
+        obj_buffer.data_size);
   } else {
-    XXH64_update(&hash_state, reinterpret_cast<unsigned 
char*>(obj_buffer.data),
+    XXH64_update(&hash_state,
+                 reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
                  obj_buffer.data_size);
   }
-  XXH64_update(&hash_state, reinterpret_cast<unsigned 
char*>(obj_buffer.metadata),
+  XXH64_update(&hash_state,
+               reinterpret_cast<const unsigned 
char*>(obj_buffer.metadata->data()),
                obj_buffer.metadata_size);
   return XXH64_digest(&hash_state);
 }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index cfd11c1..78793f1 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -26,11 +26,13 @@
 #include <string>
 #include <unordered_map>
 
+#include "arrow/buffer.h"
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 
 using arrow::Status;
+using arrow::Buffer;
 
 namespace plasma {
 
@@ -41,14 +43,16 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
 
 /// Object buffer data structure.
 struct ObjectBuffer {
+  /// The data buffer.
+  std::shared_ptr<Buffer> data;
   /// The size in bytes of the data object.
   int64_t data_size;
-  /// The address of the data object.
-  uint8_t* data;
+  /// The metadata buffer.
+  std::shared_ptr<Buffer> metadata;
   /// The metadata size in bytes.
   int64_t metadata_size;
-  /// The address of the metadata.
-  uint8_t* metadata;
+  /// The device number.
+  int device_num;
 };
 
 /// Configuration options for the plasma client.
@@ -107,11 +111,11 @@ class ARROW_EXPORT PlasmaClient {
   ///        should be NULL.
   /// \param metadata_size The size in bytes of the metadata. If there is no
   ///        metadata, this should be 0.
-  /// \param data The address of the newly created object will be written here.
+  /// \param data A buffer containing the address of the newly created object
+  ///        will be written here.
   /// \return The return status.
   Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* 
metadata,
-                int64_t metadata_size, uint8_t** data);
-
+                int64_t metadata_size, std::shared_ptr<Buffer>* data);
   /// Get some objects from the Plasma Store. This function will block until 
the
   /// objects have all been created and sealed in the Plasma Store or the
   /// timeout
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index f44ed25..5cd3063 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -71,7 +71,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
   ARROW_CHECK_OK(client_.Seal(object_id));
   // Avoid race condition of Plasma Manager waiting for notification.
@@ -94,16 +94,20 @@ TEST_F(TestPlasmaStore, GetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
+  std::shared_ptr<Buffer> data_buffer;
   uint8_t* data;
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  ARROW_CHECK_OK(
+      client_.Create(object_id, data_size, metadata, metadata_size, 
&data_buffer));
+  data = data_buffer->mutable_data();
   for (int64_t i = 0; i < data_size; i++) {
     data[i] = static_cast<uint8_t>(i % 4);
   }
   ARROW_CHECK_OK(client_.Seal(object_id));
 
   ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  const uint8_t* object_data = object_buffer.data->data();
   for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_buffer.data[i]);
+    ASSERT_EQ(data[i], object_data[i]);
   }
 }
 
@@ -116,18 +120,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, 
metadata_size, &data));
-  data[0] = 1;
+  data->mutable_data()[0] = 1;
   ARROW_CHECK_OK(client_.Seal(object_id1));
 
   ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, 
metadata_size, &data));
-  data[0] = 2;
+  data->mutable_data()[0] = 2;
   ARROW_CHECK_OK(client_.Seal(object_id2));
 
   ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
-  ASSERT_EQ(object_buffer[0].data[0], 1);
-  ASSERT_EQ(object_buffer[1].data[0], 2);
+  ASSERT_EQ(object_buffer[0].data->data()[0], 1);
+  ASSERT_EQ(object_buffer[1].data->data()[0], 2);
 }
 
 TEST_F(TestPlasmaStore, AbortTest) {
@@ -143,11 +147,13 @@ TEST_F(TestPlasmaStore, AbortTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
+  uint8_t* data_ptr;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  data_ptr = data->mutable_data();
   // Write some data.
   for (int64_t i = 0; i < data_size / 2; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+    data_ptr[i] = static_cast<uint8_t>(i % 4);
   }
   // Attempt to abort. Test that this fails before the first release.
   Status status = client_.Abort(object_id);
@@ -162,15 +168,17 @@ TEST_F(TestPlasmaStore, AbortTest) {
 
   // Create the object successfully this time.
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  data_ptr = data->mutable_data();
   for (int64_t i = 0; i < data_size; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+    data_ptr[i] = static_cast<uint8_t>(i % 4);
   }
   ARROW_CHECK_OK(client_.Seal(object_id));
 
   // Test that we can get the object.
   ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  const uint8_t* buffer_ptr = object_buffer.data->data();
   for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_buffer.data[i]);
+    ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
   }
 }
 
@@ -187,7 +195,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, 
metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
@@ -227,7 +235,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
     int64_t data_size = 100;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
-    uint8_t* data;
+    std::shared_ptr<Buffer> data;
     ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, 
metadata_size, &data));
 
     if (i % 3 == 0) {
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 9b3e409..abeec32 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Create(const CUniqueID& object_id, int64_t data_size,
                        const uint8_t* metadata, int64_t metadata_size,
-                       uint8_t** data)
+                       const shared_ptr[CBuffer]* data)
 
         CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
                     int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -118,9 +118,9 @@ cdef extern from "plasma/client.h" nogil:
 
     cdef struct CObjectBuffer" plasma::ObjectBuffer":
         int64_t data_size
-        uint8_t* data
+        shared_ptr[CBuffer] data
         int64_t metadata_size
-        uint8_t* metadata
+        shared_ptr[CBuffer] metadata
 
 
 def make_object_id(object_id):
@@ -245,10 +245,8 @@ cdef class PlasmaClient:
             check_status(self.client.get().Get(ids.data(), ids.size(),
                          timeout_ms, result[0].data()))
 
-    cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data,
+    cdef _make_plasma_buffer(self, ObjectID object_id, shared_ptr[CBuffer] 
buffer,
                              int64_t size):
-        cdef shared_ptr[CBuffer] buffer
-        buffer.reset(new CBuffer(data, size))
         result = PlasmaBuffer(object_id, self)
         result.init(buffer)
         return result
@@ -296,12 +294,12 @@ cdef class PlasmaClient:
                 not be created because the plasma store is unable to evict
                 enough objects to create room for it.
         """
-        cdef uint8_t* data
+        cdef shared_ptr[CBuffer] data
         with nogil:
             check_status(self.client.get().Create(object_id.data, data_size,
                                                   <uint8_t*>(metadata.data()),
                                                   metadata.size(), &data))
-        return self._make_mutable_plasma_buffer(object_id, data, data_size)
+        return self._make_mutable_plasma_buffer(object_id, 
data.get().mutable_data(), data_size)
 
     def get_buffers(self, object_ids, timeout_ms=-1):
         """

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to