This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0689a58 ARROW-3504: [Plasma] Add support for Plasma Client to put/get
raw bytes without pyarrow serialization.
0689a58 is described below
commit 0689a58af818ef5f9a19d9f4458e360c564d5816
Author: Yuhong Guo <[email protected]>
AuthorDate: Mon Oct 15 16:50:31 2018 -0400
ARROW-3504: [Plasma] Add support for Plasma Client to put/get raw bytes
without pyarrow serialization.
This feature enables the Java client to read data that the Python client puts
(cross-language read/write).
Author: Yuhong Guo <[email protected]>
Closes #2752 from guoyuhong/plasmaBytes and squashes the following commits:
b28748be5 <Yuhong Guo> Fix CI and change function name
df3b1ed5c <Yuhong Guo> Change put_raw_bytes to accept more general argument
f8bc1a18e <Yuhong Guo> Fix lint
742d71759 <Yuhong Guo> Add test
31c823871 <Yuhong Guo> enable raw_bytes read/write
---
python/pyarrow/_plasma.pyx | 65 +++++++++++++++++++++++++++++++++++++
python/pyarrow/tests/test_plasma.py | 15 +++++++++
2 files changed, 80 insertions(+)
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 783fbcb..2b9f93e 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -408,6 +408,71 @@ cdef class PlasmaClient:
result.append(None)
return result
+    def put_buffer(self, object value, ObjectID object_id=None,
+                   int memcopy_threads=6):
+        """
+        Store a Python buffer in the object store without serialization.
+
+        Parameters
+        ----------
+        value : Python object that implements the buffer protocol
+            A Python buffer object to store.
+        object_id : ObjectID, default None
+            If this is provided, the specified object ID will be used to refer
+            to the object. Otherwise a random object ID is generated.
+        memcopy_threads : int, default 6
+            Number of memcopy threads the writer uses to copy the buffer.
+
+        Returns
+        -------
+        The object ID associated to the Python buffer object.
+        """
+        cdef ObjectID target_id = (object_id if object_id
+                                   else ObjectID.from_random())
+        cdef Buffer arrow_buffer = pyarrow.py_buffer(value)
+        # Use the byte size: len(value) is an item count for multi-byte types.
+        write_buffer = self.create(target_id, arrow_buffer.size)
+        stream = pyarrow.FixedSizeBufferWriter(write_buffer)
+        stream.set_memcopy_threads(memcopy_threads)
+        stream.write(arrow_buffer)
+        self.seal(target_id)
+        return target_id
+
+    def get_buffer(self, object_ids, int timeout_ms=-1):
+        """
+        Fetch the raw contents of one or more objects from the object store.
+
+        Parameters
+        ----------
+        object_ids : list or ObjectID
+            A single object ID, or a list of object IDs, identifying the
+            objects to fetch.
+        timeout_ms : int, default -1
+            How many milliseconds the call may block before timing out and
+            returning. Pass -1 if the call should block and 0 if it should
+            return immediately.
+
+        Returns
+        -------
+        list of buffer objects or a buffer object
+            One result when a single ObjectID is passed, otherwise a list
+            with one entry per requested ID. An entry is None when no
+            data was obtained for that ID.
+        """
+        cdef c_vector[CObjectBuffer] object_buffers
+        # NOTE(review): collections.Sequence is deprecated (collections.abc).
+        if not isinstance(object_ids, collections.Sequence):
+            return self.get_buffer([object_ids], timeout_ms)[0]
+        self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
+        buffers = []
+        for idx in range(object_buffers.size()):
+            if object_buffers[idx].data.get() == nullptr:
+                buffers.append(None)
+            else:
+                nbytes = object_buffers[idx].data.get().size()
+                buffers.append(object_buffers[idx].data.get().data()[:nbytes])
+        return buffers
+
def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
serialization_context=None):
"""
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 834b980..efda2af 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -291,6 +291,21 @@ class TestPlasmaClient(object):
[result] = self.plasma_client.get([object_id], timeout_ms=0)
assert result == pa.plasma.ObjectNotAvailable
+ def test_put_and_get_buffer_without_serialization(self):
+ temp_id = random_object_id()
+ for value in [b"This is a bytes obj", temp_id.binary(), 10 * b"\x00"]:
+ object_id = self.plasma_client.put_buffer(value)  # raw bytes in
+ [result] = self.plasma_client.get_buffer([object_id])  # list in
+ assert result == value
+
+ result = self.plasma_client.get_buffer(object_id)  # single ID
+ assert result == value
+
+ object_id = random_object_id()  # fresh ID, never passed to put_buffer
+ [result] = self.plasma_client.get_buffer(
+ [object_id], timeout_ms=0)
+ assert result is None  # never-stored ID -> None
+
def test_put_and_get_serialization_context(self):
class CustomType(object):