Repository: arrow Updated Branches: refs/heads/master 8aab00ee1 -> 3095f2cb7
ARROW-444: [Python] Native file reads into pre-allocated memory. Some IO API cleanup / niceness. This yields slightly better performance and less memory use. Also deleted some duplicated code. Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #257 from wesm/ARROW-444 and squashes the following commits: 30e480d [Wes McKinney] Rename PyBytes_Empty to something more mundane 9db0d81 [Wes McKinney] Native file reads into pre-allocated memory. Deprecated HdfsClient.connect API. Promote pyarrow.io classes into pyarrow namespace Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3095f2cb Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3095f2cb Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3095f2cb Branch: refs/heads/master Commit: 3095f2cb7bc19954d0dfba02486b7ec48d8fef0f Parents: 8aab00e Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Wed Dec 28 23:05:50 2016 +0100 Committer: Uwe L. Korn <uw...@xhochy.com> Committed: Wed Dec 28 23:05:50 2016 +0100 ---------------------------------------------------------------------- python/pyarrow/__init__.py | 4 ++ python/pyarrow/io.pyx | 109 +++++++++++++-------------------- python/pyarrow/tests/test_hdfs.py | 2 +- python/pyarrow/tests/test_io.py | 4 +- 4 files changed, 49 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 6f81ef4..02b2b06 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -37,6 +37,10 @@ from pyarrow.array import (Array, from pyarrow.error import ArrowException +from pyarrow.io import (HdfsClient, HdfsFile, NativeFile, PythonFileInterface, + BytesReader, Buffer, InMemoryOutputStream, + BufferReader) + from
pyarrow.scalar import (ArrayValue, Scalar, NA, NAType, BooleanValue, Int8Value, Int16Value, Int32Value, Int64Value, http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 8491aa8..cab6ccb 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -37,6 +37,10 @@ import sys import threading import time +# To let us get a PyObject* and avoid Cython auto-ref-counting +cdef extern from "Python.h": + PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"( + char *v, Py_ssize_t len) except NULL cdef class NativeFile: @@ -119,21 +123,24 @@ cdef class NativeFile: with nogil: check_status(self.wr_file.get().Write(buf, bufsize)) - def read(self, int nbytes): + def read(self, int64_t nbytes): cdef: int64_t bytes_read = 0 - uint8_t* buf - shared_ptr[CBuffer] out + PyObject* obj self._assert_readable() + # Allocate empty write space + obj = PyBytes_FromStringAndSizeNative(NULL, nbytes) + + cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj) with nogil: - check_status(self.rd_file.get().ReadB(nbytes, &out)) + check_status(self.rd_file.get().Read(nbytes, &bytes_read, buf)) - result = cp.PyBytes_FromStringAndSize( - <const char*>out.get().data(), out.get().size()) + if bytes_read < nbytes: + cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read) - return result + return PyObject_to_object(obj) # ---------------------------------------------------------------------- @@ -339,31 +346,8 @@ cdef class HdfsClient: cdef readonly: bint is_open - def __cinit__(self): - self.is_open = False - - def __dealloc__(self): - if self.is_open: - self.close() - - def close(self): - """ - Disconnect from the HDFS cluster - """ - self._ensure_client() - with nogil: - check_status(self.client.get().Disconnect()) - self.is_open = False - - cdef _ensure_client(self): - if self.client.get() == NULL: - raise 
IOError('HDFS client improperly initialized') - elif not self.is_open: - raise IOError('HDFS client is closed') - - @classmethod - def connect(cls, host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs'): + def __cinit__(self, host="default", port=0, user=None, kerb_ticket=None, + driver='libhdfs'): """ Connect to an HDFS cluster. All parameters are optional and should only be set if the defaults need to be overridden. @@ -391,9 +375,7 @@ cdef class HdfsClient: ------- client : HDFSClient """ - cdef: - HdfsClient out = HdfsClient() - HdfsConnectionConfig conf + cdef HdfsConnectionConfig conf if host is not None: conf.host = tobytes(host) @@ -411,10 +393,31 @@ cdef class HdfsClient: conf.driver = HdfsDriver_LIBHDFS3 with nogil: - check_status(CHdfsClient.Connect(&conf, &out.client)) - out.is_open = True + check_status(CHdfsClient.Connect(&conf, &self.client)) + self.is_open = True - return out + @classmethod + def connect(cls, *args, **kwargs): + return cls(*args, **kwargs) + + def __dealloc__(self): + if self.is_open: + self.close() + + def close(self): + """ + Disconnect from the HDFS cluster + """ + self._ensure_client() + with nogil: + check_status(self.client.get().Disconnect()) + self.is_open = False + + cdef _ensure_client(self): + if self.client.get() == NULL: + raise IOError('HDFS client improperly initialized') + elif not self.is_open: + raise IOError('HDFS client is closed') def exists(self, path): """ @@ -657,36 +660,6 @@ cdef class HdfsFile(NativeFile): def __dealloc__(self): self.parent = None - def read(self, int nbytes): - """ - Read indicated number of bytes from the file, up to EOF - """ - cdef: - int64_t bytes_read = 0 - uint8_t* buf - - self._assert_readable() - - # This isn't ideal -- PyBytes_FromStringAndSize copies the data from - # the passed buffer, so it's hard for us to avoid doubling the memory - buf = <uint8_t*> malloc(nbytes) - if buf == NULL: - raise MemoryError("Failed to allocate {0} bytes".format(nbytes)) - - cdef 
int64_t total_bytes = 0 - - try: - with nogil: - check_status(self.rd_file.get() - .Read(nbytes, &bytes_read, buf)) - - result = cp.PyBytes_FromStringAndSize(<const char*>buf, - bytes_read) - finally: - free(buf) - - return result - def download(self, stream_or_path): """ Read file completely to local path (rather than reading completely into http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/tests/test_hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py index 73d5a66..4ff5a9d 100644 --- a/python/pyarrow/tests/test_hdfs.py +++ b/python/pyarrow/tests/test_hdfs.py @@ -38,7 +38,7 @@ def hdfs_test_client(driver='libhdfs'): raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' 'an integer') - return io.HdfsClient.connect(host, port, user, driver=driver) + return io.HdfsClient(host, port, user, driver=driver) class HdfsTestCases(object): http://git-wip-us.apache.org/repos/asf/arrow/blob/3095f2cb/python/pyarrow/tests/test_io.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 211a12b..c10ed03 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -67,7 +67,9 @@ def test_python_file_read(): f.seek(5) assert f.tell() == 5 - assert f.read(50) == b'sample data' + v = f.read(50) + assert v == b'sample data' + assert len(v) == 11 f.close()