http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/extension.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc new file mode 100644 index 0000000..5d61e33 --- /dev/null +++ b/cpp/src/plasma/extension.cc @@ -0,0 +1,456 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "plasma/extension.h" + +#include <algorithm> +#include <vector> + +#include "plasma/client.h" +#include "plasma/common.h" +#include "plasma/io.h" +#include "plasma/protocol.h" + +PyObject* PlasmaOutOfMemoryError; +PyObject* PlasmaObjectExistsError; + +PyObject* PyPlasma_connect(PyObject* self, PyObject* args) { + const char* store_socket_name; + const char* manager_socket_name; + int release_delay; + if (!PyArg_ParseTuple( + args, "ssi", &store_socket_name, &manager_socket_name, &release_delay)) { + return NULL; + } + PlasmaClient* client = new PlasmaClient(); + ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay)); + + return PyCapsule_New(client, "plasma", NULL); +} + +PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) { + PyObject* client_capsule; + if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; } + PlasmaClient* client; + ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client)); + ARROW_CHECK_OK(client->Disconnect()); + /* We use the context of the connection capsule to indicate if the connection + * is still active (if the context is NULL) or if it is closed (if the context + * is (void*) 0x1). This is neccessary because the primary pointer of the + * capsule cannot be NULL. 
*/ + PyCapsule_SetContext(client_capsule, reinterpret_cast<void*>(0x1)); + Py_RETURN_NONE; +} + +PyObject* PyPlasma_create(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + Py_ssize_t size; + PyObject* metadata; + if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client, + PyStringToUniqueID, &object_id, &size, &metadata)) { + return NULL; + } + if (!PyByteArray_Check(metadata)) { + PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray"); + return NULL; + } + uint8_t* data; + Status s = client->Create(object_id, size, + reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata)), + PyByteArray_Size(metadata), &data); + if (s.IsPlasmaObjectExists()) { + PyErr_SetString(PlasmaObjectExistsError, + "An object with this ID already exists in the plasma " + "store."); + return NULL; + } + if (s.IsPlasmaStoreFull()) { + PyErr_SetString(PlasmaOutOfMemoryError, + "The plasma store ran out of memory and could not create " + "this object."); + return NULL; + } + ARROW_CHECK(s.ok()); + +#if PY_MAJOR_VERSION >= 3 + return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, PyBUF_WRITE); +#else + return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size); +#endif +} + +PyObject* PyPlasma_hash(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, + &object_id)) { + return NULL; + } + unsigned char digest[kDigestSize]; + bool success = plasma_compute_object_hash(client, object_id, digest); + if (success) { + PyObject* digest_string = + PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize); + return digest_string; + } else { + Py_RETURN_NONE; + } +} + +PyObject* PyPlasma_seal(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, + &object_id)) { + return NULL; 
+ } + ARROW_CHECK_OK(client->Seal(object_id)); + Py_RETURN_NONE; +} + +PyObject* PyPlasma_release(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, + &object_id)) { + return NULL; + } + ARROW_CHECK_OK(client->Release(object_id)); + Py_RETURN_NONE; +} + +PyObject* PyPlasma_get(PyObject* self, PyObject* args) { + PlasmaClient* client; + PyObject* object_id_list; + Py_ssize_t timeout_ms; + if (!PyArg_ParseTuple( + args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) { + return NULL; + } + + Py_ssize_t num_object_ids = PyList_Size(object_id_list); + std::vector<ObjectID> object_ids(num_object_ids); + std::vector<ObjectBuffer> object_buffers(num_object_ids); + + for (int i = 0; i < num_object_ids; ++i) { + PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } + + Py_BEGIN_ALLOW_THREADS; + ARROW_CHECK_OK( + client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data())); + Py_END_ALLOW_THREADS; + + PyObject* returns = PyList_New(num_object_ids); + for (int i = 0; i < num_object_ids; ++i) { + if (object_buffers[i].data_size != -1) { + /* The object was retrieved, so return the object. 
*/ + PyObject* t = PyTuple_New(2); + Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size); + Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size); +#if PY_MAJOR_VERSION >= 3 + char* data = reinterpret_cast<char*>(object_buffers[i].data); + char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata); + PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ)); + PyTuple_SET_ITEM( + t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ)); +#else + void* data = reinterpret_cast<void*>(object_buffers[i].data); + void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata); + PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size)); + PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size)); +#endif + ARROW_CHECK(PyList_SetItem(returns, i, t) == 0); + } else { + /* The object was not retrieved, so just add None to the list of return + * values. */ + Py_INCREF(Py_None); + ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0); + } + } + return returns; +} + +PyObject* PyPlasma_contains(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, + &object_id)) { + return NULL; + } + bool has_object; + ARROW_CHECK_OK(client->Contains(object_id, &has_object)); + + if (has_object) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} + +PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) { + PlasmaClient* client; + PyObject* object_id_list; + if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) { + return NULL; + } + if (client->get_manager_fd() == -1) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + Py_ssize_t n = PyList_Size(object_id_list); + ObjectID* object_ids = new ObjectID[n]; + for (int i = 0; i < n; ++i) { + 
PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } + ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids)); + delete[] object_ids; + Py_RETURN_NONE; +} + +PyObject* PyPlasma_wait(PyObject* self, PyObject* args) { + PlasmaClient* client; + PyObject* object_id_list; + Py_ssize_t timeout; + int num_returns; + if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list, + &timeout, &num_returns)) { + return NULL; + } + Py_ssize_t n = PyList_Size(object_id_list); + + if (client->get_manager_fd() == -1) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + if (num_returns < 0) { + PyErr_SetString( + PyExc_RuntimeError, "The argument num_returns cannot be less than zero."); + return NULL; + } + if (num_returns > n) { + PyErr_SetString(PyExc_RuntimeError, + "The argument num_returns cannot be greater than len(object_ids)"); + return NULL; + } + int64_t threshold = 1 << 30; + if (timeout > threshold) { + PyErr_SetString( + PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30."); + return NULL; + } + + std::vector<ObjectRequest> object_requests(n); + for (int i = 0; i < n; ++i) { + ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i), + &object_requests[i].object_id) == 1); + object_requests[i].type = PLASMA_QUERY_ANYWHERE; + } + /* Drop the global interpreter lock while we are waiting, so other threads can + * run. 
*/ + int num_return_objects; + Py_BEGIN_ALLOW_THREADS; + ARROW_CHECK_OK( + client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects)); + Py_END_ALLOW_THREADS; + + int num_to_return = std::min(num_return_objects, num_returns); + PyObject* ready_ids = PyList_New(num_to_return); + PyObject* waiting_ids = PySet_New(object_id_list); + int num_returned = 0; + for (int i = 0; i < n; ++i) { + if (num_returned == num_to_return) { break; } + if (object_requests[i].status == ObjectStatus_Local || + object_requests[i].status == ObjectStatus_Remote) { + PyObject* ready = PyBytes_FromStringAndSize( + reinterpret_cast<char*>(&object_requests[i].object_id), + sizeof(object_requests[i].object_id)); + PyList_SetItem(ready_ids, num_returned, ready); + PySet_Discard(waiting_ids, ready); + num_returned += 1; + } else { + ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent); + } + } + ARROW_CHECK(num_returned == num_to_return); + /* Return both the ready IDs and the remaining IDs. 
*/ + PyObject* t = PyTuple_New(2); + PyTuple_SetItem(t, 0, ready_ids); + PyTuple_SetItem(t, 1, waiting_ids); + return t; +} + +PyObject* PyPlasma_evict(PyObject* self, PyObject* args) { + PlasmaClient* client; + Py_ssize_t num_bytes; + if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes)) { + return NULL; + } + int64_t evicted_bytes; + ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes)); + return PyLong_FromSsize_t(static_cast<Py_ssize_t>(evicted_bytes)); +} + +PyObject* PyPlasma_delete(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, + &object_id)) { + return NULL; + } + ARROW_CHECK_OK(client->Delete(object_id)); + Py_RETURN_NONE; +} + +PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) { + PlasmaClient* client; + ObjectID object_id; + const char* addr; + int port; + if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client, + PyStringToUniqueID, &object_id, &addr, &port)) { + return NULL; + } + + if (client->get_manager_fd() == -1) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + + ARROW_CHECK_OK(client->Transfer(addr, port, object_id)); + Py_RETURN_NONE; +} + +PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) { + PlasmaClient* client; + if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return NULL; } + + int sock; + ARROW_CHECK_OK(client->Subscribe(&sock)); + return PyLong_FromLong(sock); +} + +PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) { + int plasma_sock; + + if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; } + /* Receive object notification from the plasma connection socket. If the + * object was added, return a tuple of its fields: ObjectID, data_size, + * metadata_size. 
If the object was deleted, data_size and metadata_size will + * be set to -1. */ + uint8_t* notification = read_message_async(plasma_sock); + if (notification == NULL) { + PyErr_SetString( + PyExc_RuntimeError, "Failed to read object notification from Plasma socket"); + return NULL; + } + auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification); + /* Construct a tuple from object_info and return. */ + PyObject* t = PyTuple_New(3); + PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(), + object_info->object_id()->size())); + if (object_info->is_deletion()) { + PyTuple_SetItem(t, 1, PyLong_FromLong(-1)); + PyTuple_SetItem(t, 2, PyLong_FromLong(-1)); + } else { + PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size())); + PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size())); + } + + delete[] notification; + return t; +} + +static PyMethodDef plasma_methods[] = { + {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."}, + {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."}, + {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."}, + {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."}, + {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."}, + {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."}, + {"contains", PyPlasma_contains, METH_VARARGS, + "Does the plasma store contain this plasma object?"}, + {"fetch", PyPlasma_fetch, METH_VARARGS, + "Fetch the object from another plasma manager instance."}, + {"wait", PyPlasma_wait, METH_VARARGS, + "Wait until num_returns objects in object_ids are ready."}, + {"evict", PyPlasma_evict, METH_VARARGS, + "Evict some objects until we recover some number of bytes."}, + {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."}, + {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."}, + {"transfer", PyPlasma_transfer, METH_VARARGS, + "Transfer object 
to another plasma manager."}, + {"subscribe", PyPlasma_subscribe, METH_VARARGS, + "Subscribe to the plasma notification socket."}, + {"receive_notification", PyPlasma_receive_notification, METH_VARARGS, + "Receive next notification from plasma notification socket."}, + {NULL} /* Sentinel */ +}; + +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, "libplasma", /* m_name */ + "A Python client library for plasma.", /* m_doc */ + 0, /* m_size */ + plasma_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + +#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ +#define PyMODINIT_FUNC void +#endif + +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif + +MOD_INIT(libplasma) { +#if PY_MAJOR_VERSION >= 3 + PyObject* m = PyModule_Create(&moduledef); +#else + PyObject* m = + Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma."); +#endif + + /* Create a custom exception for when an object ID is reused. */ + char plasma_object_exists_error[] = "plasma_object_exists.error"; + PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL); + Py_INCREF(PlasmaObjectExistsError); + PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError); + /* Create a custom exception for when the plasma store is out of memory. */ + char plasma_out_of_memory_error[] = "plasma_out_of_memory.error"; + PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL); + Py_INCREF(PlasmaOutOfMemoryError); + PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError); + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif +}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/extension.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h new file mode 100644 index 0000000..cee30ab --- /dev/null +++ b/cpp/src/plasma/extension.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PLASMA_EXTENSION_H +#define PLASMA_EXTENSION_H + +#undef _XOPEN_SOURCE +#undef _POSIX_C_SOURCE +#include <Python.h> + +#include "bytesobject.h" // NOLINT + +#include "plasma/client.h" +#include "plasma/common.h" + +static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) { + if (PyCapsule_IsValid(object, "plasma")) { + *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, "plasma")); + return 1; + } else { + PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule"); + return 0; + } +} + +int PyStringToUniqueID(PyObject* object, ObjectID* object_id) { + if (PyBytes_Check(object)) { + memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID)); + return 1; + } else { + PyErr_SetString(PyExc_TypeError, "must be a 20 character string"); + return 0; + } +} + +#endif // PLASMA_EXTENSION_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/fling.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc new file mode 100644 index 0000000..79da4f4 --- /dev/null +++ b/cpp/src/plasma/fling.cc @@ -0,0 +1,90 @@ +// Copyright 2013 Sharvil Nanavati +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "plasma/fling.h" + +#include <string.h> + +void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) { + iov->iov_base = buf; + iov->iov_len = 1; + + msg->msg_iov = iov; + msg->msg_iovlen = 1; + msg->msg_control = buf; + msg->msg_controllen = buf_len; + msg->msg_name = NULL; + msg->msg_namelen = 0; +} + +int send_fd(int conn, int fd) { + struct msghdr msg; + struct iovec iov; + char buf[CMSG_SPACE(sizeof(int))]; + memset(&buf, 0, CMSG_SPACE(sizeof(int))); + + init_msg(&msg, &iov, buf, sizeof(buf)); + + struct cmsghdr* header = CMSG_FIRSTHDR(&msg); + header->cmsg_level = SOL_SOCKET; + header->cmsg_type = SCM_RIGHTS; + header->cmsg_len = CMSG_LEN(sizeof(int)); + *reinterpret_cast<int*>(CMSG_DATA(header)) = fd; + + // Send file descriptor. + ssize_t r = sendmsg(conn, &msg, 0); + if (r >= 0) { + return 0; + } else { + return static_cast<int>(r); + } +} + +int recv_fd(int conn) { + struct msghdr msg; + struct iovec iov; + char buf[CMSG_SPACE(sizeof(int))]; + init_msg(&msg, &iov, buf, sizeof(buf)); + + if (recvmsg(conn, &msg, 0) == -1) return -1; + + int found_fd = -1; + int oh_noes = 0; + for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL; + header = CMSG_NXTHDR(&msg, header)) + if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) { + ssize_t count = + (header->cmsg_len - (CMSG_DATA(header) - (unsigned char*)header)) / sizeof(int); + for (int i = 0; i < count; ++i) { + int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i]; + if (found_fd == -1) { + found_fd = fd; + } else { + close(fd); + oh_noes = 1; + } + } + } + + // The sender sent us more than one file descriptor. We've closed + // them all to prevent fd leaks but notify the caller that we got + // a bad message. 
+ if (oh_noes) { + close(found_fd); + errno = EBADMSG; + return -1; + } + + return found_fd; +} http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/fling.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/fling.h b/cpp/src/plasma/fling.h new file mode 100644 index 0000000..78ac9d1 --- /dev/null +++ b/cpp/src/plasma/fling.h @@ -0,0 +1,52 @@ +// Copyright 2013 Sharvil Nanavati +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// FLING: Exchanging file descriptors over sockets +// +// This is a little library for sending file descriptors over a socket +// between processes. The reason for doing that (as opposed to using +// filenames to share the files) is so (a) no files remain in the +// filesystem after all the processes terminate, (b) to make sure that +// there are no name collisions and (c) to be able to control who has +// access to the data. +// +// Most of the code is from https://github.com/sharvil/flingfd + +#include <errno.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +// This is neccessary for Mac OS X, see http://www.apuebook.com/faqs2e.html +// (10). 
+#if !defined(CMSG_SPACE) && !defined(CMSG_LEN) +#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len)) +#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len)) +#endif + +void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len); + +// Send a file descriptor over a unix domain socket. +// +// @param conn Unix domain socket to send the file descriptor over. +// @param fd File descriptor to send over. +// @return Status code which is < 0 on failure. +int send_fd(int conn, int fd); + +// Receive a file descriptor over a unix domain socket. +// +// @param conn Unix domain socket to receive the file descriptor from. +// @return File descriptor or a value < 0 on failure. +int recv_fd(int conn); http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/format/common.fbs ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs new file mode 100644 index 0000000..4d7d285 --- /dev/null +++ b/cpp/src/plasma/format/common.fbs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Object information data structure. 
table ObjectInfo {
  // Object ID of this object.
  object_id: string;
  // Number of bytes the content of this object occupies in memory.
  data_size: long;
  // Number of bytes the metadata of this object occupies in memory.
  metadata_size: long;
  // Unix epoch of when this object was created.
  create_time: long;
  // How long creation of this object took.
  construct_duration: long;
  // Hash of the object content.
  digest: string;
  // Specifies if this object was deleted or added.
  is_deletion: bool;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/format/plasma.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
new file mode 100644
index 0000000..23782ad
--- /dev/null
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -0,0 +1,291 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Plasma protocol specification

// Message type tags. Values are assigned sequentially starting at 1, so the
// order of the entries below is part of the protocol: do not reorder or
// insert entries in the middle.
enum MessageType:int {
  // Create a new object.
  PlasmaCreateRequest = 1,
  PlasmaCreateReply,
  // Seal an object.
  PlasmaSealRequest,
  PlasmaSealReply,
  // Get an object that is stored on the local Plasma store.
  PlasmaGetRequest,
  PlasmaGetReply,
  // Release an object.
  PlasmaReleaseRequest,
  PlasmaReleaseReply,
  // Delete an object.
  PlasmaDeleteRequest,
  PlasmaDeleteReply,
  // Get status of an object.
  PlasmaStatusRequest,
  PlasmaStatusReply,
  // See if the store contains an object (will be deprecated).
  PlasmaContainsRequest,
  PlasmaContainsReply,
  // Get information for a newly connecting client.
  PlasmaConnectRequest,
  PlasmaConnectReply,
  // Make room for new objects in the plasma store.
  PlasmaEvictRequest,
  PlasmaEvictReply,
  // Fetch objects from remote Plasma stores.
  PlasmaFetchRequest,
  // Wait for objects to be ready either from local or remote Plasma stores.
  PlasmaWaitRequest,
  PlasmaWaitReply,
  // Subscribe to a list of objects or to all objects.
  PlasmaSubscribeRequest,
  // Unsubscribe.
  PlasmaUnsubscribeRequest,
  // Sending and receiving data.
  // PlasmaDataRequest initiates sending the data, there will be one
  // such message per data transfer.
  PlasmaDataRequest,
  // PlasmaDataReply contains the actual data and is sent back to the
  // object store that requested the data. For each transfer, multiple
  // reply messages get sent. Each one contains a fixed number of bytes.
  PlasmaDataReply,
  // Object notifications.
  PlasmaNotification
}

// Error codes carried in reply messages. Values start at 0 (OK) and follow
// declaration order.
enum PlasmaError:int {
  // Operation was successful.
  OK,
  // Trying to create an object that already exists.
  ObjectExists,
  // Trying to access an object that doesn't exist.
  ObjectNonexistent,
  // Trying to create an object but there isn't enough space in the store.
  OutOfMemory
}

// Plasma store messages

// Location of an object's data and metadata inside a memory mapped segment.
struct PlasmaObjectSpec {
  // Index of the memory segment (= memory mapped file) that
  // this object is allocated in.
  segment_index: int;
  // Size in bytes of this segment (needed to call mmap).
  mmap_size: ulong;
  // The offset in bytes in the memory mapped file of the data.
  data_offset: ulong;
  // The size in bytes of the data.
  data_size: ulong;
  // The offset in bytes in the memory mapped file of the metadata.
  metadata_offset: ulong;
  // The size in bytes of the metadata.
  metadata_size: ulong;
}

table PlasmaCreateRequest {
  // ID of the object to be created.
  object_id: string;
  // The size of the object's data in bytes.
  data_size: ulong;
  // The size of the object's metadata in bytes.
  metadata_size: ulong;
}

table PlasmaCreateReply {
  // ID of the object that was created.
  object_id: string;
  // The object that is returned with this reply.
  plasma_object: PlasmaObjectSpec;
  // Error that occurred for this call.
  error: PlasmaError;
}

table PlasmaSealRequest {
  // ID of the object to be sealed.
  object_id: string;
  // Hash of the object data.
  digest: string;
}

table PlasmaSealReply {
  // ID of the object that was sealed.
  object_id: string;
  // Error code.
  error: PlasmaError;
}

table PlasmaGetRequest {
  // IDs of the objects stored at local Plasma store we are getting.
  object_ids: [string];
  // The number of milliseconds before the request should timeout.
  timeout_ms: long;
}

table PlasmaGetReply {
  // IDs of the objects being returned.
  // This number can be smaller than the number of requested
  // objects if not all requested objects are stored and sealed
  // in the local Plasma store.
  object_ids: [string];
  // Plasma object information, in the same order as their IDs.
  plasma_objects: [PlasmaObjectSpec];
  // The number of elements in both object_ids and plasma_objects arrays must agree.
}

table PlasmaReleaseRequest {
  // ID of the object to be released.
  object_id: string;
}

table PlasmaReleaseReply {
  // ID of the object that was released.
  object_id: string;
  // Error code.
  error: PlasmaError;
}

table PlasmaDeleteRequest {
  // ID of the object to be deleted.
  object_id: string;
}

table PlasmaDeleteReply {
  // ID of the object that was deleted.
  object_id: string;
  // Error code.
  error: PlasmaError;
}

table PlasmaStatusRequest {
  // IDs of the objects stored at local Plasma store we request the status of.
  object_ids: [string];
}

// Where an object currently lives. Values start at 1 and follow declaration
// order.
enum ObjectStatus:int {
  // Object is stored in the local Plasma Store.
  Local = 1,
  // Object is stored on a remote Plasma store, and it is not stored on the
  // local Plasma Store.
  Remote,
  // Object is not stored in the system.
  Nonexistent,
  // Object is currently being transferred from a remote Plasma store to the
  // local Plasma Store.
  Transfer
}

table PlasmaStatusReply {
  // IDs of the objects being returned.
  object_ids: [string];
  // Status of each object (one entry per returned ID).
  status: [ObjectStatus];
}

// PlasmaContains is a subset of PlasmaStatus which does not
// involve the plasma manager, only the store. We should consider
// unifying them in the future and deprecating PlasmaContains.

table PlasmaContainsRequest {
  // ID of the object we are querying.
  object_id: string;
}

table PlasmaContainsReply {
  // ID of the object we are querying.
  object_id: string;
  // 1 if the object is in the store and 0 otherwise.
  has_object: int;
}

// PlasmaConnect is used by a plasma client the first time it connects with the
// store. This is not really necessary, but is used to get some information
// about the store such as its memory capacity.

table PlasmaConnectRequest {
}

table PlasmaConnectReply {
  // The memory capacity of the store.
  memory_capacity: long;
}

table PlasmaEvictRequest {
  // Number of bytes that shall be freed.
  num_bytes: ulong;
}

table PlasmaEvictReply {
  // Number of bytes that have been freed.
  num_bytes: ulong;
}

table PlasmaFetchRequest {
  // IDs of objects to be gotten.
  object_ids: [string];
}

table ObjectRequestSpec {
  // ID of the object.
  object_id: string;
  // The type of the object request. This specifies whether we
  // will be waiting for an object stored in the local or
  // global Plasma store.
  type: int;
}

table PlasmaWaitRequest {
  // Array of object requests whose status we are asking for.
  object_requests: [ObjectRequestSpec];
  // Number of objects expected to be returned, if available.
  num_ready_objects: int;
  // Timeout for the wait request.
  timeout: long;
}

table ObjectReply {
  // ID of the object.
  object_id: string;
  // The object status. This specifies where the object is stored.
  status: int;
}

table PlasmaWaitReply {
  // Array of object requests being returned.
  object_requests: [ObjectReply];
  // Number of objects expected to be returned, if available.
  num_ready_objects: int;
}

table PlasmaSubscribeRequest {
}

table PlasmaDataRequest {
  // ID of the object that is requested.
  object_id: string;
  // The host address where the data shall be sent to.
  address: string;
  // The port of the manager the data shall be sent to.
  port: int;
}

table PlasmaDataReply {
  // ID of the object that will be sent.
  object_id: string;
  // Size of the object data in bytes.
  object_size: ulong;
  // Size of the metadata in bytes.
  metadata_size: ulong;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
new file mode 100644
index 0000000..5875ebb
--- /dev/null
+++ b/cpp/src/plasma/io.cc
@@ -0,0 +1,212 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "plasma/io.h" + +#include "plasma/common.h" + +using arrow::Status; + +/* Number of times we try binding to a socket. */ +#define NUM_BIND_ATTEMPTS 5 +#define BIND_TIMEOUT_MS 100 + +/* Number of times we try connecting to a socket. */ +#define NUM_CONNECT_ATTEMPTS 50 +#define CONNECT_TIMEOUT_MS 100 + +Status WriteBytes(int fd, uint8_t* cursor, size_t length) { + ssize_t nbytes = 0; + size_t bytesleft = length; + size_t offset = 0; + while (bytesleft > 0) { + /* While we haven't written the whole message, write to the file descriptor, + * advance the cursor, and decrease the amount left to write. 
*/ + nbytes = write(fd, cursor + offset, bytesleft); + if (nbytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } + return Status::IOError(std::string(strerror(errno))); + } else if (nbytes == 0) { + return Status::IOError("Encountered unexpected EOF"); + } + ARROW_CHECK(nbytes > 0); + bytesleft -= nbytes; + offset += nbytes; + } + + return Status::OK(); +} + +Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) { + int64_t version = PLASMA_PROTOCOL_VERSION; + RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version))); + RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type))); + RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length))); + return WriteBytes(fd, bytes, length * sizeof(char)); +} + +Status ReadBytes(int fd, uint8_t* cursor, size_t length) { + ssize_t nbytes = 0; + /* Termination condition: EOF or read 'length' bytes total. */ + size_t bytesleft = length; + size_t offset = 0; + while (bytesleft > 0) { + nbytes = read(fd, cursor + offset, bytesleft); + if (nbytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } + return Status::IOError(std::string(strerror(errno))); + } else if (0 == nbytes) { + return Status::IOError("Encountered unexpected EOF"); + } + ARROW_CHECK(nbytes > 0); + bytesleft -= nbytes; + offset += nbytes; + } + + return Status::OK(); +} + +Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) { + int64_t version; + RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)), + *type = DISCONNECT_CLIENT); + ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version; + size_t length; + RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)), + *type = DISCONNECT_CLIENT); + RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)), + *type = 
DISCONNECT_CLIENT); + if (length > buffer->size()) { buffer->resize(length); } + RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT); + return Status::OK(); +} + +int bind_ipc_sock(const std::string& pathname, bool shall_listen) { + struct sockaddr_un socket_address; + int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (socket_fd < 0) { + ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname; + return -1; + } + /* Tell the system to allow the port to be reused. */ + int on = 1; + if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on), + sizeof(on)) < 0) { + ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname; + close(socket_fd); + return -1; + } + + unlink(pathname.c_str()); + memset(&socket_address, 0, sizeof(socket_address)); + socket_address.sun_family = AF_UNIX; + if (pathname.size() + 1 > sizeof(socket_address.sun_path)) { + ARROW_LOG(ERROR) << "Socket pathname is too long."; + close(socket_fd); + return -1; + } + strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1); + + if (bind(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) != 0) { + ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname; + close(socket_fd); + return -1; + } + if (shall_listen && listen(socket_fd, 128) == -1) { + ARROW_LOG(ERROR) << "Could not listen to socket " << pathname; + close(socket_fd); + return -1; + } + return socket_fd; +} + +int connect_ipc_sock_retry( + const std::string& pathname, int num_retries, int64_t timeout) { + /* Pick the default values if the user did not specify. 
*/ + if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; } + if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; } + + int fd = -1; + for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { + fd = connect_ipc_sock(pathname); + if (fd >= 0) { break; } + if (num_attempts == 0) { + ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname; + } + /* Sleep for timeout milliseconds. */ + usleep(static_cast<int>(timeout * 1000)); + } + /* If we could not connect to the socket, exit. */ + if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; } + return fd; +} + +int connect_ipc_sock(const std::string& pathname) { + struct sockaddr_un socket_address; + int socket_fd; + + socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (socket_fd < 0) { + ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname; + return -1; + } + + memset(&socket_address, 0, sizeof(socket_address)); + socket_address.sun_family = AF_UNIX; + if (pathname.size() + 1 > sizeof(socket_address.sun_path)) { + ARROW_LOG(ERROR) << "Socket pathname is too long."; + return -1; + } + strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1); + + if (connect(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) != + 0) { + close(socket_fd); + return -1; + } + + return socket_fd; +} + +int AcceptClient(int socket_fd) { + int client_fd = accept(socket_fd, NULL, NULL); + if (client_fd < 0) { + ARROW_LOG(ERROR) << "Error reading from socket."; + return -1; + } + return client_fd; +} + +uint8_t* read_message_async(int sock) { + int64_t size; + Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t)); + if (!s.ok()) { + /* The other side has closed the socket. 
*/ + ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; + close(sock); + return NULL; + } + uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size)); + s = ReadBytes(sock, message, size); + if (!s.ok()) { + /* The other side has closed the socket. */ + ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; + close(sock); + return NULL; + } + return message; +} http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/io.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h new file mode 100644 index 0000000..43c3fb5 --- /dev/null +++ b/cpp/src/plasma/io.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PLASMA_IO_H +#define PLASMA_IO_H + +#include <inttypes.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <unistd.h> + +#include <string> +#include <vector> + +#include "arrow/status.h" + +// TODO(pcm): Replace our own custom message header (message type, +// message length, plasma protocol verion) with one that is serialized +// using flatbuffers. 
+#define PLASMA_PROTOCOL_VERSION 0x0000000000000000 +#define DISCONNECT_CLIENT 0 + +arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length); + +arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes); + +arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length); + +arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer); + +int bind_ipc_sock(const std::string& pathname, bool shall_listen); + +int connect_ipc_sock(const std::string& pathname); + +int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout); + +int AcceptClient(int socket_fd); + +uint8_t* read_message_async(int sock); + +#endif // PLASMA_IO_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/malloc.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc new file mode 100644 index 0000000..e7ffd1a --- /dev/null +++ b/cpp/src/plasma/malloc.cc @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "plasma/malloc.h" + +#include <assert.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/mman.h> +#include <unistd.h> + +#include <unordered_map> + +#include "plasma/common.h" + +extern "C" { +void* fake_mmap(size_t); +int fake_munmap(void*, int64_t); + +#define MMAP(s) fake_mmap(s) +#define MUNMAP(a, s) fake_munmap(a, s) +#define DIRECT_MMAP(s) fake_mmap(s) +#define DIRECT_MUNMAP(a, s) fake_munmap(a, s) +#define USE_DL_PREFIX +#define HAVE_MORECORE 0 +#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T +#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) + +#include "thirdparty/dlmalloc.c" + +#undef MMAP +#undef MUNMAP +#undef DIRECT_MMAP +#undef DIRECT_MUNMAP +#undef USE_DL_PREFIX +#undef HAVE_MORECORE +#undef DEFAULT_GRANULARITY +} + +struct mmap_record { + int fd; + int64_t size; +}; + +namespace { + +/** Hashtable that contains one entry per segment that we got from the OS + * via mmap. Associates the address of that segment with its file descriptor + * and size. */ +std::unordered_map<void*, mmap_record> mmap_records; + +} /* namespace */ + +constexpr int GRANULARITY_MULTIPLIER = 2; + +static void* pointer_advance(void* p, ptrdiff_t n) { + return (unsigned char*)p + n; +} + +static void* pointer_retreat(void* p, ptrdiff_t n) { + return (unsigned char*)p - n; +} + +static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { + return (unsigned char const*)pto - (unsigned char const*)pfrom; +} + +/* Create a buffer. This is creating a temporary file and then + * immediately unlinking it so we do not leave traces in the system. 
*/ +int create_buffer(int64_t size) { + int fd; +#ifdef _WIN32 + if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, + (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size, + NULL)) { + fd = -1; + } +#else +#ifdef __linux__ + constexpr char file_template[] = "/dev/shm/plasmaXXXXXX"; +#else + constexpr char file_template[] = "/tmp/plasmaXXXXXX"; +#endif + char file_name[32]; + strncpy(file_name, file_template, 32); + fd = mkstemp(file_name); + if (fd < 0) return -1; + FILE* file = fdopen(fd, "a+"); + if (!file) { + close(fd); + return -1; + } + if (unlink(file_name) != 0) { + ARROW_LOG(FATAL) << "unlink error"; + return -1; + } + if (ftruncate(fd, (off_t)size) != 0) { + ARROW_LOG(FATAL) << "ftruncate error"; + return -1; + } +#endif + return fd; +} + +void* fake_mmap(size_t size) { + /* Add sizeof(size_t) so that the returned pointer is deliberately not + * page-aligned. This ensures that the segments of memory returned by + * fake_mmap are never contiguous. */ + size += sizeof(size_t); + + int fd = create_buffer(size); + ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; + void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (pointer == MAP_FAILED) { return pointer; } + + /* Increase dlmalloc's allocation granularity directly. */ + mparams.granularity *= GRANULARITY_MULTIPLIER; + + mmap_record& record = mmap_records[pointer]; + record.fd = fd; + record.size = size; + + /* We lie to dlmalloc about where mapped memory actually lives. 
*/ + pointer = pointer_advance(pointer, sizeof(size_t)); + ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; + return pointer; +} + +int fake_munmap(void* addr, int64_t size) { + ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")"; + addr = pointer_retreat(addr, sizeof(size_t)); + size += sizeof(size_t); + + auto entry = mmap_records.find(addr); + + if (entry == mmap_records.end() || entry->second.size != size) { + /* Reject requests to munmap that don't directly match previous + * calls to mmap, to prevent dlmalloc from trimming. */ + return -1; + } + + int r = munmap(addr, size); + if (r == 0) { close(entry->second.fd); } + + mmap_records.erase(entry); + return r; +} + +void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) { + /* TODO(rshin): Implement a more efficient search through mmap_records. */ + for (const auto& entry : mmap_records) { + if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) { + *fd = entry.second.fd; + *map_size = entry.second.size; + *offset = pointer_distance(entry.first, addr); + return; + } + } + *fd = -1; + *map_size = 0; + *offset = 0; +} http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/malloc.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h new file mode 100644 index 0000000..b4af2c8 --- /dev/null +++ b/cpp/src/plasma/malloc.h @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PLASMA_MALLOC_H +#define PLASMA_MALLOC_H + +#include <inttypes.h> +#include <stddef.h> + +void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset); + +#endif // PLASMA_MALLOC_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/plasma.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc new file mode 100644 index 0000000..559d8e7 --- /dev/null +++ b/cpp/src/plasma/plasma.cc @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "plasma/plasma.h" + +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include "plasma/common.h" +#include "plasma/protocol.h" + +int warn_if_sigpipe(int status, int client_sock) { + if (status >= 0) { return 0; } + if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { + ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " + "sending a message to client on fd " + << client_sock << ". The client on the other end may " + "have hung up."; + return errno; + } + ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << "."; + return -1; // This is never reached. +} + +/** + * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes + * of this buffer are the length of the remaining message and the + * remaining message is a serialized version of the object info. + * + * @param object_info The object info to be serialized + * @return The object info buffer. It is the caller's responsibility to free + * this buffer with "delete" after it has been used. 
+ */ +uint8_t* create_object_info_buffer(ObjectInfoT* object_info) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreateObjectInfo(fbb, object_info); + fbb.Finish(message); + uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()]; + *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize(); + memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize()); + return notification; +} + +ObjectTableEntry* get_object_table_entry( + PlasmaStoreInfo* store_info, const ObjectID& object_id) { + auto it = store_info->objects.find(object_id); + if (it == store_info->objects.end()) { return NULL; } + return it->second.get(); +} http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/plasma.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h new file mode 100644 index 0000000..275d0c7 --- /dev/null +++ b/cpp/src/plasma/plasma.h @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PLASMA_PLASMA_H +#define PLASMA_PLASMA_H + +#include <errno.h> +#include <inttypes.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> // pid_t + +#include <unordered_map> +#include <unordered_set> + +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "format/common_generated.h" +#include "plasma/common.h" + +#define HANDLE_SIGPIPE(s, fd_) \ + do { \ + Status _s = (s); \ + if (!_s.ok()) { \ + if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \ + ARROW_LOG(WARNING) \ + << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \ + "sending a message to client on fd " \ + << fd_ << ". " \ + "The client on the other end may have hung up."; \ + } else { \ + return _s; \ + } \ + } \ + } while (0); + +/// Allocation granularity used in plasma for object allocation. +#define BLOCK_SIZE 64 + +/// Size of object hash digests. +constexpr int64_t kDigestSize = sizeof(uint64_t); + +struct Client; + +/// Object request data structure. Used in the plasma_wait_for_objects() +/// argument. +typedef struct { + /// The ID of the requested object. If ID_NIL request any object. + ObjectID object_id; + /// Request associated to the object. It can take one of the following values: + /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the + /// local Plasma Store. + /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in + /// the system (i.e., either in the local or a remote Plasma Store). + int type; + /// Object status. Same as the status returned by plasma_status() function + /// call. This is filled in by plasma_wait_for_objects1(): + /// - ObjectStatus_Local: object is ready at the local Plasma Store. + /// - ObjectStatus_Remote: object is ready at a remote Plasma Store. + /// - ObjectStatus_Nonexistent: object does not exist in the system. 
+ /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled + /// for being transferred or it is transferring. + int status; +} ObjectRequest; + +/// Mapping from object IDs to type and status of the request. +typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap; + +/// Handle to access memory mapped file and map it into client address space. +typedef struct { + /// The file descriptor of the memory mapped file in the store. It is used as + /// a unique identifier of the file in the client to look up the corresponding + /// file descriptor on the client's side. + int store_fd; + /// The size in bytes of the memory mapped file. + int64_t mmap_size; +} object_handle; + +// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. +typedef struct { + /// Handle for memory mapped file the object is stored in. + object_handle handle; + /// The offset in bytes in the memory mapped file of the data. + ptrdiff_t data_offset; + /// The offset in bytes in the memory mapped file of the metadata. + ptrdiff_t metadata_offset; + /// The size in bytes of the data. + int64_t data_size; + /// The size in bytes of the metadata. + int64_t metadata_size; +} PlasmaObject; + +typedef enum { + /// Object was created but not sealed in the local Plasma Store. + PLASMA_CREATED = 1, + /// Object is sealed and stored in the local Plasma Store. + PLASMA_SEALED +} object_state; + +typedef enum { + /// The object was not found. + OBJECT_NOT_FOUND = 0, + /// The object was found. + OBJECT_FOUND = 1 +} object_status; + +typedef enum { + /// Query for object in the local plasma store. + PLASMA_QUERY_LOCAL = 1, + /// Query for object in the local plasma store or in a remote plasma store. + PLASMA_QUERY_ANYWHERE +} object_request_type; + +/// This type is used by the Plasma store. It is here because it is exposed to +/// the eviction policy. +struct ObjectTableEntry { + /// Object id of this object. 
+ ObjectID object_id; + /// Object info like size, creation time and owner. + ObjectInfoT info; + /// Memory mapped file containing the object. + int fd; + /// Size of the underlying map. + int64_t map_size; + /// Offset from the base of the mmap. + ptrdiff_t offset; + /// Pointer to the object data. Needed to free the object. + uint8_t* pointer; + /// Set of clients currently using this object. + std::unordered_set<Client*> clients; + /// The state of the object, e.g., whether it is open or sealed. + object_state state; + /// The digest of the object. Used to see if two objects are the same. + unsigned char digest[kDigestSize]; +}; + +/// The plasma store information that is exposed to the eviction policy. +struct PlasmaStoreInfo { + /// Objects that are in the Plasma store. + std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>, UniqueIDHasher> objects; + /// The amount of memory (in bytes) that we allow to be allocated in the + /// store. + int64_t memory_capacity; +}; + +/// Get an entry from the object table and return NULL if the object_id +/// is not present. +/// +/// @param store_info The PlasmaStoreInfo that contains the object table. +/// @param object_id The object_id of the entry we are looking for. +/// @return The entry associated with the object_id or NULL if the object_id +/// is not present. +ObjectTableEntry* get_object_table_entry( + PlasmaStoreInfo* store_info, const ObjectID& object_id); + +/// Print a warning if the status is less than zero. This should be used to check +/// the success of messages sent to plasma clients. We print a warning instead of +/// failing because the plasma clients are allowed to die. This is used to handle +/// situations where the store writes to a client file descriptor, and the client +/// may already have disconnected. If we have processed the disconnection and +/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we +/// have not, then we should get a SIGPIPE. 
If we write to a TCP socket that +/// isn't connected yet, then we should get an ECONNRESET. +/// +/// @param status The status to check. If it is less than zero, we will +/// print a warning. +/// @param client_sock The client socket. This is just used to print some extra +/// information. +/// @return The errno set. +int warn_if_sigpipe(int status, int client_sock); + +uint8_t* create_object_info_buffer(ObjectInfoT* object_info); + +#endif // PLASMA_PLASMA_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/protocol.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc new file mode 100644 index 0000000..246aa29 --- /dev/null +++ b/cpp/src/plasma/protocol.cc @@ -0,0 +1,502 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "plasma/protocol.h" + +#include "flatbuffers/flatbuffers.h" +#include "format/plasma_generated.h" + +#include "plasma/common.h" +#include "plasma/io.h" + +using flatbuffers::uoffset_t; + +flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> +to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids, + int64_t num_objects) { + std::vector<flatbuffers::Offset<flatbuffers::String>> results; + for (int64_t i = 0; i < num_objects; i++) { + results.push_back(fbb->CreateString(object_ids[i].binary())); + } + return fbb->CreateVector(results); +} + +Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer) { + int64_t type; + RETURN_NOT_OK(ReadMessage(sock, &type, buffer)); + ARROW_CHECK(type == message_type) << "type = " << type + << ", message_type = " << message_type; + return Status::OK(); +} + +template <typename Message> +Status PlasmaSend(int sock, int64_t message_type, flatbuffers::FlatBufferBuilder* fbb, + const Message& message) { + fbb->Finish(message); + return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer()); +} + +// Create messages. 
+ +Status SendCreateRequest( + int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaCreateRequest( + fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size); + return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message); +} + +Status ReadCreateRequest( + uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data); + *data_size = message->data_size(); + *metadata_size = message->metadata_size(); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendCreateReply( + int sock, ObjectID object_id, PlasmaObject* object, int error_code) { + flatbuffers::FlatBufferBuilder fbb; + PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size, + object->data_offset, object->data_size, object->metadata_offset, + object->metadata_size); + auto message = CreatePlasmaCreateReply( + fbb, fbb.CreateString(object_id.binary()), &plasma_object, (PlasmaError)error_code); + return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message); +} + +Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + object->handle.store_fd = message->plasma_object()->segment_index(); + object->handle.mmap_size = message->plasma_object()->mmap_size(); + object->data_offset = message->plasma_object()->data_offset(); + object->data_size = message->plasma_object()->data_size(); + object->metadata_offset = message->plasma_object()->metadata_offset(); + object->metadata_size = message->plasma_object()->metadata_size(); + return plasma_error_status(message->error()); +} + +// Seal messages. 
+ +Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) { + flatbuffers::FlatBufferBuilder fbb; + auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize); + auto message = + CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), digest_string); + return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message); +} + +Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + ARROW_CHECK(message->digest()->size() == kDigestSize); + memcpy(digest, message->digest()->data(), kDigestSize); + return Status::OK(); +} + +Status SendSealReply(int sock, ObjectID object_id, int error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaSealReply( + fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message); +} + +Status ReadSealReply(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaSealReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return plasma_error_status(message->error()); +} + +// Release messages. 
+ +Status SendReleaseRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message); +} + +Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendReleaseReply(int sock, ObjectID object_id, int error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaReleaseReply( + fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message); +} + +Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return plasma_error_status(message->error()); +} + +// Delete messages. 
+ +Status SendDeleteRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message); +} + +Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendDeleteReply(int sock, ObjectID object_id, int error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaDeleteReply( + fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message); +} + +Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return plasma_error_status(message->error()); +} + +// Satus messages. 
+ +Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + CreatePlasmaStatusRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects)); + return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message); +} + +Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data); + for (uoffset_t i = 0; i < num_objects; ++i) { + object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); + } + return Status::OK(); +} + +Status SendStatusReply( + int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects), + fbb.CreateVector(object_status, num_objects)); + return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message); +} + +int64_t ReadStatusReply_num_objects(uint8_t* data) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data); + return message->object_ids()->size(); +} + +Status ReadStatusReply( + uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data); + for (uoffset_t i = 0; i < num_objects; ++i) { + object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); + } + for (uoffset_t i = 0; i < num_objects; ++i) { + object_status[i] = message->status()->data()[i]; + } + return Status::OK(); +} + +// Contains messages. 
+ +Status SendContainsRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message); +} + +Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), has_object); + return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message); +} + +Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *has_object = message->has_object(); + return Status::OK(); +} + +// Connect messages. + +Status SendConnectRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaConnectRequest(fbb); + return PlasmaSend(sock, MessageType_PlasmaConnectRequest, &fbb, message); +} + +Status ReadConnectRequest(uint8_t* data) { + return Status::OK(); +} + +Status SendConnectReply(int sock, int64_t memory_capacity) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaConnectReply(fbb, memory_capacity); + return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message); +} + +Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data); + *memory_capacity = message->memory_capacity(); + return Status::OK(); +} + +// Evict messages. 
+ +Status SendEvictRequest(int sock, int64_t num_bytes) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaEvictRequest(fbb, num_bytes); + return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message); +} + +Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data); + *num_bytes = message->num_bytes(); + return Status::OK(); +} + +Status SendEvictReply(int sock, int64_t num_bytes) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaEvictReply(fbb, num_bytes); + return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message); +} + +Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data); + num_bytes = message->num_bytes(); + return Status::OK(); +} + +// Get messages. + +Status SendGetRequest( + int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaGetRequest( + fbb, to_flatbuffer(&fbb, object_ids, num_objects), timeout_ms); + return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message); +} + +Status ReadGetRequest( + uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data); + for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { + auto object_id = message->object_ids()->Get(i)->str(); + object_ids.push_back(ObjectID::from_binary(object_id)); + } + *timeout_ms = message->timeout_ms(); + return Status::OK(); +} + +Status SendGetReply(int sock, ObjectID object_ids[], + std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects, + int64_t num_objects) { + flatbuffers::FlatBufferBuilder fbb; + std::vector<PlasmaObjectSpec> objects; + + for (int i = 0; i < num_objects; ++i) { + const PlasmaObject& object = plasma_objects[object_ids[i]]; + 
objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size, + object.data_offset, object.data_size, object.metadata_offset, + object.metadata_size)); + } + auto message = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects), + fbb.CreateVectorOfStructs(objects.data(), num_objects)); + return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message); +} + +Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], + int64_t num_objects) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaGetReply>(data); + for (uoffset_t i = 0; i < num_objects; ++i) { + object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); + } + for (uoffset_t i = 0; i < num_objects; ++i) { + const PlasmaObjectSpec* object = message->plasma_objects()->Get(i); + plasma_objects[i].handle.store_fd = object->segment_index(); + plasma_objects[i].handle.mmap_size = object->mmap_size(); + plasma_objects[i].data_offset = object->data_offset(); + plasma_objects[i].data_size = object->data_size(); + plasma_objects[i].metadata_offset = object->metadata_offset(); + plasma_objects[i].metadata_size = object->metadata_size(); + } + return Status::OK(); +} + +// Fetch messages. + +Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + CreatePlasmaFetchRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects)); + return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message); +} + +Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data); + for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { + object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str())); + } + return Status::OK(); +} + +// Wait messages. 
+ +Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, + int num_ready_objects, int64_t timeout_ms) { + flatbuffers::FlatBufferBuilder fbb; + + std::vector<flatbuffers::Offset<ObjectRequestSpec>> object_request_specs; + for (int i = 0; i < num_requests; i++) { + object_request_specs.push_back(CreateObjectRequestSpec(fbb, + fbb.CreateString(object_requests[i].object_id.binary()), + object_requests[i].type)); + } + + auto message = CreatePlasmaWaitRequest( + fbb, fbb.CreateVector(object_request_specs), num_ready_objects, timeout_ms); + return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message); +} + +Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, + int64_t* timeout_ms, int* num_ready_objects) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data); + *num_ready_objects = message->num_ready_objects(); + *timeout_ms = message->timeout(); + + for (uoffset_t i = 0; i < message->object_requests()->size(); i++) { + ObjectID object_id = + ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str()); + ObjectRequest object_request({object_id, message->object_requests()->Get(i)->type(), + ObjectStatus_Nonexistent}); + object_requests[object_id] = object_request; + } + return Status::OK(); +} + +Status SendWaitReply( + int sock, const ObjectRequestMap& object_requests, int num_ready_objects) { + flatbuffers::FlatBufferBuilder fbb; + + std::vector<flatbuffers::Offset<ObjectReply>> object_replies; + for (const auto& entry : object_requests) { + const auto& object_request = entry.second; + object_replies.push_back(CreateObjectReply( + fbb, fbb.CreateString(object_request.object_id.binary()), object_request.status)); + } + + auto message = CreatePlasmaWaitReply( + fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects); + return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message); +} + +Status ReadWaitReply( + uint8_t* data, 
ObjectRequest object_requests[], int* num_ready_objects) { + DCHECK(data); + + auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data); + *num_ready_objects = message->num_ready_objects(); + for (int i = 0; i < *num_ready_objects; i++) { + object_requests[i].object_id = + ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str()); + object_requests[i].status = message->object_requests()->Get(i)->status(); + } + return Status::OK(); +} + +// Subscribe messages. + +Status SendSubscribeRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaSubscribeRequest(fbb); + return PlasmaSend(sock, MessageType_PlasmaSubscribeRequest, &fbb, message); +} + +// Data messages. + +Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) { + flatbuffers::FlatBufferBuilder fbb; + auto addr = fbb.CreateString(address, strlen(address)); + auto message = + CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port); + return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message); +} + +Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) { + DCHECK(data); + auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data); + DCHECK(message->object_id()->size() == sizeof(ObjectID)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *address = strdup(message->address()->c_str()); + *port = message->port(); + return Status::OK(); +} + +Status SendDataReply( + int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreatePlasmaDataReply( + fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size); + return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message); +} + +Status ReadDataReply( + uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) { + DCHECK(data); + auto message = 
flatbuffers::GetRoot<PlasmaDataReply>(data); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_size = (int64_t)message->object_size(); + *metadata_size = (int64_t)message->metadata_size(); + return Status::OK(); +} http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/protocol.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h new file mode 100644 index 0000000..5d9d136 --- /dev/null +++ b/cpp/src/plasma/protocol.h @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PLASMA_PROTOCOL_H +#define PLASMA_PROTOCOL_H + +#include <vector> + +#include "arrow/status.h" +#include "format/plasma_generated.h" +#include "plasma/plasma.h" + +using arrow::Status; + +/* Plasma receive message. */ + +Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer); + +/* Plasma Create message functions. 
*/ + +Status SendCreateRequest( + int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size); + +Status ReadCreateRequest( + uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + +Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error); + +Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object); + +/* Plasma Seal message functions. */ + +Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest); + +Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest); + +Status SendSealReply(int sock, ObjectID object_id, int error); + +Status ReadSealReply(uint8_t* data, ObjectID* object_id); + +/* Plasma Get message functions. */ + +Status SendGetRequest( + int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms); + +Status ReadGetRequest( + uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms); + +Status SendGetReply(int sock, ObjectID object_ids[], + std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects, + int64_t num_objects); + +Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], + int64_t num_objects); + +/* Plasma Release message functions. */ + +Status SendReleaseRequest(int sock, ObjectID object_id); + +Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id); + +Status SendReleaseReply(int sock, ObjectID object_id, int error); + +Status ReadReleaseReply(uint8_t* data, ObjectID* object_id); + +/* Plasma Delete message functions. */ + +Status SendDeleteRequest(int sock, ObjectID object_id); + +Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id); + +Status SendDeleteReply(int sock, ObjectID object_id, int error); + +Status ReadDeleteReply(uint8_t* data, ObjectID* object_id); + +/* Satus messages. 
*/ + +Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects); + +Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects); + +Status SendStatusReply( + int sock, ObjectID object_ids[], int object_status[], int64_t num_objects); + +int64_t ReadStatusReply_num_objects(uint8_t* data); + +Status ReadStatusReply( + uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects); + +/* Plasma Constains message functions. */ + +Status SendContainsRequest(int sock, ObjectID object_id); + +Status ReadContainsRequest(uint8_t* data, ObjectID* object_id); + +Status SendContainsReply(int sock, ObjectID object_id, bool has_object); + +Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object); + +/* Plasma Connect message functions. */ + +Status SendConnectRequest(int sock); + +Status ReadConnectRequest(uint8_t* data); + +Status SendConnectReply(int sock, int64_t memory_capacity); + +Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity); + +/* Plasma Evict message functions (no reply so far). */ + +Status SendEvictRequest(int sock, int64_t num_bytes); + +Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes); + +Status SendEvictReply(int sock, int64_t num_bytes); + +Status ReadEvictReply(uint8_t* data, int64_t& num_bytes); + +/* Plasma Fetch Remote message functions. */ + +Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects); + +Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids); + +/* Plasma Wait message functions. 
*/ + +Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, + int num_ready_objects, int64_t timeout_ms); + +Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, + int64_t* timeout_ms, int* num_ready_objects); + +Status SendWaitReply( + int sock, const ObjectRequestMap& object_requests, int num_ready_objects); + +Status ReadWaitReply( + uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects); + +/* Plasma Subscribe message functions. */ + +Status SendSubscribeRequest(int sock); + +/* Data messages. */ + +Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port); + +Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port); + +Status SendDataReply( + int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size); + +Status ReadDataReply( + uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size); + +#endif /* PLASMA_PROTOCOL */