Repository: arrow
Updated Branches:
  refs/heads/master 8aab00ee1 -> 3095f2cb7


ARROW-444: [Python] Native file reads into pre-allocated memory. Some IO API cleanup / niceness

This yields slightly better performance and less memory use. Also deletes some duplicated code.

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #257 from wesm/ARROW-444 and squashes the following commits:

30e480d [Wes McKinney] Rename PyBytes_Empty to something more mundane
9db0d81 [Wes McKinney] Native file reads into pre-allocated memory. Deprecated HdfsClient.connect API. Promote pyarrow.io classes into pyarrow namespace


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3095f2cb
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3095f2cb
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3095f2cb

Branch: refs/heads/master
Commit: 3095f2cb7bc19954d0dfba02486b7ec48d8fef0f
Parents: 8aab00e
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Wed Dec 28 23:05:50 2016 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Wed Dec 28 23:05:50 2016 +0100

----------------------------------------------------------------------
 python/pyarrow/__init__.py        |   4 ++
 python/pyarrow/io.pyx             | 109 +++++++++++++--------------------
 python/pyarrow/tests/test_hdfs.py |   2 +-
 python/pyarrow/tests/test_io.py   |   4 +-
 4 files changed, 49 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 6f81ef4..02b2b06 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -37,6 +37,10 @@ from pyarrow.array import (Array,
 
 from pyarrow.error import ArrowException
 
+from pyarrow.io import (HdfsClient, HdfsFile, NativeFile, PythonFileInterface,
+                        BytesReader, Buffer, InMemoryOutputStream,
+                        BufferReader)
+
 from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
                             BooleanValue,
                             Int8Value, Int16Value, Int32Value, Int64Value,

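With these imports, the I/O classes become reachable from the top-level pyarrow namespace rather than only via pyarrow.io. A minimal usage sketch; the BytesReader constructor and read behavior shown are assumptions based on the class names imported above:

    import pyarrow

    # Previously spelled pyarrow.io.BytesReader (assumed to wrap a
    # bytes object as a readable NativeFile)
    reader = pyarrow.BytesReader(b'sample data')
    assert reader.read(6) == b'sample'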
http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 8491aa8..cab6ccb 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -37,6 +37,10 @@ import sys
 import threading
 import time
 
+# To let us get a PyObject* and avoid Cython auto-ref-counting
+cdef extern from "Python.h":
+    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
+        char *v, Py_ssize_t len) except NULL
 
 cdef class NativeFile:
 
@@ -119,21 +123,24 @@ cdef class NativeFile:
         with nogil:
             check_status(self.wr_file.get().Write(buf, bufsize))
 
-    def read(self, int nbytes):
+    def read(self, int64_t nbytes):
         cdef:
             int64_t bytes_read = 0
-            uint8_t* buf
-            shared_ptr[CBuffer] out
+            PyObject* obj
 
         self._assert_readable()
 
+        # Allocate empty write space
+        obj = PyBytes_FromStringAndSizeNative(NULL, nbytes)
+
+        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
         with nogil:
-            check_status(self.rd_file.get().ReadB(nbytes, &out))
+            check_status(self.rd_file.get().Read(nbytes, &bytes_read, buf))
 
-        result = cp.PyBytes_FromStringAndSize(
-            <const char*>out.get().data(), out.get().size())
+        if bytes_read < nbytes:
+            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
 
-        return result
+        return PyObject_to_object(obj)
 
 
 # ----------------------------------------------------------------------
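The rewritten read() allocates the result bytes object up front with PyBytes_FromStringAndSize(NULL, nbytes), reads directly into its buffer, and shrinks it with _PyBytes_Resize on a short read, so the data is never copied a second time. The same allocate-once pattern, sketched at the pure-Python level for illustration (an analogy, not the Cython code above):

    import io

    def read_preallocated(f, nbytes):
        # Allocate the full buffer up front, mirroring
        # PyBytes_FromStringAndSizeNative(NULL, nbytes) above
        buf = bytearray(nbytes)
        bytes_read = f.readinto(buf)
        # Trim on a short read, mirroring _PyBytes_Resize above
        if bytes_read < nbytes:
            del buf[bytes_read:]
        return bytes(buf)

    f = io.BytesIO(b'sample data')
    assert read_preallocated(f, 50) == b'sample data'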
@@ -339,31 +346,8 @@ cdef class HdfsClient:
     cdef readonly:
         bint is_open
 
-    def __cinit__(self):
-        self.is_open = False
-
-    def __dealloc__(self):
-        if self.is_open:
-            self.close()
-
-    def close(self):
-        """
-        Disconnect from the HDFS cluster
-        """
-        self._ensure_client()
-        with nogil:
-            check_status(self.client.get().Disconnect())
-        self.is_open = False
-
-    cdef _ensure_client(self):
-        if self.client.get() == NULL:
-            raise IOError('HDFS client improperly initialized')
-        elif not self.is_open:
-            raise IOError('HDFS client is closed')
-
-    @classmethod
-    def connect(cls, host="default", port=0, user=None, kerb_ticket=None,
-                driver='libhdfs'):
+    def __cinit__(self, host="default", port=0, user=None, kerb_ticket=None,
+                  driver='libhdfs'):
         """
         Connect to an HDFS cluster. All parameters are optional and should
         only be set if the defaults need to be overridden.
@@ -391,9 +375,7 @@ cdef class HdfsClient:
         -------
         client : HDFSClient
         """
-        cdef:
-            HdfsClient out = HdfsClient()
-            HdfsConnectionConfig conf
+        cdef HdfsConnectionConfig conf
 
         if host is not None:
             conf.host = tobytes(host)
@@ -411,10 +393,31 @@ cdef class HdfsClient:
             conf.driver = HdfsDriver_LIBHDFS3
 
         with nogil:
-            check_status(CHdfsClient.Connect(&conf, &out.client))
-        out.is_open = True
+            check_status(CHdfsClient.Connect(&conf, &self.client))
+        self.is_open = True
 
-        return out
+    @classmethod
+    def connect(cls, *args, **kwargs):
+        return cls(*args, **kwargs)
+
+    def __dealloc__(self):
+        if self.is_open:
+            self.close()
+
+    def close(self):
+        """
+        Disconnect from the HDFS cluster
+        """
+        self._ensure_client()
+        with nogil:
+            check_status(self.client.get().Disconnect())
+        self.is_open = False
+
+    cdef _ensure_client(self):
+        if self.client.get() == NULL:
+            raise IOError('HDFS client improperly initialized')
+        elif not self.is_open:
+            raise IOError('HDFS client is closed')
 
     def exists(self, path):
         """
@@ -657,36 +660,6 @@ cdef class HdfsFile(NativeFile):
     def __dealloc__(self):
         self.parent = None
 
-    def read(self, int nbytes):
-        """
-        Read indicated number of bytes from the file, up to EOF
-        """
-        cdef:
-            int64_t bytes_read = 0
-            uint8_t* buf
-
-        self._assert_readable()
-
-        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
-        # the passed buffer, so it's hard for us to avoid doubling the memory
-        buf = <uint8_t*> malloc(nbytes)
-        if buf == NULL:
-            raise MemoryError("Failed to allocate {0} bytes".format(nbytes))
-
-        cdef int64_t total_bytes = 0
-
-        try:
-            with nogil:
-                check_status(self.rd_file.get()
-                             .Read(nbytes, &bytes_read, buf))
-
-            result = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                  bytes_read)
-        finally:
-            free(buf)
-
-        return result
-
     def download(self, stream_or_path):
         """
         Read file completely to local path (rather than reading completely into

http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 73d5a66..4ff5a9d 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -38,7 +38,7 @@ def hdfs_test_client(driver='libhdfs'):
         raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                          'an integer')
 
-    return io.HdfsClient.connect(host, port, user, driver=driver)
+    return io.HdfsClient(host, port, user, driver=driver)
 
 
 class HdfsTestCases(object):

http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 211a12b..c10ed03 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -67,7 +67,9 @@ def test_python_file_read():
     f.seek(5)
     assert f.tell() == 5
 
-    assert f.read(50) == b'sample data'
+    v = f.read(50)
+    assert v == b'sample data'
+    assert len(v) == 11
 
     f.close()
 
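The strengthened assertion pins down the short-read contract of the rewritten NativeFile.read(): requesting more bytes than remain returns only what is available, resized to the actual length. A standalone version of the check; the PythonFileInterface constructor signature here is an assumption based on its use in this test module:

    from io import BytesIO
    import pyarrow

    f = pyarrow.PythonFileInterface(BytesIO(b'sample data'), mode='r')
    v = f.read(50)  # request more than the 11 bytes present
    assert v == b'sample data'
    assert len(v) == 11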
