This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 54859c5 ARROW-1394: [Plasma] Add optional extension for allocating
memory on GPUs
54859c5 is described below
commit 54859c57527076c375bc26b0e252f6bf1258cfbc
Author: William Paul <[email protected]>
AuthorDate: Tue Feb 6 16:19:12 2018 -0800
ARROW-1394: [Plasma] Add optional extension for allocating memory on GPUs
**Done:**
- CudaIPCMemHandles are now returned as shared pointers instead of unique
pointers.
- Objects now have a device number; 0 for host memory, 1-infinity for GPU
memory.
- After being allocated and exported on the store, CudaIPCMemHandles are
sent using flatbuffers alongside the object metadata.
- Create and Get now return CudaBuffers for device numbers greater than
zero, with the API change in #1444 .
- There is an issue with the same object on the GPU being retrieved
multiples on the same process. CudaIPCMemHandles can only be mapped once per
process, so to solve this, there is a process-global unordered map
`gpu_object_map` of object id to a struct containing the mapped CudaBuffer and
count of how many clients are using the object. Removing entries would be done
when the count reaches zero on releasing the object.
**Todo:**
- The hash on the data done when the object is sealed is a constant zero
for objects on the GPU.
- The eviction policy currently has no notion of total size on GPUs, so GPU
objects will never be released or evicted.
- Similar to the last point, there is no configuration for how much memory
to use on the GPU or what GPU's to use (though this can be resolved by
`CUDA_VISIBLE_DEVICES`).
As a side note, it seems like what's currently done could be abstracted
into supporting arbitrary devices that can ship memory handles, though that is
out of scope for the ticket.
@pcmoritz @wesm
Author: William Paul <[email protected]>
Author: Philipp Moritz <[email protected]>
Closes #1445 from Wapaul1/plasma_gpu_module and squashes the following
commits:
d803e08b [Philipp Moritz] fixes
712e33b5 [Philipp Moritz] rebase and fixes
1a2b2fdd [William Paul] Changed CudaBuffer to MemHandle in store
45e7eefd [William Paul] Plasma GPU Module
8ad9949c [William Paul] Create and Get use Buffers now
---
cpp/src/arrow/gpu/cuda-test.cc | 4 +-
cpp/src/arrow/gpu/cuda_context.cc | 6 +-
cpp/src/arrow/gpu/cuda_context.h | 2 +-
cpp/src/arrow/gpu/cuda_memory.cc | 6 +-
cpp/src/arrow/gpu/cuda_memory.h | 4 +-
cpp/src/plasma/CMakeLists.txt | 19 +++-
cpp/src/plasma/client.cc | 175 ++++++++++++++++++++++-------
cpp/src/plasma/client.h | 17 ++-
cpp/src/plasma/format/plasma.fbs | 12 ++
cpp/src/plasma/plasma.h | 18 +++
cpp/src/plasma/protocol.cc | 80 ++++++++++---
cpp/src/plasma/protocol.h | 4 +-
cpp/src/plasma/store.cc | 98 +++++++++++-----
cpp/src/plasma/store.h | 12 +-
cpp/src/plasma/test/client_tests.cc | 78 +++++++++++++
cpp/src/plasma/test/serialization_tests.cc | 9 +-
16 files changed, 439 insertions(+), 105 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 7595f8b..ae411c9 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -94,14 +94,14 @@ TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
// Export for IPC and serialize
- std::unique_ptr<CudaIpcMemHandle> ipc_handle;
+ std::shared_ptr<CudaIpcMemHandle> ipc_handle;
ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));
std::shared_ptr<Buffer> serialized_handle;
ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));
// Deserialize IPC handle and open
- std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
+ std::shared_ptr<CudaIpcMemHandle> ipc_handle2;
ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(),
&ipc_handle2));
std::shared_ptr<CudaBuffer> ipc_buffer;
diff --git a/cpp/src/arrow/gpu/cuda_context.cc
b/cpp/src/arrow/gpu/cuda_context.cc
index 2f5ccb0..909c98a 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -89,11 +89,11 @@ class CudaContext::CudaContextImpl {
return Status::OK();
}
- Status ExportIpcBuffer(void* data, std::unique_ptr<CudaIpcMemHandle>*
handle) {
+ Status ExportIpcBuffer(void* data, std::shared_ptr<CudaIpcMemHandle>*
handle) {
CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
CUipcMemHandle cu_handle;
CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle,
reinterpret_cast<CUdeviceptr>(data)));
- *handle = std::unique_ptr<CudaIpcMemHandle>(new
CudaIpcMemHandle(&cu_handle));
+ *handle = std::shared_ptr<CudaIpcMemHandle>(new
CudaIpcMemHandle(&cu_handle));
return Status::OK();
}
@@ -241,7 +241,7 @@ Status CudaContext::Allocate(int64_t nbytes,
std::shared_ptr<CudaBuffer>* out) {
}
Status CudaContext::ExportIpcBuffer(void* data,
- std::unique_ptr<CudaIpcMemHandle>* handle)
{
+ std::shared_ptr<CudaIpcMemHandle>* handle)
{
return impl_->ExportIpcBuffer(data, handle);
}
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 6fc2e0d..50ea94c 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -88,7 +88,7 @@ class ARROW_EXPORT CudaContext : public
std::enable_shared_from_this<CudaContext
private:
CudaContext();
- Status ExportIpcBuffer(void* data, std::unique_ptr<CudaIpcMemHandle>*
handle);
+ Status ExportIpcBuffer(void* data, std::shared_ptr<CudaIpcMemHandle>*
handle);
Status CopyHostToDevice(void* dst, const void* src, int64_t nbytes);
Status CopyDeviceToHost(void* dst, const void* src, int64_t nbytes);
Status Free(void* device_ptr, int64_t nbytes);
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index cbf0441..183cbcb 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -54,8 +54,8 @@ CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
CudaIpcMemHandle::~CudaIpcMemHandle() {}
Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
- std::unique_ptr<CudaIpcMemHandle>* handle)
{
- *handle = std::unique_ptr<CudaIpcMemHandle>(new
CudaIpcMemHandle(opaque_handle));
+ std::shared_ptr<CudaIpcMemHandle>* handle)
{
+ *handle = std::shared_ptr<CudaIpcMemHandle>(new
CudaIpcMemHandle(opaque_handle));
return Status::OK();
}
@@ -111,7 +111,7 @@ Status CudaBuffer::CopyFromHost(const int64_t position,
const void* data,
return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
}
-Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
+Status CudaBuffer::ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle) {
if (is_ipc_) {
return Status::Invalid("Buffer has already been exported for IPC");
}
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 9376b4b..3f3dd2f 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -64,7 +64,7 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
///
/// \note After calling this function, this device memory will not be freed
/// when the CudaBuffer is destructed
- virtual Status ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle);
+ virtual Status ExportForIpc(std::shared_ptr<CudaIpcMemHandle>* handle);
std::shared_ptr<CudaContext> context() const { return context_; }
@@ -95,7 +95,7 @@ class ARROW_EXPORT CudaIpcMemHandle {
/// \param[out] handle the CudaIpcMemHandle instance
/// \return Status
static Status FromBuffer(const void* opaque_handle,
- std::unique_ptr<CudaIpcMemHandle>* handle);
+ std::shared_ptr<CudaIpcMemHandle>* handle);
/// \brief Write CudaIpcMemHandle to a Buffer
/// \param[in] pool a MemoryPool to allocate memory from
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index 4b76f25..3448d00 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -79,11 +79,20 @@ set(PLASMA_SRCS
thirdparty/ae/ae.c
thirdparty/xxhash.cc)
+set(PLASMA_LINK_LIBS arrow_static)
+
+if (ARROW_GPU)
+ set(PLASMA_LINK_LIBS ${PLASMA_LINK_LIBS} arrow_gpu_shared)
+ add_definitions(-DPLASMA_GPU)
+endif()
+
+
+
ADD_ARROW_LIB(plasma
SOURCES ${PLASMA_SRCS}
DEPENDENCIES gen_plasma_fbs
- SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT}
arrow_static
- STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT}
arrow_static)
+ SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT}
${PLASMA_LINK_LIBS}
+ STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT}
${PLASMA_LINK_LIBS})
# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
# malloc.cc; we set it here regardless of whether we do a debug or release
build.
@@ -113,7 +122,7 @@ if ("${COMPILER_FAMILY}" STREQUAL "gcc")
endif()
add_executable(plasma_store store.cc)
-target_link_libraries(plasma_store plasma_static)
+target_link_libraries(plasma_store plasma_static ${PLASMA_LINK_LIBS})
# Headers: top level
install(FILES
@@ -143,6 +152,6 @@ install(
#######################################
ADD_ARROW_TEST(test/serialization_tests)
-ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
+ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static
${PLASMA_LINK_LIBS})
ADD_ARROW_TEST(test/client_tests)
-ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
+ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static ${PLASMA_LINK_LIBS})
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 6e9b696..679d9ce 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -37,6 +37,7 @@
#include <unistd.h>
#include <algorithm>
+#include <mutex>
#include <thread>
#include <vector>
@@ -47,6 +48,15 @@
#include "plasma/plasma.h"
#include "plasma/protocol.h"
+#ifdef PLASMA_GPU
+#include "arrow/gpu/cuda_api.h"
+
+using arrow::gpu::CudaBuffer;
+using arrow::gpu::CudaBufferWriter;
+using arrow::gpu::CudaContext;
+using arrow::gpu::CudaDeviceManager;
+#endif
+
#define XXH_STATIC_LINKING_ONLY
#include "thirdparty/xxhash.h"
@@ -75,7 +85,26 @@ struct ObjectInUseEntry {
bool is_sealed;
};
-PlasmaClient::PlasmaClient() {}
+#ifdef PLASMA_GPU
+struct GpuProcessHandle {
+ /// Pointer to CUDA buffer that is backing this GPU object.
+ std::shared_ptr<CudaBuffer> ptr;
+ /// Number of client using this GPU object.
+ int client_count;
+};
+
+// This is necessary as IPC handles can only be mapped once per process.
+// Thus if multiple clients in the same process get the same gpu object,
+// they need to access the same mapped CudaBuffer.
+static std::unordered_map<ObjectID, GpuProcessHandle*, UniqueIDHasher>
gpu_object_map;
+static std::mutex gpu_mutex;
+#endif
+
+PlasmaClient::PlasmaClient() {
+#ifdef PLASMA_GPU
+ CudaDeviceManager::GetInstance(&manager_);
+#endif
+}
PlasmaClient::~PlasmaClient() {}
@@ -127,16 +156,18 @@ void PlasmaClient::increment_object_count(const ObjectID&
object_id, PlasmaObjec
objects_in_use_[object_id]->count = 0;
objects_in_use_[object_id]->is_sealed = is_sealed;
object_entry = objects_in_use_[object_id].get();
- // Increment the count of the number of objects in the memory-mapped file
- // that are being used. The corresponding decrement should happen in
- // PlasmaClient::Release.
- auto entry = mmap_table_.find(object->store_fd);
- ARROW_CHECK(entry != mmap_table_.end());
- ARROW_CHECK(entry->second.count >= 0);
- // Update the in_use_object_bytes_.
- in_use_object_bytes_ +=
- (object_entry->object.data_size + object_entry->object.metadata_size);
- entry->second.count += 1;
+ if (object->device_num == 0) {
+ // Increment the count of the number of objects in the memory-mapped file
+ // that are being used. The corresponding decrement should happen in
+ // PlasmaClient::Release.
+ auto entry = mmap_table_.find(object->store_fd);
+ ARROW_CHECK(entry != mmap_table_.end());
+ ARROW_CHECK(entry->second.count >= 0);
+ // Update the in_use_object_bytes_.
+ in_use_object_bytes_ +=
+ (object_entry->object.data_size +
object_entry->object.metadata_size);
+ entry->second.count += 1;
+ }
} else {
object_entry = elem->second.get();
ARROW_CHECK(object_entry->count > 0);
@@ -149,10 +180,11 @@ void PlasmaClient::increment_object_count(const ObjectID&
object_id, PlasmaObjec
Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
uint8_t* metadata, int64_t metadata_size,
- std::shared_ptr<Buffer>* data) {
+ std::shared_ptr<Buffer>* data, int device_num) {
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with
size "
<< data_size << " and metadata size " << metadata_size;
- RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size,
metadata_size));
+ RETURN_NOT_OK(
+ SendCreateRequest(store_conn_, object_id, data_size, metadata_size,
device_num));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply,
&buffer));
ObjectID id;
@@ -163,21 +195,41 @@ Status PlasmaClient::Create(const ObjectID& object_id,
int64_t data_size,
ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd,
&mmap_size));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
- int fd = recv_fd(store_conn_);
- ARROW_CHECK(fd >= 0) << "recv not successful";
- ARROW_CHECK(object.data_size == data_size);
- ARROW_CHECK(object.metadata_size == metadata_size);
- // The metadata should come right after the data.
- ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
- *data = std::make_shared<MutableBuffer>(
- lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
- // If plasma_create is being called from a transfer, then we will not copy
the
- // metadata here. The metadata will be written along with the data streamed
- // from the transfer.
- if (metadata != NULL) {
- // Copy the metadata to the buffer.
- memcpy((*data)->mutable_data() + object.data_size, metadata,
metadata_size);
+ if (device_num == 0) {
+ int fd = recv_fd(store_conn_);
+ ARROW_CHECK(fd >= 0) << "recv not successful";
+ ARROW_CHECK(object.data_size == data_size);
+ ARROW_CHECK(object.metadata_size == metadata_size);
+ // The metadata should come right after the data.
+ ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
+ *data = std::make_shared<MutableBuffer>(
+ lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset,
data_size);
+ // If plasma_create is being called from a transfer, then we will not copy
the
+ // metadata here. The metadata will be written along with the data streamed
+ // from the transfer.
+ if (metadata != NULL) {
+ // Copy the metadata to the buffer.
+ memcpy((*data)->mutable_data() + object.data_size, metadata,
metadata_size);
+ }
+ } else {
+#ifdef PLASMA_GPU
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ std::shared_ptr<CudaContext> context;
+ RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context));
+ GpuProcessHandle* handle = new GpuProcessHandle();
+ RETURN_NOT_OK(context->OpenIpcBuffer(*object.ipc_handle, &handle->ptr));
+ gpu_object_map[object_id] = handle;
+ *data = handle->ptr;
+ if (metadata != NULL) {
+ // Copy the metadata to the buffer.
+ CudaBufferWriter writer(std::dynamic_pointer_cast<CudaBuffer>(*data));
+ writer.WriteAt(object.data_size, metadata, metadata_size);
+ }
+#else
+ ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
+#endif
}
+
// Increment the count of the number of instances of this object that this
// client is using. A call to PlasmaClient::Release is required to decrement
// this
@@ -210,13 +262,27 @@ Status PlasmaClient::Get(const ObjectID* object_ids,
int64_t num_objects,
ARROW_CHECK(object_entry->second->is_sealed)
<< "Plasma client called get on an unsealed object that it created";
PlasmaObject* object = &object_entry->second->object;
- uint8_t* data = lookup_mmapped_file(object->store_fd);
- object_buffers[i].data =
- std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
- object_buffers[i].metadata = std::make_shared<Buffer>(
- data + object->data_offset + object->data_size,
object->metadata_size);
+ if (object->device_num == 0) {
+ uint8_t* data = lookup_mmapped_file(object->store_fd);
+ object_buffers[i].data =
+ std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
+ object_buffers[i].metadata = std::make_shared<Buffer>(
+ data + object->data_offset + object->data_size,
object->metadata_size);
+ } else {
+#ifdef PLASMA_GPU
+ std::shared_ptr<CudaBuffer> gpu_handle =
+ gpu_object_map.find(object_ids[i])->second->ptr;
+ object_buffers[i].data =
+ std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
+ object_buffers[i].metadata = std::make_shared<CudaBuffer>(
+ gpu_handle, object->data_size, object->metadata_size);
+#else
+ ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
+#endif
+ }
object_buffers[i].data_size = object->data_size;
object_buffers[i].metadata_size = object->metadata_size;
+ object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that
this
// client is using. A call to PlasmaClient::Release is required to
// decrement this
@@ -265,14 +331,40 @@ Status PlasmaClient::Get(const ObjectID* object_ids,
int64_t num_objects,
// If we are here, the object was not currently in use, so we need to
// process the reply from the object store.
if (object->data_size != -1) {
- uint8_t* data = lookup_mmapped_file(object->store_fd);
- // Finish filling out the return values.
- object_buffers[i].data =
- std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
- object_buffers[i].metadata = std::make_shared<Buffer>(
- data + object->data_offset + object->data_size,
object->metadata_size);
+ if (object->device_num == 0) {
+ uint8_t* data = lookup_mmapped_file(object->store_fd);
+ // Finish filling out the return values.
+ object_buffers[i].data =
+ std::make_shared<Buffer>(data + object->data_offset,
object->data_size);
+ object_buffers[i].metadata = std::make_shared<Buffer>(
+ data + object->data_offset + object->data_size,
object->metadata_size);
+ } else {
+#ifdef PLASMA_GPU
+ std::lock_guard<std::mutex> lock(gpu_mutex);
+ auto handle = gpu_object_map.find(object_ids[i]);
+ std::shared_ptr<CudaBuffer> gpu_handle;
+ if (handle == gpu_object_map.end()) {
+ std::shared_ptr<CudaContext> context;
+ RETURN_NOT_OK(manager_->GetContext(object->device_num - 1,
&context));
+ GpuProcessHandle* obj_handle = new GpuProcessHandle();
+ RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle,
&obj_handle->ptr));
+ gpu_object_map[object_ids[i]] = obj_handle;
+ gpu_handle = obj_handle->ptr;
+ } else {
+ handle->second->client_count += 1;
+ gpu_handle = handle->second->ptr;
+ }
+ object_buffers[i].data =
+ std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
+ object_buffers[i].metadata = std::make_shared<CudaBuffer>(
+ gpu_handle, object->data_size, object->metadata_size);
+#else
+ ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
+#endif
+ }
object_buffers[i].data_size = object->data_size;
object_buffers[i].metadata_size = object->metadata_size;
+ object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that
this
// client is using. A call to PlasmaClient::Release is required to
// decrement this
@@ -358,6 +450,9 @@ Status PlasmaClient::Release(const ObjectID& object_id) {
// If there are too many bytes in use by the client or if there are too many
// pending release calls, and there are at least some pending release calls
in
// the release_history list, then release some objects.
+
+ // TODO(wap) Evicition policy only works on host memory, and thus objects
+ // on the GPU cannot be released currently.
while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ /
100) ||
release_history_.size() > config_.release_delay) &&
release_history_.size() > 0) {
@@ -447,6 +542,10 @@ static inline bool
compute_object_hash_parallel(XXH64_state_t* hash_state,
static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
XXH64_state_t hash_state;
+ if (obj_buffer.device_num != 0) {
+ // TODO(wap): Create cuda program to hash data on gpu.
+ return 0;
+ }
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (obj_buffer.data_size >= kBytesInMB) {
compute_object_hash_parallel(
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index d6372f4..7c27c47 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -30,6 +30,9 @@
#include "arrow/status.h"
#include "arrow/util/visibility.h"
#include "plasma/common.h"
+#ifdef PLASMA_GPU
+#include "arrow/gpu/cuda_api.h"
+#endif
using arrow::Buffer;
using arrow::Status;
@@ -111,11 +114,15 @@ class ARROW_EXPORT PlasmaClient {
/// should be NULL.
/// \param metadata_size The size in bytes of the metadata. If there is no
/// metadata, this should be 0.
- /// \param data A buffer containing the address of the newly created object
- /// will be written here.
+ /// \param data The address of the newly created object will be written here.
+ /// \param device_num The number of the device where the object is being
+ /// created.
+ /// device_num = 0 corresponds to the host,
+ /// device_num = 1 corresponds to GPU0,
+ /// device_num = 2 corresponds to GPU1, etc.
/// \return The return status.
Status Create(const ObjectID& object_id, int64_t data_size, uint8_t*
metadata,
- int64_t metadata_size, std::shared_ptr<Buffer>* data);
+ int64_t metadata_size, std::shared_ptr<Buffer>* data, int
device_num = 0);
/// Get some objects from the Plasma Store. This function will block until
the
/// objects have all been created and sealed in the Plasma Store or the
/// timeout
@@ -368,6 +375,10 @@ class ARROW_EXPORT PlasmaClient {
/// information to make sure that it does not delay in releasing so much
/// memory that the store is unable to evict enough objects to free up space.
int64_t store_capacity_;
+#ifdef PLASMA_GPU
+ /// Cuda Device Manager.
+ arrow::gpu::CudaDeviceManager* manager_;
+#endif
};
} // namespace plasma
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 33803f7..66cb00f 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -97,6 +97,8 @@ struct PlasmaObjectSpec {
metadata_offset: ulong;
// The size in bytes of the metadata.
metadata_size: ulong;
+ // Device to create buffer on.
+ device_num: int;
}
table PlasmaCreateRequest {
@@ -106,6 +108,12 @@ table PlasmaCreateRequest {
data_size: ulong;
// The size of the object's metadata in bytes.
metadata_size: ulong;
+ // Device to create buffer on.
+ device_num: int;
+}
+
+table CudaHandle {
+ handle: [ubyte];
}
table PlasmaCreateReply {
@@ -121,6 +129,8 @@ table PlasmaCreateReply {
// The size in bytes of the segment for the store file descriptor (needed to
// call mmap).
mmap_size: long;
+ // CUDA IPC Handle for objects on GPU.
+ ipc_handle: CudaHandle;
}
table PlasmaAbortRequest {
@@ -171,6 +181,8 @@ table PlasmaGetReply {
// Size in bytes of the segment for each store file descriptor (needed to
call
// mmap). This list must have the same length as store_fds.
mmap_sizes: [long];
+ // The number of elements in both object_ids and plasma_objects arrays must
agree.
+ handles: [CudaHandle];
}
table PlasmaReleaseRequest {
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index bb9cdae..901601f 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -39,6 +39,12 @@
#include "plasma/common.h"
#include "plasma/common_generated.h"
+#ifdef PLASMA_GPU
+#include "arrow/gpu/cuda_api.h"
+
+using arrow::gpu::CudaIpcMemHandle;
+#endif
+
namespace plasma {
#define HANDLE_SIGPIPE(s, fd_) \
@@ -67,6 +73,10 @@ typedef std::unordered_map<ObjectID, ObjectRequest,
UniqueIDHasher> ObjectReques
// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
struct PlasmaObject {
+#ifdef PLASMA_GPU
+ // IPC handle for Cuda.
+ std::shared_ptr<CudaIpcMemHandle> ipc_handle;
+#endif
/// The file descriptor of the memory mapped file in the store. It is used as
/// a unique identifier of the file in the client to look up the
corresponding
/// file descriptor on the client's side.
@@ -79,6 +89,8 @@ struct PlasmaObject {
int64_t data_size;
/// The size in bytes of the metadata.
int64_t metadata_size;
+ /// Device number object is on.
+ int device_num;
};
enum object_state {
@@ -104,12 +116,18 @@ struct ObjectTableEntry {
ObjectInfoT info;
/// Memory mapped file containing the object.
int fd;
+ /// Device number.
+ int device_num;
/// Size of the underlying map.
int64_t map_size;
/// Offset from the base of the mmap.
ptrdiff_t offset;
/// Pointer to the object data. Needed to free the object.
uint8_t* pointer;
+#ifdef PLASMA_GPU
+ /// IPC GPU handle to share with clients.
+ std::shared_ptr<CudaIpcMemHandle> ipc_handle;
+#endif
/// Set of clients currently using this object.
std::unordered_set<Client*> clients;
/// The state of the object, e.g., whether it is open or sealed.
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 6c0bc0c..9443762 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -23,6 +23,10 @@
#include "plasma/common.h"
#include "plasma/io.h"
+#ifdef ARROW_GPU
+#include "arrow/gpu/cuda_api.h"
+#endif
+
namespace plasma {
using flatbuffers::uoffset_t;
@@ -55,21 +59,22 @@ Status PlasmaSend(int sock, int64_t message_type,
flatbuffers::FlatBufferBuilder
// Create messages.
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
- int64_t metadata_size) {
+ int64_t metadata_size, int device_num) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePlasmaCreateRequest(fbb,
fbb.CreateString(object_id.binary()),
- data_size, metadata_size);
+ data_size, metadata_size,
device_num);
return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
}
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
- int64_t* data_size, int64_t* metadata_size) {
+ int64_t* data_size, int64_t* metadata_size, int*
device_num) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
+ *device_num = message->device_num();
return Status::OK();
}
@@ -77,10 +82,31 @@ Status SendCreateReply(int sock, ObjectID object_id,
PlasmaObject* object, int e
int64_t mmap_size) {
flatbuffers::FlatBufferBuilder fbb;
PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset,
object->data_size,
- object->metadata_offset,
object->metadata_size);
- auto message = CreatePlasmaCreateReply(
- fbb, fbb.CreateString(object_id.binary()), &plasma_object,
- static_cast<PlasmaError>(error_code), object->store_fd, mmap_size);
+ object->metadata_offset,
object->metadata_size,
+ object->device_num);
+ auto object_string = fbb.CreateString(object_id.binary());
+#ifdef PLASMA_GPU
+ flatbuffers::Offset<CudaHandle> ipc_handle;
+ if (object->device_num != 0) {
+ std::shared_ptr<arrow::Buffer> handle;
+ object->ipc_handle->Serialize(arrow::default_memory_pool(), &handle);
+ ipc_handle = CreateCudaHandle(fbb, fbb.CreateVector(handle->data(),
handle->size()));
+ }
+#endif
+ PlasmaCreateReplyBuilder crb(fbb);
+ crb.add_error(static_cast<PlasmaError>(error_code));
+ crb.add_plasma_object(&plasma_object);
+ crb.add_object_id(object_string);
+ crb.add_store_fd(object->store_fd);
+ crb.add_mmap_size(mmap_size);
+ if (object->device_num != 0) {
+#ifdef PLASMA_GPU
+ crb.add_ipc_handle(ipc_handle);
+#else
+ ARROW_LOG(FATAL) << "This should be unreachable.";
+#endif
+ }
+ auto message = crb.Finish();
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
}
@@ -99,6 +125,13 @@ Status ReadCreateReply(uint8_t* data, size_t size,
ObjectID* object_id,
*store_fd = message->store_fd();
*mmap_size = message->mmap_size();
+ object->device_num = message->plasma_object()->device_num();
+#ifdef PLASMA_GPU
+ if (object->device_num != 0) {
+ CudaIpcMemHandle::FromBuffer(message->ipc_handle()->handle()->data(),
+ &object->ipc_handle);
+ }
+#endif
return plasma_error_status(message->error());
}
@@ -396,18 +429,25 @@ Status SendGetReply(
flatbuffers::FlatBufferBuilder fbb;
std::vector<PlasmaObjectSpec> objects;
- ARROW_CHECK(store_fds.size() == mmap_sizes.size());
-
+ std::vector<flatbuffers::Offset<CudaHandle>> handles;
for (int64_t i = 0; i < num_objects; ++i) {
const PlasmaObject& object = plasma_objects[object_ids[i]];
objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
object.data_size,
object.metadata_offset,
- object.metadata_size));
+ object.metadata_size,
object.device_num));
+#ifdef PLASMA_GPU
+ if (object.device_num != 0) {
+ std::shared_ptr<arrow::Buffer> handle;
+ object.ipc_handle->Serialize(arrow::default_memory_pool(), &handle);
+ handles.push_back(
+ CreateCudaHandle(fbb, fbb.CreateVector(handle->data(),
handle->size())));
+ }
+#endif
}
- auto message =
- CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVectorOfStructs(objects.data(),
num_objects),
- fbb.CreateVector(store_fds),
fbb.CreateVector(mmap_sizes));
+ auto message = CreatePlasmaGetReply(
+ fbb, to_flatbuffer(&fbb, object_ids, num_objects),
+ fbb.CreateVectorOfStructs(objects.data(), num_objects),
fbb.CreateVector(store_fds),
+ fbb.CreateVector(mmap_sizes), fbb.CreateVector(handles));
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
}
@@ -416,6 +456,9 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID
object_ids[],
std::vector<int>& store_fds, std::vector<int64_t>&
mmap_sizes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
+#ifdef PLASMA_GPU
+ int handle_pos = 0;
+#endif
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] =
ObjectID::from_binary(message->object_ids()->Get(i)->str());
@@ -427,6 +470,14 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID
object_ids[],
plasma_objects[i].data_size = object->data_size();
plasma_objects[i].metadata_offset = object->metadata_offset();
plasma_objects[i].metadata_size = object->metadata_size();
+ plasma_objects[i].device_num = object->device_num();
+#ifdef PLASMA_GPU
+ if (object->device_num() != 0) {
+
CudaIpcMemHandle::FromBuffer(message->handles()->Get(handle_pos)->handle()->data(),
+ &plasma_objects[i].ipc_handle);
+ handle_pos++;
+ }
+#endif
}
ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
@@ -435,7 +486,6 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID
object_ids[],
}
return Status::OK();
}
-
// Fetch messages.
Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t
num_objects) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 101a3fa..86b3577 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -43,10 +43,10 @@ Status PlasmaReceive(int sock, int64_t message_type,
std::vector<uint8_t>* buffe
/* Plasma Create message functions. */
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
- int64_t metadata_size);
+ int64_t metadata_size, int device_num);
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
- int64_t* data_size, int64_t* metadata_size);
+ int64_t* data_size, int64_t* metadata_size, int*
device_num);
Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int
error,
int64_t mmap_size);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 316a27f..5e7b452 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -57,6 +57,14 @@
#include "plasma/io.h"
#include "plasma/malloc.h"
+#ifdef PLASMA_GPU
+#include "arrow/gpu/cuda_api.h"
+
+using arrow::gpu::CudaBuffer;
+using arrow::gpu::CudaContext;
+using arrow::gpu::CudaDeviceManager;
+#endif
+
namespace plasma {
extern "C" {
@@ -104,6 +112,9 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t
system_memory, std::string dir
store_info_.memory_capacity = system_memory;
store_info_.directory = directory;
store_info_.hugepages_enabled = hugepages_enabled;
+#ifdef PLASMA_GPU
+ CudaDeviceManager::GetInstance(&manager_);
+#endif
}
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
@@ -142,7 +153,7 @@ void
PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client*
// Create a new object buffer in the hash table.
int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
- int64_t metadata_size, Client* client,
+ int64_t metadata_size, int device_num, Client*
client,
PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
if (store_info_.objects.count(object_id) != 0) {
@@ -152,7 +163,14 @@ int PlasmaStore::create_object(const ObjectID& object_id,
int64_t data_size,
}
// Try to evict objects until there is enough space.
uint8_t* pointer;
- do {
+#ifdef PLASMA_GPU
+ std::shared_ptr<CudaBuffer> gpu_handle;
+ std::shared_ptr<CudaContext> context_;
+ if (device_num != 0) {
+ manager_->GetContext(device_num - 1, &context_);
+ }
+#endif
+ while (true) {
// Allocate space for the new object. We use dlmemalign instead of dlmalloc
// in order to align the allocated region to a 64-byte boundary. This is
not
// strictly necessary, but it is an optimization that could speed up the
@@ -160,27 +178,37 @@ int PlasmaStore::create_object(const ObjectID& object_id,
int64_t data_size,
// plasma_client.cc). Note that even though this pointer is 64-byte
aligned,
// it is not guaranteed that the corresponding pointer in the client will
be
// 64-byte aligned, but in practice it often will be.
- pointer =
- reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size +
metadata_size));
- if (pointer == NULL) {
- // Tell the eviction policy how much space we need to create this object.
- std::vector<ObjectID> objects_to_evict;
- bool success =
- eviction_policy_.require_space(data_size + metadata_size,
&objects_to_evict);
- delete_objects(objects_to_evict);
- // Return an error to the client if not enough space could be freed to
- // create the object.
- if (!success) {
- return PlasmaError_OutOfMemory;
+ if (device_num == 0) {
+ pointer =
+ reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size +
metadata_size));
+ if (pointer == NULL) {
+ // Tell the eviction policy how much space we need to create this
object.
+ std::vector<ObjectID> objects_to_evict;
+ bool success =
+ eviction_policy_.require_space(data_size + metadata_size,
&objects_to_evict);
+ delete_objects(objects_to_evict);
+ // Return an error to the client if not enough space could be freed to
+ // create the object.
+ if (!success) {
+ return PlasmaError_OutOfMemory;
+ }
+ } else {
+ break;
}
+ } else {
+#ifdef PLASMA_GPU
+ context_->Allocate(data_size + metadata_size, &gpu_handle);
+ break;
+#endif
}
- } while (pointer == NULL);
- int fd;
- int64_t map_size;
- ptrdiff_t offset;
- get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
- assert(fd != -1);
-
+ }
+ int fd = -1;
+ int64_t map_size = 0;
+ ptrdiff_t offset = 0;
+ if (device_num == 0) {
+ get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
+ assert(fd != -1);
+ }
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
entry->object_id = object_id;
entry->info.object_id = object_id.binary();
@@ -192,13 +220,20 @@ int PlasmaStore::create_object(const ObjectID& object_id,
int64_t data_size,
entry->map_size = map_size;
entry->offset = offset;
entry->state = PLASMA_CREATED;
-
+ entry->device_num = device_num;
+#ifdef PLASMA_GPU
+ if (device_num != 0) {
+ gpu_handle->ExportForIpc(&entry->ipc_handle);
+ result->ipc_handle = entry->ipc_handle;
+ }
+#endif
store_info_.objects[object_id] = std::move(entry);
result->store_fd = fd;
result->data_offset = offset;
result->metadata_offset = offset + data_size;
result->data_size = data_size;
result->metadata_size = metadata_size;
+ result->device_num = device_num;
// Notify the eviction policy that this object was created. This must be done
// immediately before the call to add_client_to_object_clients so that the
// eviction policy does not have an opportunity to evict the object.
@@ -212,11 +247,17 @@ void PlasmaObject_init(PlasmaObject* object,
ObjectTableEntry* entry) {
DCHECK(object != NULL);
DCHECK(entry != NULL);
DCHECK(entry->state == PLASMA_SEALED);
+#ifdef PLASMA_GPU
+ if (entry->device_num != 0) {
+ object->ipc_handle = entry->ipc_handle;
+ }
+#endif
object->store_fd = entry->fd;
object->data_offset = entry->offset;
object->metadata_offset = entry->offset + entry->info.data_size;
object->data_size = entry->info.data_size;
object->metadata_size = entry->info.metadata_size;
+ object->device_num = entry->device_num;
}
void PlasmaStore::return_from_get(GetRequest* get_req) {
@@ -227,7 +268,7 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
for (const auto& object_id : get_req->object_ids) {
PlasmaObject& object = get_req->objects[object_id];
int fd = object.store_fd;
- if (object.data_size != -1 && fds_to_send.count(fd) == 0) {
+ if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != -1) {
fds_to_send.insert(fd);
store_fds.push_back(fd);
mmap_sizes.push_back(get_mmap_size(fd));
@@ -646,18 +687,19 @@ Status PlasmaStore::process_message(Client* client) {
case MessageType_PlasmaCreateRequest: {
int64_t data_size;
int64_t metadata_size;
- RETURN_NOT_OK(
- ReadCreateRequest(input, input_size, &object_id, &data_size,
&metadata_size));
+ int device_num;
+ RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id,
&data_size,
+ &metadata_size, &device_num));
int error_code =
- create_object(object_id, data_size, metadata_size, client, &object);
+ create_object(object_id, data_size, metadata_size, device_num,
client, &object);
int64_t mmap_size = 0;
- if (error_code == PlasmaError_OK) {
+ if (error_code == PlasmaError_OK && device_num == 0) {
mmap_size = get_mmap_size(object.store_fd);
}
HANDLE_SIGPIPE(
SendCreateReply(client->fd, object_id, &object, error_code,
mmap_size),
client->fd);
- if (error_code == PlasmaError_OK) {
+ if (error_code == PlasmaError_OK && device_num == 0) {
warn_if_sigpipe(send_fd(client->fd, object.store_fd), client->fd);
}
} break;
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 7e716d2..d97cdf7 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -65,6 +65,13 @@ class PlasmaStore {
/// @param object_id Object ID of the object to be created.
/// @param data_size Size in bytes of the object to be created.
/// @param metadata_size Size in bytes of the object metadata.
+ /// @param device_num The number of the device where the object is being
+ /// created.
+ /// device_num = 0 corresponds to the host,
+ /// device_num = 1 corresponds to GPU0,
+ /// device_num = 2 corresponds to GPU1, etc.
+ /// @param client The client that created the object.
+ /// @param result The object that has been created.
/// @return One of the following error codes:
/// - PlasmaError_OK, if the object was created successfully.
/// - PlasmaError_ObjectExists, if an object with this ID is already
@@ -74,7 +81,7 @@ class PlasmaStore {
/// cannot create the object. In this case, the client should not call
/// plasma_release.
int create_object(const ObjectID& object_id, int64_t data_size, int64_t
metadata_size,
- Client* client, PlasmaObject* result);
+ int device_num, Client* client, PlasmaObject* result);
/// Abort a created but unsealed object. If the client is not the
/// creator, then the abort will fail.
@@ -187,6 +194,9 @@ class PlasmaStore {
std::unordered_map<int, NotificationQueue> pending_notifications_;
std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
+#ifdef PLASMA_GPU
+ arrow::gpu::CudaDeviceManager* manager_;
+#endif
};
} // namespace plasma
diff --git a/cpp/src/plasma/test/client_tests.cc
b/cpp/src/plasma/test/client_tests.cc
index f19c2bf..15f9e7c 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -295,6 +295,84 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
}
}
+#ifdef PLASMA_GPU
+using arrow::gpu::CudaBuffer;
+using arrow::gpu::CudaBufferReader;
+using arrow::gpu::CudaBufferWriter;
+
+TEST_F(TestPlasmaStore, GetGPUTest) {
+ ObjectID object_id = ObjectID::from_random();
+ ObjectBuffer object_buffer;
+
+ // Test for object non-existence.
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+ ASSERT_EQ(object_buffer.data_size, -1);
+
+ // Test for the object being in local Plasma store.
+ // First create object.
+ uint8_t data[] = {4, 5, 3, 1};
+ int64_t data_size = sizeof(data);
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data_buffer;
+ std::shared_ptr<CudaBuffer> gpu_buffer;
+ ARROW_CHECK_OK(
+ client_.Create(object_id, data_size, metadata, metadata_size,
&data_buffer, 1));
+ gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(data_buffer);
+ CudaBufferWriter writer(gpu_buffer);
+ writer.Write(data, data_size);
+ ARROW_CHECK_OK(client_.Seal(object_id));
+
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(object_buffer.data);
+ CudaBufferReader reader(gpu_buffer);
+ uint8_t read_data[data_size];
+ int64_t read_data_size;
+ reader.Read(data_size, &read_data_size, read_data);
+ for (int64_t i = 0; i < data_size; i++) {
+ ASSERT_EQ(data[i], read_data[i]);
+ }
+}
+
+TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
+ ObjectID object_id = ObjectID::from_random();
+
+ // Test for object non-existence on the first client.
+ bool has_object;
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, false);
+
+ // Test for the object being in local Plasma store.
+ // First create and seal object on the second client.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(
+ client2_.Create(object_id, data_size, metadata, metadata_size, &data,
1));
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ // Test that the first client can get the object.
+ ObjectBuffer object_buffer;
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+
+ // Test that one client disconnecting does not interfere with the other.
+ // First create object on the second client.
+ object_id = ObjectID::from_random();
+ ARROW_CHECK_OK(
+ client2_.Create(object_id, data_size, metadata, metadata_size, &data,
1));
+ // Disconnect the first client.
+ ARROW_CHECK_OK(client_.Disconnect());
+ // Test that the second client can seal and get the created object.
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+}
+
+#endif
+
} // namespace plasma
int main(int argc, char** argv) {
diff --git a/cpp/src/plasma/test/serialization_tests.cc
b/cpp/src/plasma/test/serialization_tests.cc
index 656b2cc..03b1428 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -68,6 +68,7 @@ PlasmaObject random_plasma_object(void) {
object.metadata_offset = random + 2;
object.data_size = random + 3;
object.metadata_size = random + 4;
+ object.device_num = 0;
return object;
}
@@ -76,16 +77,20 @@ TEST(PlasmaSerialization, CreateRequest) {
ObjectID object_id1 = ObjectID::from_random();
int64_t data_size1 = 42;
int64_t metadata_size1 = 11;
- ARROW_CHECK_OK(SendCreateRequest(fd, object_id1, data_size1,
metadata_size1));
+ int device_num1 = 0;
+ ARROW_CHECK_OK(
+ SendCreateRequest(fd, object_id1, data_size1, metadata_size1,
device_num1));
std::vector<uint8_t> data = read_message_from_file(fd,
MessageType_PlasmaCreateRequest);
ObjectID object_id2;
int64_t data_size2;
int64_t metadata_size2;
+ int device_num2;
ARROW_CHECK_OK(ReadCreateRequest(data.data(), data.size(), &object_id2,
&data_size2,
- &metadata_size2));
+ &metadata_size2, &device_num2));
ASSERT_EQ(data_size1, data_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
ASSERT_EQ(object_id1, object_id2);
+ ASSERT_EQ(device_num1, device_num2);
close(fd);
}
--
To stop receiving notification emails like this one, please contact
[email protected].