This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0689a58  ARROW-3504: [Plasma] Add support for Plasma Client to put/get 
raw bytes without pyarrow serialization.
0689a58 is described below

commit 0689a58af818ef5f9a19d9f4458e360c564d5816
Author: Yuhong Guo <[email protected]>
AuthorDate: Mon Oct 15 16:50:31 2018 -0400

    ARROW-3504: [Plasma] Add support for Plasma Client to put/get raw bytes 
without pyarrow serialization.
    
    This feature enables the Java client to read data that the Python client 
puts (cross-language read/write).
    
    Author: Yuhong Guo <[email protected]>
    
    Closes #2752 from guoyuhong/plasmaBytes and squashes the following commits:
    
    b28748be5 <Yuhong Guo> Fix CI and change function name
    df3b1ed5c <Yuhong Guo> Change put_raw_bytes to accept more general argument
    f8bc1a18e <Yuhong Guo> Fix lint
    742d71759 <Yuhong Guo> Add test
    31c823871 <Yuhong Guo> enable raw_bytes read/write
---
 python/pyarrow/_plasma.pyx          | 65 +++++++++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_plasma.py | 15 +++++++++
 2 files changed, 80 insertions(+)

diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 783fbcb..2b9f93e 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -408,6 +408,71 @@ cdef class PlasmaClient:
                 result.append(None)
         return result
 
+    def put_buffer(self, object value, ObjectID object_id=None,
+                   int memcopy_threads=6):
+        """
+        Store a Python buffer in the object store without serializing it.
+
+        Parameters
+        ----------
+        value : Python object that implements the buffer protocol
+            The buffer whose raw bytes are copied into the object store.
+        object_id : ObjectID, default None
+            If this is provided, the specified object ID will be used to refer
+            to the object; otherwise a random object ID is generated.
+        memcopy_threads : int, default 6
+            Number of threads used to copy large buffers into the store.
+
+        Returns
+        -------
+        The object ID associated to the Python buffer object.
+        """
+        cdef ObjectID target_id = (object_id if object_id
+                                   else ObjectID.from_random())
+        cdef Buffer arrow_buffer = pyarrow.py_buffer(value)
+        # Size in bytes: len(value) counts items for multi-byte buffers.
+        write_buffer = self.create(target_id, arrow_buffer.size)
+        stream = pyarrow.FixedSizeBufferWriter(write_buffer)
+        stream.set_memcopy_threads(memcopy_threads)
+        stream.write(arrow_buffer)
+        self.seal(target_id)
+        return target_id
+
+    def get_buffer(self, object_ids, int timeout_ms=-1):
+        """
+        Get the raw data of one or more objects from the object store.
+
+        Parameters
+        ----------
+        object_ids : list or ObjectID
+            Object ID or list of object IDs associated to the values we get
+            from the store.
+        timeout_ms : int, default -1
+            The number of milliseconds that the get call should block before
+            timing out and returning. Pass -1 if the call should block and 0
+            if the call should return immediately.
+
+        Returns
+        -------
+        list of bytes objects or a bytes object
+            A copy of the raw data for each of the object_ids, with None
+            in place of any object whose data was not available.
+        """
+        cdef c_vector[CObjectBuffer] object_buffers
+        if isinstance(object_ids, collections.Sequence):
+            results = []
+            self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
+            for i in range(object_buffers.size()):
+                if object_buffers[i].data.get() != nullptr:
+                    # Slicing the C pointer copies the data into bytes.
+                    size = object_buffers[i].data.get().size()
+                    results.append(object_buffers[i].data.get().data()[:size])
+                else:
+                    results.append(None)
+            return results
+        else:
+            return self.get_buffer([object_ids], timeout_ms)[0]
+
     def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
             serialization_context=None):
         """
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index 834b980..efda2af 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -291,6 +291,21 @@ class TestPlasmaClient(object):
             [result] = self.plasma_client.get([object_id], timeout_ms=0)
             assert result == pa.plasma.ObjectNotAvailable
 
+    def test_put_and_get_buffer_without_serialization(self):
+        temp_id = random_object_id()
+        for value in [b"This is a bytes obj", temp_id.binary(), 10 * b"\x00"]:
+            object_id = self.plasma_client.put_buffer(value)
+            [result] = self.plasma_client.get_buffer([object_id])
+            assert result == value
+
+            result = self.plasma_client.get_buffer(object_id)
+            assert result == value
+
+            object_id = random_object_id()
+            [result] = self.plasma_client.get_buffer(
+                [object_id], timeout_ms=0)
+            assert result is None
+
     def test_put_and_get_serialization_context(self):
 
         class CustomType(object):

Reply via email to