This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0f38a95 ARROW-1947: [Plasma] Change Client Create and Get to use Buffers
0f38a95 is described below
commit 0f38a9503bcde872b705976bd314ccb6c0d0f8d5
Author: Philipp Moritz <[email protected]>
AuthorDate: Fri Dec 29 14:19:38 2017 -0800
ARROW-1947: [Plasma] Change Client Create and Get to use Buffers
- Create now takes a pointer to a shared pointer of Buffer and returns a
MutableBuffer through it.
- ObjectBuffer's data and metadata fields are now shared pointers to
Buffer.
Author: Philipp Moritz <[email protected]>
Author: William Paul <[email protected]>
Closes #1444 from Wapaul1/plasma_buffer_api and squashes the following commits:
7fe1cee [Philipp Moritz] fix size of MutableBuffer returned by plasma::Create
aeed751 [Philipp Moritz] more linting
b3274e0 [Philipp Moritz] fix
463dbeb [Philipp Moritz] fix plasma python extension
a055fa8 [Philipp Moritz] fix linting
fc62dda [William Paul] Added metadata buffer
4d8cbb8 [William Paul] Create and Get use Buffers now
---
cpp/src/plasma/client.cc | 44 +++++++++++++++++++++++--------------
cpp/src/plasma/client.h | 18 +++++++++------
cpp/src/plasma/test/client_tests.cc | 36 ++++++++++++++++++------------
python/pyarrow/plasma.pyx | 14 +++++-------
4 files changed, 67 insertions(+), 45 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 9bbafac..0dd1c44 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -40,6 +40,7 @@
#include <thread>
#include <vector>
+#include "arrow/buffer.h"
#include "plasma/common.h"
#include "plasma/fling.h"
#include "plasma/io.h"
@@ -53,6 +54,8 @@
namespace plasma {
+using arrow::MutableBuffer;
+
// Number of threads used for memcopy and hash computations.
constexpr int64_t kThreadPoolSize = 8;
constexpr int64_t kBytesInMB = 1 << 20;
@@ -145,7 +148,8 @@ void PlasmaClient::increment_object_count(const ObjectID&
object_id, PlasmaObjec
}
Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
- uint8_t* metadata, int64_t metadata_size,
uint8_t** data) {
+ uint8_t* metadata, int64_t metadata_size,
+ std::shared_ptr<Buffer>* data) {
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with
size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size,
metadata_size));
@@ -162,14 +166,16 @@ Status PlasmaClient::Create(const ObjectID& object_id,
int64_t data_size,
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
- *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
- object.data_offset;
+ *data = std::make_shared<MutableBuffer>(
+ lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+ object.data_offset,
+ data_size);
// If plasma_create is being called from a transfer, then we will not copy
the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
if (metadata != NULL) {
// Copy the metadata to the buffer.
- memcpy(*data + object.data_size, metadata, metadata_size);
+ memcpy((*data)->mutable_data() + object.data_size, metadata,
metadata_size);
}
// Increment the count of the number of instances of this object that this
// client is using. A call to PlasmaClient::Release is required to decrement
@@ -203,10 +209,12 @@ Status PlasmaClient::Get(const ObjectID* object_ids,
int64_t num_objects,
ARROW_CHECK(object_entry->second->is_sealed)
<< "Plasma client called get on an unsealed object that it created";
PlasmaObject* object = &object_entry->second->object;
- object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
- object_buffers[i].data = object_buffers[i].data + object->data_offset;
+ uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+ object_buffers[i].data =
+ std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
+ object_buffers[i].metadata = std::make_shared<Buffer>(
+ data + object->data_offset + object->data_size,
object->metadata_size);
object_buffers[i].data_size = object->data_size;
- object_buffers[i].metadata = object_buffers[i].data + object->data_size;
object_buffers[i].metadata_size = object->metadata_size;
// Increment the count of the number of instances of this object that
this
// client is using. A call to PlasmaClient::Release is required to
@@ -254,13 +262,15 @@ Status PlasmaClient::Get(const ObjectID* object_ids,
int64_t num_objects,
// The object was retrieved. The user will be responsible for releasing
// this object.
int fd = recv_fd(store_conn_);
- ARROW_CHECK(fd >= 0);
- object_buffers[i].data =
+ uint8_t* data =
lookup_or_mmap(fd, object->handle.store_fd,
object->handle.mmap_size);
+ ARROW_CHECK(fd >= 0);
// Finish filling out the return values.
- object_buffers[i].data = object_buffers[i].data + object->data_offset;
+ object_buffers[i].data =
+ std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
+ object_buffers[i].metadata = std::make_shared<Buffer>(
+ data + object->data_offset + object->data_size,
object->metadata_size);
object_buffers[i].data_size = object->data_size;
- object_buffers[i].metadata = object_buffers[i].data + object->data_size;
object_buffers[i].metadata_size = object->metadata_size;
// Increment the count of the number of instances of this object that
this
// client is using. A call to PlasmaClient::Release is required to
@@ -438,14 +448,16 @@ static uint64_t compute_object_hash(const ObjectBuffer&
obj_buffer) {
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (obj_buffer.data_size >= kBytesInMB) {
- compute_object_hash_parallel(&hash_state,
- reinterpret_cast<unsigned
char*>(obj_buffer.data),
- obj_buffer.data_size);
+ compute_object_hash_parallel(
+ &hash_state, reinterpret_cast<const unsigned
char*>(obj_buffer.data->data()),
+ obj_buffer.data_size);
} else {
- XXH64_update(&hash_state, reinterpret_cast<unsigned
char*>(obj_buffer.data),
+ XXH64_update(&hash_state,
+ reinterpret_cast<const unsigned
char*>(obj_buffer.data->data()),
obj_buffer.data_size);
}
- XXH64_update(&hash_state, reinterpret_cast<unsigned
char*>(obj_buffer.metadata),
+ XXH64_update(&hash_state,
+ reinterpret_cast<const unsigned
char*>(obj_buffer.metadata->data()),
obj_buffer.metadata_size);
return XXH64_digest(&hash_state);
}
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index cfd11c1..78793f1 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -26,11 +26,13 @@
#include <string>
#include <unordered_map>
+#include "arrow/buffer.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"
#include "plasma/common.h"
using arrow::Status;
+using arrow::Buffer;
namespace plasma {
@@ -41,14 +43,16 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
/// Object buffer data structure.
struct ObjectBuffer {
+ /// The data buffer.
+ std::shared_ptr<Buffer> data;
/// The size in bytes of the data object.
int64_t data_size;
- /// The address of the data object.
- uint8_t* data;
+ /// The metadata buffer.
+ std::shared_ptr<Buffer> metadata;
/// The metadata size in bytes.
int64_t metadata_size;
- /// The address of the metadata.
- uint8_t* metadata;
+ /// The device number.
+ int device_num;
};
/// Configuration options for the plasma client.
@@ -107,11 +111,11 @@ class ARROW_EXPORT PlasmaClient {
/// should be NULL.
/// \param metadata_size The size in bytes of the metadata. If there is no
/// metadata, this should be 0.
- /// \param data The address of the newly created object will be written here.
+ /// \param data A buffer containing the address of the newly created object
+ /// will be written here.
/// \return The return status.
Status Create(const ObjectID& object_id, int64_t data_size, uint8_t*
metadata,
- int64_t metadata_size, uint8_t** data);
-
+ int64_t metadata_size, std::shared_ptr<Buffer>* data);
/// Get some objects from the Plasma Store. This function will block until
the
/// objects have all been created and sealed in the Plasma Store or the
/// timeout
diff --git a/cpp/src/plasma/test/client_tests.cc
b/cpp/src/plasma/test/client_tests.cc
index f44ed25..5cd3063 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -71,7 +71,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- uint8_t* data;
+ std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data));
ARROW_CHECK_OK(client_.Seal(object_id));
// Avoid race condition of Plasma Manager waiting for notification.
@@ -94,16 +94,20 @@ TEST_F(TestPlasmaStore, GetTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data_buffer;
uint8_t* data;
- ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data));
+ ARROW_CHECK_OK(
+ client_.Create(object_id, data_size, metadata, metadata_size,
&data_buffer));
+ data = data_buffer->mutable_data();
for (int64_t i = 0; i < data_size; i++) {
data[i] = static_cast<uint8_t>(i % 4);
}
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ const uint8_t* object_data = object_buffer.data->data();
for (int64_t i = 0; i < data_size; i++) {
- ASSERT_EQ(data[i], object_buffer.data[i]);
+ ASSERT_EQ(data[i], object_data[i]);
}
}
@@ -116,18 +120,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- uint8_t* data;
+ std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata,
metadata_size, &data));
- data[0] = 1;
+ data->mutable_data()[0] = 1;
ARROW_CHECK_OK(client_.Seal(object_id1));
ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata,
metadata_size, &data));
- data[0] = 2;
+ data->mutable_data()[0] = 2;
ARROW_CHECK_OK(client_.Seal(object_id2));
ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
- ASSERT_EQ(object_buffer[0].data[0], 1);
- ASSERT_EQ(object_buffer[1].data[0], 2);
+ ASSERT_EQ(object_buffer[0].data->data()[0], 1);
+ ASSERT_EQ(object_buffer[1].data->data()[0], 2);
}
TEST_F(TestPlasmaStore, AbortTest) {
@@ -143,11 +147,13 @@ TEST_F(TestPlasmaStore, AbortTest) {
int64_t data_size = 4;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- uint8_t* data;
+ std::shared_ptr<Buffer> data;
+ uint8_t* data_ptr;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data));
+ data_ptr = data->mutable_data();
// Write some data.
for (int64_t i = 0; i < data_size / 2; i++) {
- data[i] = static_cast<uint8_t>(i % 4);
+ data_ptr[i] = static_cast<uint8_t>(i % 4);
}
// Attempt to abort. Test that this fails before the first release.
Status status = client_.Abort(object_id);
@@ -162,15 +168,17 @@ TEST_F(TestPlasmaStore, AbortTest) {
// Create the object successfully this time.
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data));
+ data_ptr = data->mutable_data();
for (int64_t i = 0; i < data_size; i++) {
- data[i] = static_cast<uint8_t>(i % 4);
+ data_ptr[i] = static_cast<uint8_t>(i % 4);
}
ARROW_CHECK_OK(client_.Seal(object_id));
// Test that we can get the object.
ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ const uint8_t* buffer_ptr = object_buffer.data->data();
for (int64_t i = 0; i < data_size; i++) {
- ASSERT_EQ(data[i], object_buffer.data[i]);
+ ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
}
}
@@ -187,7 +195,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- uint8_t* data;
+ std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client2_.Seal(object_id));
// Test that the first client can get the object.
@@ -227,7 +235,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
- uint8_t* data;
+ std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata,
metadata_size, &data));
if (i % 3 == 0) {
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 9b3e409..abeec32 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
CStatus Create(const CUniqueID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
- uint8_t** data)
+ const shared_ptr[CBuffer]* data)
CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -118,9 +118,9 @@ cdef extern from "plasma/client.h" nogil:
cdef struct CObjectBuffer" plasma::ObjectBuffer":
int64_t data_size
- uint8_t* data
+ shared_ptr[CBuffer] data
int64_t metadata_size
- uint8_t* metadata
+ shared_ptr[CBuffer] metadata
def make_object_id(object_id):
@@ -245,10 +245,8 @@ cdef class PlasmaClient:
check_status(self.client.get().Get(ids.data(), ids.size(),
timeout_ms, result[0].data()))
- cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data,
+ cdef _make_plasma_buffer(self, ObjectID object_id, shared_ptr[CBuffer]
buffer,
int64_t size):
- cdef shared_ptr[CBuffer] buffer
- buffer.reset(new CBuffer(data, size))
result = PlasmaBuffer(object_id, self)
result.init(buffer)
return result
@@ -296,12 +294,12 @@ cdef class PlasmaClient:
not be created because the plasma store is unable to evict
enough objects to create room for it.
"""
- cdef uint8_t* data
+ cdef shared_ptr[CBuffer] data
with nogil:
check_status(self.client.get().Create(object_id.data, data_size,
<uint8_t*>(metadata.data()),
metadata.size(), &data))
- return self._make_mutable_plasma_buffer(object_id, data, data_size)
+ return self._make_mutable_plasma_buffer(object_id,
data.get().mutable_data(), data_size)
def get_buffers(self, object_ids, timeout_ms=-1):
"""
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].