http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx deleted file mode 100644 index 1c4253e..0000000 --- a/python/pyarrow/array.pyx +++ /dev/null @@ -1,646 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from cython.operator cimport dereference as deref - -import numpy as np - -from pyarrow.includes.libarrow cimport * -from pyarrow.includes.common cimport PyObject_to_object -cimport pyarrow.includes.pyarrow as pyarrow - -import pyarrow.config - -from pyarrow.compat import frombytes, tobytes, PandasSeries, Categorical -from pyarrow.error cimport check_status -from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool - -cimport pyarrow.scalar as scalar -from pyarrow.scalar import NA - -from pyarrow.schema cimport (DataType, Field, Schema, DictionaryType, - FixedSizeBinaryType, - box_data_type) -import pyarrow.schema as schema - -cimport cpython - - -cdef maybe_coerce_datetime64(values, dtype, DataType type, - timestamps_to_ms=False): - - from pyarrow.compat import DatetimeTZDtype - - if values.dtype.type != np.datetime64: - return values, type - - coerce_ms = timestamps_to_ms and values.dtype != 'datetime64[ms]' - - if coerce_ms: - values = values.astype('datetime64[ms]') - - if isinstance(dtype, DatetimeTZDtype): - tz = dtype.tz - unit = 'ms' if coerce_ms else dtype.unit - type = schema.timestamp(unit, tz) - elif type is None: - # Trust the NumPy dtype - type = schema.type_from_numpy_dtype(values.dtype) - - return values, type - - -cdef class Array: - - cdef init(self, const shared_ptr[CArray]& sp_array): - self.sp_array = sp_array - self.ap = sp_array.get() - self.type = box_data_type(self.sp_array.get().type()) - - @staticmethod - def from_numpy(obj, mask=None, DataType type=None, - timestamps_to_ms=False, - MemoryPool memory_pool=None): - """ - Convert pandas.Series to an Arrow Array. - - Parameters - ---------- - series : pandas.Series or numpy.ndarray - - mask : pandas.Series or numpy.ndarray, optional - boolean mask if the object is valid or null - - type : pyarrow.DataType - Explicit type to attempt to coerce to - - timestamps_to_ms : bool, optional - Convert datetime columns to ms resolution. This is needed for - compatibility with other functionality like Parquet I/O which - only supports milliseconds. - - memory_pool: MemoryPool, optional - Specific memory pool to use to allocate the resulting Arrow array. - - Notes - ----- - Localized timestamps will currently be returned as UTC (pandas's native - representation). Timezone-naive data will be implicitly interpreted as - UTC. 
- - Examples - -------- - - >>> import pandas as pd - >>> import pyarrow as pa - >>> pa.Array.from_numpy(pd.Series([1, 2])) - <pyarrow.array.Int64Array object at 0x7f674e4c0e10> - [ - 1, - 2 - ] - - >>> import numpy as np - >>> pa.Array.from_numpy(pd.Series([1, 2]), np.array([0, 1], - ... dtype=bool)) - <pyarrow.array.Int64Array object at 0x7f9019e11208> - [ - 1, - NA - ] - - Returns - ------- - pyarrow.array.Array - """ - cdef: - shared_ptr[CArray] out - shared_ptr[CDataType] c_type - CMemoryPool* pool - - if mask is not None: - mask = get_series_values(mask) - - values = get_series_values(obj) - pool = maybe_unbox_memory_pool(memory_pool) - - if isinstance(values, Categorical): - return DictionaryArray.from_arrays( - values.codes, values.categories.values, - mask=mask, memory_pool=memory_pool) - elif values.dtype == object: - # Object dtype undergoes a different conversion path as more type - # inference may be needed - if type is not None: - c_type = type.sp_type - with nogil: - check_status(pyarrow.PandasObjectsToArrow( - pool, values, mask, c_type, &out)) - else: - values, type = maybe_coerce_datetime64( - values, obj.dtype, type, timestamps_to_ms=timestamps_to_ms) - - if type is None: - check_status(pyarrow.NumPyDtypeToArrow(values.dtype, &c_type)) - else: - c_type = type.sp_type - - with nogil: - check_status(pyarrow.PandasToArrow( - pool, values, mask, c_type, &out)) - - return box_array(out) - - @staticmethod - def from_list(object list_obj, DataType type=None, - MemoryPool memory_pool=None): - """ - Convert Python list to Arrow array - - Parameters - ---------- - list_obj : array_like - - Returns - ------- - pyarrow.array.Array - """ - cdef: - shared_ptr[CArray] sp_array - CMemoryPool* pool - - pool = maybe_unbox_memory_pool(memory_pool) - if type is None: - check_status(pyarrow.ConvertPySequence(list_obj, pool, &sp_array)) - else: - check_status( - pyarrow.ConvertPySequence( - list_obj, pool, &sp_array, type.sp_type - ) - ) - - return box_array(sp_array) - - property null_count: - - def __get__(self): - return self.sp_array.get().null_count() - - def __iter__(self): - for i in range(len(self)): - yield self.getitem(i) - raise StopIteration - - def __repr__(self): - from pyarrow.formatting import array_format - type_format = object.__repr__(self) - values = array_format(self, window=10) - return '{0}\n{1}'.format(type_format, values) - - def equals(Array self, Array other): - return self.ap.Equals(deref(other.ap)) - - def __len__(self): - if self.sp_array.get(): - return self.sp_array.get().length() - else: - return 0 - - def isnull(self): - raise NotImplemented - - def __getitem__(self, key): - cdef: - Py_ssize_t n = len(self) - - if PySlice_Check(key): - start = key.start or 0 - while start < 0: - start += n - - stop = key.stop if key.stop is not None else n - while stop < 0: - stop += n - - step = key.step or 1 - if step != 1: - raise IndexError('only slices with step 1 supported') - else: - return self.slice(start, stop - start) - - while key < 0: - key += len(self) - - return self.getitem(key) - - cdef getitem(self, int64_t i): - return scalar.box_scalar(self.type, self.sp_array, i) - - def slice(self, offset=0, length=None): - """ - Compute zero-copy slice of this array - - Parameters - ---------- - offset : int, default 0 - Offset from start of array to slice - length : int, default None - Length of slice (default is until end of Array starting from - offset) - - Returns - ------- - sliced : RecordBatch - """ - cdef: - shared_ptr[CArray] result - - if offset < 0: - raise 
IndexError('Offset must be non-negative') - - if length is None: - result = self.ap.Slice(offset) - else: - result = self.ap.Slice(offset, length) - - return box_array(result) - - def to_pandas(self): - """ - Convert to an array object suitable for use in pandas - - See also - -------- - Column.to_pandas - Table.to_pandas - RecordBatch.to_pandas - """ - cdef: - PyObject* out - - with nogil: - check_status( - pyarrow.ConvertArrayToPandas(self.sp_array, <PyObject*> self, - &out)) - return wrap_array_output(out) - - def to_pylist(self): - """ - Convert to an list of native Python objects. - """ - return [x.as_py() for x in self] - - -cdef class Tensor: - - cdef init(self, const shared_ptr[CTensor]& sp_tensor): - self.sp_tensor = sp_tensor - self.tp = sp_tensor.get() - self.type = box_data_type(self.tp.type()) - - def __repr__(self): - return """<pyarrow.Tensor> -type: {0} -shape: {1} -strides: {2}""".format(self.type, self.shape, self.strides) - - @staticmethod - def from_numpy(obj): - cdef shared_ptr[CTensor] ctensor - check_status(pyarrow.NdarrayToTensor(default_memory_pool(), - obj, &ctensor)) - return box_tensor(ctensor) - - def to_numpy(self): - """ - Convert arrow::Tensor to numpy.ndarray with zero copy - """ - cdef: - PyObject* out - - check_status(pyarrow.TensorToNdarray(deref(self.tp), <PyObject*> self, - &out)) - return PyObject_to_object(out) - - def equals(self, Tensor other): - """ - Return true if the tensors contains exactly equal data - """ - return self.tp.Equals(deref(other.tp)) - - property is_mutable: - - def __get__(self): - return self.tp.is_mutable() - - property is_contiguous: - - def __get__(self): - return self.tp.is_contiguous() - - property ndim: - - def __get__(self): - return self.tp.ndim() - - property size: - - def __get__(self): - return self.tp.size() - - property shape: - - def __get__(self): - cdef size_t i - py_shape = [] - for i in range(self.tp.shape().size()): - py_shape.append(self.tp.shape()[i]) - return py_shape - - property strides: - - def __get__(self): - cdef size_t i - py_strides = [] - for i in range(self.tp.strides().size()): - py_strides.append(self.tp.strides()[i]) - return py_strides - - - -cdef wrap_array_output(PyObject* output): - cdef object obj = PyObject_to_object(output) - - if isinstance(obj, dict): - return Categorical(obj['indices'], - categories=obj['dictionary'], - fastpath=True) - else: - return obj - - -cdef class NullArray(Array): - pass - - -cdef class BooleanArray(Array): - pass - - -cdef class NumericArray(Array): - pass - - -cdef class IntegerArray(NumericArray): - pass - - -cdef class FloatingPointArray(NumericArray): - pass - - -cdef class Int8Array(IntegerArray): - pass - - -cdef class UInt8Array(IntegerArray): - pass - - -cdef class Int16Array(IntegerArray): - pass - - -cdef class UInt16Array(IntegerArray): - pass - - -cdef class Int32Array(IntegerArray): - pass - - -cdef class UInt32Array(IntegerArray): - pass - - -cdef class Int64Array(IntegerArray): - pass - - -cdef class UInt64Array(IntegerArray): - pass - - -cdef class Date32Array(NumericArray): - pass - - -cdef class Date64Array(NumericArray): - pass - - -cdef class TimestampArray(NumericArray): - pass - - -cdef class Time32Array(NumericArray): - pass - - -cdef class Time64Array(NumericArray): - pass - - -cdef class FloatArray(FloatingPointArray): - pass - - -cdef class DoubleArray(FloatingPointArray): - pass - - -cdef class FixedSizeBinaryArray(Array): - pass - - -cdef class DecimalArray(FixedSizeBinaryArray): - pass - - -cdef class ListArray(Array): - pass - - 
-cdef class StringArray(Array): - pass - - -cdef class BinaryArray(Array): - pass - - -cdef class DictionaryArray(Array): - - cdef getitem(self, int64_t i): - cdef Array dictionary = self.dictionary - index = self.indices[i] - if index is NA: - return index - else: - return scalar.box_scalar(dictionary.type, dictionary.sp_array, - index.as_py()) - - property dictionary: - - def __get__(self): - cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap) - - if self._dictionary is None: - self._dictionary = box_array(darr.dictionary()) - - return self._dictionary - - property indices: - - def __get__(self): - cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap) - - if self._indices is None: - self._indices = box_array(darr.indices()) - - return self._indices - - @staticmethod - def from_arrays(indices, dictionary, mask=None, - MemoryPool memory_pool=None): - """ - Construct Arrow DictionaryArray from array of indices (must be - non-negative integers) and corresponding array of dictionary values - - Parameters - ---------- - indices : ndarray or pandas.Series, integer type - dictionary : ndarray or pandas.Series - mask : ndarray or pandas.Series, boolean type - True values indicate that indices are actually null - - Returns - ------- - dict_array : DictionaryArray - """ - cdef: - Array arrow_indices, arrow_dictionary - DictionaryArray result - shared_ptr[CDataType] c_type - shared_ptr[CArray] c_result - - if isinstance(indices, Array): - if mask is not None: - raise NotImplementedError( - "mask not implemented with Arrow array inputs yet") - arrow_indices = indices - else: - if mask is None: - mask = indices == -1 - else: - mask = mask | (indices == -1) - arrow_indices = Array.from_numpy(indices, mask=mask, - memory_pool=memory_pool) - - if isinstance(dictionary, Array): - arrow_dictionary = dictionary - else: - arrow_dictionary = Array.from_numpy(dictionary, - memory_pool=memory_pool) - - if not isinstance(arrow_indices, IntegerArray): - raise ValueError('Indices must be integer type') - - c_type.reset(new CDictionaryType(arrow_indices.type.sp_type, - arrow_dictionary.sp_array)) - c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array)) - - result = DictionaryArray() - result.init(c_result) - return result - - -cdef dict _array_classes = { - Type_NA: NullArray, - Type_BOOL: BooleanArray, - Type_UINT8: UInt8Array, - Type_UINT16: UInt16Array, - Type_UINT32: UInt32Array, - Type_UINT64: UInt64Array, - Type_INT8: Int8Array, - Type_INT16: Int16Array, - Type_INT32: Int32Array, - Type_INT64: Int64Array, - Type_DATE32: Date32Array, - Type_DATE64: Date64Array, - Type_TIMESTAMP: TimestampArray, - Type_TIME32: Time32Array, - Type_TIME64: Time64Array, - Type_FLOAT: FloatArray, - Type_DOUBLE: DoubleArray, - Type_LIST: ListArray, - Type_BINARY: BinaryArray, - Type_STRING: StringArray, - Type_DICTIONARY: DictionaryArray, - Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray, - Type_DECIMAL: DecimalArray, -} - -cdef object box_array(const shared_ptr[CArray]& sp_array): - if sp_array.get() == NULL: - raise ValueError('Array was NULL') - - cdef CDataType* data_type = sp_array.get().type().get() - - if data_type == NULL: - raise ValueError('Array data type was NULL') - - cdef Array arr = _array_classes[data_type.id()]() - arr.init(sp_array) - return arr - - -cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor): - if sp_tensor.get() == NULL: - raise ValueError('Tensor was NULL') - - cdef Tensor tensor = Tensor() - tensor.init(sp_tensor) - return tensor - - -cdef object get_series_values(object 
obj): - if isinstance(obj, PandasSeries): - result = obj.values - elif isinstance(obj, np.ndarray): - result = obj - else: - result = PandasSeries(obj).values - - return result - - -from_pylist = Array.from_list
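
The deleted array.pyx above carried pyarrow's two conversion entry points: Array.from_numpy for pandas/NumPy data (with an optional null mask) and Array.from_list for plain Python sequences, plus zero-copy slicing. A minimal sketch of how they were called, against the pyarrow API as of this commit (both constructors were later superseded by pa.array):

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    # pandas/NumPy path: a boolean mask marks null slots (True = null),
    # mirroring the docstring example in the deleted file.
    arr = pa.Array.from_numpy(pd.Series([1, 2]),
                              mask=np.array([False, True]))

    # Python-sequence path: ConvertPySequence infers the Arrow type.
    lst = pa.Array.from_list([1, 2, None])

    # slice() is zero-copy; to_pylist() boxes each element via as_py().
    print(arr.slice(0, 1).to_pylist())   # [1]
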
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/config.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx deleted file mode 100644 index 536f278..0000000 --- a/python/pyarrow/config.pyx +++ /dev/null @@ -1,54 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. See accompanying LICENSE file. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -cdef extern from 'arrow/python/do_import_numpy.h': - pass - -cdef extern from 'arrow/python/numpy_interop.h' namespace 'arrow::py': - int import_numpy() - -cdef extern from 'arrow/python/config.h' namespace 'arrow::py': - void Init() - void set_numpy_nan(object o) - -import_numpy() -Init() - -import numpy as np -set_numpy_nan(np.nan) - -import multiprocessing -import os -cdef int CPU_COUNT = int( - os.environ.get('OMP_NUM_THREADS', - max(multiprocessing.cpu_count() // 2, 1))) - -def cpu_count(): - """ - Returns - ------- - count : Number of CPUs to use by default in parallel operations. Default is - max(1, multiprocessing.cpu_count() / 2), but can be overridden by the - OMP_NUM_THREADS environment variable. For the default, we divide the CPU - count by 2 because most modern computers have hyperthreading turned on, - so doubling the CPU count beyond the number of physical cores does not - help. - """ - return CPU_COUNT - -def set_cpu_count(count): - global CPU_COUNT - CPU_COUNT = max(int(count), 1) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd deleted file mode 100644 index 4fb46c2..0000000 --- a/python/pyarrow/error.pxd +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from pyarrow.includes.libarrow cimport CStatus - -cdef int check_status(const CStatus& status) nogil except -1 http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx deleted file mode 100644 index 259aeb0..0000000 --- a/python/pyarrow/error.pyx +++ /dev/null @@ -1,70 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from pyarrow.includes.libarrow cimport CStatus -from pyarrow.includes.common cimport c_string -from pyarrow.compat import frombytes - - -class ArrowException(Exception): - pass - - -class ArrowInvalid(ValueError, ArrowException): - pass - - -class ArrowMemoryError(MemoryError, ArrowException): - pass - - -class ArrowIOError(IOError, ArrowException): - pass - - -class ArrowKeyError(KeyError, ArrowException): - pass - - -class ArrowTypeError(TypeError, ArrowException): - pass - - -class ArrowNotImplementedError(NotImplementedError, ArrowException): - pass - - -cdef int check_status(const CStatus& status) nogil except -1: - if status.ok(): - return 0 - - with gil: - message = frombytes(status.ToString()) - if status.IsInvalid(): - raise ArrowInvalid(message) - elif status.IsIOError(): - raise ArrowIOError(message) - elif status.IsOutOfMemory(): - raise ArrowMemoryError(message) - elif status.IsKeyError(): - raise ArrowKeyError(message) - elif status.IsNotImplemented(): - raise ArrowNotImplementedError(message) - elif status.IsTypeError(): - raise ArrowTypeError(message) - else: - raise ArrowException(message) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py index 3b5716e..c7b118e 100644 --- a/python/pyarrow/feather.py +++ b/python/pyarrow/feather.py @@ -22,9 +22,9 @@ import six import pandas as pd from pyarrow.compat import pdapi -from pyarrow.io import FeatherError # noqa -from pyarrow.table import Table -import pyarrow.io as ext +from pyarrow._io import FeatherError # noqa +from pyarrow._table import Table +import pyarrow._io as ext if LooseVersion(pd.__version__) < '0.17.0': http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/filesystem.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py index 269cf1c..92dd91c 100644 --- a/python/pyarrow/filesystem.py +++ b/python/pyarrow/filesystem.py @@ -19,7 +19,7 @@ from os.path import join as pjoin import os from pyarrow.util import implements -import pyarrow.io as io +import pyarrow._io as io class Filesystem(object): 
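
The feather.py and filesystem.py hunks above, like the formatting.py, parquet.py and ipc.py hunks that follow, all apply one rename: the compiled Cython modules (io, table, array, scalar) move behind underscore-prefixed names (_io, _table, _array), and the pure-Python modules import those instead. A sketch of the wrapping pattern, using stand-in classes rather than the real pyarrow sources:

    # Stand-in for the compiled extension module (_io.pyx):
    class _StreamReader:
        def _open(self, source):
            # the Cython version opens an Arrow stream reader on `source`
            self._source = source

    # Pure-Python facade (ipc.py) keeps the public name stable:
    class StreamReader(_StreamReader):
        """Reader for the Arrow streaming binary format."""
        def __init__(self, source):
            self._open(source)

Keeping the public classes in pure Python lets docstrings, signatures and deprecation shims evolve without recompiling the extension module.
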
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/formatting.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/formatting.py b/python/pyarrow/formatting.py index 5fe0611..c358344 100644 --- a/python/pyarrow/formatting.py +++ b/python/pyarrow/formatting.py @@ -17,7 +17,7 @@ # Pretty-printing and other formatting utilities for Arrow data structures -import pyarrow.scalar as scalar +import pyarrow._array as _array def array_format(arr, window=None): @@ -42,7 +42,7 @@ def array_format(arr, window=None): def value_format(x, indent_level=0): - if isinstance(x, scalar.ListValue): + if isinstance(x, _array.ListValue): contents = ',\n'.join(value_format(item) for item in x) return '[{0}]'.format(_indent(contents, 1).strip()) else: http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ae2b45f..2444f3f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -113,8 +113,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CDataType] index_type() shared_ptr[CArray] dictionary() - shared_ptr[CDataType] timestamp(TimeUnit unit) - shared_ptr[CDataType] timestamp(TimeUnit unit, const c_string& timezone) + shared_ptr[CDataType] ctimestamp" arrow::timestamp"(TimeUnit unit) + shared_ptr[CDataType] ctimestamp" arrow::timestamp"( + TimeUnit unit, const c_string& timezone) cdef cppclass CMemoryPool" arrow::MemoryPool": int64_t bytes_allocated() http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd deleted file mode 100644 index 0c37a09..0000000 --- a/python/pyarrow/io.pxd +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# distutils: language = c++ - -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport * - - -cdef class Buffer: - cdef: - shared_ptr[CBuffer] buffer - Py_ssize_t shape[1] - Py_ssize_t strides[1] - - cdef init(self, const shared_ptr[CBuffer]& buffer) - - -cdef class NativeFile: - cdef: - shared_ptr[RandomAccessFile] rd_file - shared_ptr[OutputStream] wr_file - bint is_readable - bint is_writeable - bint is_open - bint own_file - - # By implementing these "virtual" functions (all functions in Cython - # extension classes are technically virtual in the C++ sense) we can expose - # the arrow::io abstract file interfaces to other components throughout the - # suite of Arrow C++ libraries - cdef read_handle(self, shared_ptr[RandomAccessFile]* file) - cdef write_handle(self, shared_ptr[OutputStream]* file) - -cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader) -cdef get_writer(object source, shared_ptr[OutputStream]* writer) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx deleted file mode 100644 index 4eb0816..0000000 --- a/python/pyarrow/io.pyx +++ /dev/null @@ -1,1276 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Cython wrappers for IO interfaces defined in arrow::io and messaging in -# arrow::ipc - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from cython.operator cimport dereference as deref - -from libc.stdlib cimport malloc, free - -from pyarrow.includes.libarrow cimport * -cimport pyarrow.includes.pyarrow as pyarrow - -from pyarrow.compat import frombytes, tobytes, encode_file_path -from pyarrow.array cimport Array, Tensor, box_tensor -from pyarrow.error cimport check_status -from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool -from pyarrow.schema cimport Schema -from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch, - table_from_ctable) - -cimport cpython as cp - -import re -import six -import sys -import threading -import time - - -# 64K -DEFAULT_BUFFER_SIZE = 2 ** 16 - - -# To let us get a PyObject* and avoid Cython auto-ref-counting -cdef extern from "Python.h": - PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"( - char *v, Py_ssize_t len) except NULL - -cdef class NativeFile: - - def __cinit__(self): - self.is_open = False - self.own_file = False - - def __dealloc__(self): - if self.is_open and self.own_file: - self.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, tb): - self.close() - - def close(self): - if self.is_open: - with nogil: - if self.is_readable: - check_status(self.rd_file.get().Close()) - else: - check_status(self.wr_file.get().Close()) - self.is_open = False - - cdef read_handle(self, shared_ptr[RandomAccessFile]* file): - self._assert_readable() - file[0] = <shared_ptr[RandomAccessFile]> self.rd_file - - cdef write_handle(self, shared_ptr[OutputStream]* file): - self._assert_writeable() - file[0] = <shared_ptr[OutputStream]> self.wr_file - - def _assert_readable(self): - if not self.is_readable: - raise IOError("only valid on readonly files") - - if not self.is_open: - raise IOError("file not open") - - def _assert_writeable(self): - if not self.is_writeable: - raise IOError("only valid on writeable files") - - if not self.is_open: - raise IOError("file not open") - - def size(self): - cdef int64_t size - self._assert_readable() - with nogil: - check_status(self.rd_file.get().GetSize(&size)) - return size - - def tell(self): - cdef int64_t position - with nogil: - if self.is_readable: - check_status(self.rd_file.get().Tell(&position)) - else: - check_status(self.wr_file.get().Tell(&position)) - return position - - def seek(self, int64_t position): - self._assert_readable() - with nogil: - check_status(self.rd_file.get().Seek(position)) - - def write(self, data): - """ - Write byte from any object implementing buffer protocol (bytes, - bytearray, ndarray, pyarrow.Buffer) - """ - self._assert_writeable() - - if isinstance(data, six.string_types): - data = tobytes(data) - - cdef Buffer arrow_buffer = frombuffer(data) - - cdef const uint8_t* buf = arrow_buffer.buffer.get().data() - cdef int64_t bufsize = len(arrow_buffer) - with nogil: - check_status(self.wr_file.get().Write(buf, bufsize)) - - def read(self, nbytes=None): - cdef: - int64_t c_nbytes - int64_t bytes_read = 0 - PyObject* obj - - if nbytes is None: - c_nbytes = self.size() - self.tell() - else: - c_nbytes = nbytes - - self._assert_readable() - - # Allocate empty write space - obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes) - - cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj) - with nogil: - check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf)) 
- - if bytes_read < c_nbytes: - cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read) - - return PyObject_to_object(obj) - - def read_buffer(self, nbytes=None): - cdef: - int64_t c_nbytes - int64_t bytes_read = 0 - shared_ptr[CBuffer] output - self._assert_readable() - - if nbytes is None: - c_nbytes = self.size() - self.tell() - else: - c_nbytes = nbytes - - with nogil: - check_status(self.rd_file.get().ReadB(c_nbytes, &output)) - - return wrap_buffer(output) - - def download(self, stream_or_path, buffer_size=None): - """ - Read file completely to local path (rather than reading completely into - memory). First seeks to the beginning of the file. - """ - cdef: - int64_t bytes_read = 0 - uint8_t* buf - self._assert_readable() - - buffer_size = buffer_size or DEFAULT_BUFFER_SIZE - - write_queue = Queue(50) - - if not hasattr(stream_or_path, 'read'): - stream = open(stream_or_path, 'wb') - cleanup = lambda: stream.close() - else: - stream = stream_or_path - cleanup = lambda: None - - done = False - exc_info = None - def bg_write(): - try: - while not done or write_queue.qsize() > 0: - try: - buf = write_queue.get(timeout=0.01) - except QueueEmpty: - continue - stream.write(buf) - except Exception as e: - exc_info = sys.exc_info() - finally: - cleanup() - - self.seek(0) - - writer_thread = threading.Thread(target=bg_write) - - # This isn't ideal -- PyBytes_FromStringAndSize copies the data from - # the passed buffer, so it's hard for us to avoid doubling the memory - buf = <uint8_t*> malloc(buffer_size) - if buf == NULL: - raise MemoryError("Failed to allocate {0} bytes" - .format(buffer_size)) - - writer_thread.start() - - cdef int64_t total_bytes = 0 - cdef int32_t c_buffer_size = buffer_size - - try: - while True: - with nogil: - check_status(self.rd_file.get() - .Read(c_buffer_size, &bytes_read, buf)) - - total_bytes += bytes_read - - # EOF - if bytes_read == 0: - break - - pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf, - bytes_read) - - write_queue.put_nowait(pybuf) - finally: - free(buf) - done = True - - writer_thread.join() - if exc_info is not None: - raise exc_info[0], exc_info[1], exc_info[2] - - def upload(self, stream, buffer_size=None): - """ - Pipe file-like object to file - """ - write_queue = Queue(50) - self._assert_writeable() - - buffer_size = buffer_size or DEFAULT_BUFFER_SIZE - - done = False - exc_info = None - def bg_write(): - try: - while not done or write_queue.qsize() > 0: - try: - buf = write_queue.get(timeout=0.01) - except QueueEmpty: - continue - - self.write(buf) - - except Exception as e: - exc_info = sys.exc_info() - - writer_thread = threading.Thread(target=bg_write) - writer_thread.start() - - try: - while True: - buf = stream.read(buffer_size) - if not buf: - break - - if writer_thread.is_alive(): - while write_queue.full(): - time.sleep(0.01) - else: - break - - write_queue.put_nowait(buf) - finally: - done = True - - writer_thread.join() - if exc_info is not None: - raise exc_info[0], exc_info[1], exc_info[2] - - -# ---------------------------------------------------------------------- -# Python file-like objects - - -cdef class PythonFileInterface(NativeFile): - cdef: - object handle - - def __cinit__(self, handle, mode='w'): - self.handle = handle - - if mode.startswith('w'): - self.wr_file.reset(new pyarrow.PyOutputStream(handle)) - self.is_readable = 0 - self.is_writeable = 1 - elif mode.startswith('r'): - self.rd_file.reset(new pyarrow.PyReadableFile(handle)) - self.is_readable = 1 - self.is_writeable = 0 - else: - raise ValueError('Invalid 
file mode: {0}'.format(mode)) - - self.is_open = True - - -cdef class MemoryMappedFile(NativeFile): - """ - Supports 'r', 'r+w', 'w' modes - """ - cdef: - object path - - def __cinit__(self): - self.is_open = False - self.is_readable = 0 - self.is_writeable = 0 - - @staticmethod - def create(path, size): - cdef: - shared_ptr[CMemoryMappedFile] handle - c_string c_path = encode_file_path(path) - int64_t c_size = size - - with nogil: - check_status(CMemoryMappedFile.Create(c_path, c_size, &handle)) - - cdef MemoryMappedFile result = MemoryMappedFile() - result.path = path - result.is_readable = 1 - result.is_writeable = 1 - result.wr_file = <shared_ptr[OutputStream]> handle - result.rd_file = <shared_ptr[RandomAccessFile]> handle - result.is_open = True - - return result - - def open(self, path, mode='r'): - self.path = path - - cdef: - FileMode c_mode - shared_ptr[CMemoryMappedFile] handle - c_string c_path = encode_file_path(path) - - if mode in ('r', 'rb'): - c_mode = FileMode_READ - self.is_readable = 1 - elif mode in ('w', 'wb'): - c_mode = FileMode_WRITE - self.is_writeable = 1 - elif mode == 'r+w': - c_mode = FileMode_READWRITE - self.is_readable = 1 - self.is_writeable = 1 - else: - raise ValueError('Invalid file mode: {0}'.format(mode)) - - check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle)) - - self.wr_file = <shared_ptr[OutputStream]> handle - self.rd_file = <shared_ptr[RandomAccessFile]> handle - self.is_open = True - - -def memory_map(path, mode='r'): - """ - Open memory map at file path. Size of the memory map cannot change - - Parameters - ---------- - path : string - mode : {'r', 'w'}, default 'r' - - Returns - ------- - mmap : MemoryMappedFile - """ - cdef MemoryMappedFile mmap = MemoryMappedFile() - mmap.open(path, mode) - return mmap - - -def create_memory_map(path, size): - """ - Create memory map at indicated path of the given size, return open - writeable file object - - Parameters - ---------- - path : string - size : int - - Returns - ------- - mmap : MemoryMappedFile - """ - return MemoryMappedFile.create(path, size) - - -cdef class OSFile(NativeFile): - """ - Supports 'r', 'w' modes - """ - cdef: - object path - - def __cinit__(self, path, mode='r', MemoryPool memory_pool=None): - self.path = path - - cdef: - FileMode c_mode - shared_ptr[Readable] handle - c_string c_path = encode_file_path(path) - - self.is_readable = self.is_writeable = 0 - - if mode in ('r', 'rb'): - self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool)) - elif mode in ('w', 'wb'): - self._open_writeable(c_path) - else: - raise ValueError('Invalid file mode: {0}'.format(mode)) - - self.is_open = True - - cdef _open_readable(self, c_string path, CMemoryPool* pool): - cdef shared_ptr[ReadableFile] handle - - with nogil: - check_status(ReadableFile.Open(path, pool, &handle)) - - self.is_readable = 1 - self.rd_file = <shared_ptr[RandomAccessFile]> handle - - cdef _open_writeable(self, c_string path): - cdef shared_ptr[FileOutputStream] handle - - with nogil: - check_status(FileOutputStream.Open(path, &handle)) - self.is_writeable = 1 - self.wr_file = <shared_ptr[OutputStream]> handle - - -# ---------------------------------------------------------------------- -# Arrow buffers - - -cdef class Buffer: - - def __cinit__(self): - pass - - cdef init(self, const shared_ptr[CBuffer]& buffer): - self.buffer = buffer - self.shape[0] = self.size - self.strides[0] = <Py_ssize_t>(1) - - def __len__(self): - return self.size - - property size: - - def __get__(self): - return 
self.buffer.get().size() - - property parent: - - def __get__(self): - cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent() - - if parent_buf.get() == NULL: - return None - else: - return wrap_buffer(parent_buf) - - def __getitem__(self, key): - # TODO(wesm): buffer slicing - raise NotImplementedError - - def to_pybytes(self): - return cp.PyBytes_FromStringAndSize( - <const char*>self.buffer.get().data(), - self.buffer.get().size()) - - def __getbuffer__(self, cp.Py_buffer* buffer, int flags): - - buffer.buf = <char *>self.buffer.get().data() - buffer.format = 'b' - buffer.internal = NULL - buffer.itemsize = 1 - buffer.len = self.size - buffer.ndim = 1 - buffer.obj = self - buffer.readonly = 1 - buffer.shape = self.shape - buffer.strides = self.strides - buffer.suboffsets = NULL - -cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool): - cdef shared_ptr[PoolBuffer] result - result.reset(new PoolBuffer(pool)) - return result - - -cdef class InMemoryOutputStream(NativeFile): - - cdef: - shared_ptr[PoolBuffer] buffer - - def __cinit__(self, MemoryPool memory_pool=None): - self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool)) - self.wr_file.reset(new BufferOutputStream( - <shared_ptr[ResizableBuffer]> self.buffer)) - self.is_readable = 0 - self.is_writeable = 1 - self.is_open = True - - def get_result(self): - check_status(self.wr_file.get().Close()) - self.is_open = False - return wrap_buffer(<shared_ptr[CBuffer]> self.buffer) - - -cdef class BufferReader(NativeFile): - """ - Zero-copy reader from objects convertible to Arrow buffer - - Parameters - ---------- - obj : Python bytes or pyarrow.io.Buffer - """ - cdef: - Buffer buffer - - def __cinit__(self, object obj): - - if isinstance(obj, Buffer): - self.buffer = obj - else: - self.buffer = frombuffer(obj) - - self.rd_file.reset(new CBufferReader(self.buffer.buffer)) - self.is_readable = 1 - self.is_writeable = 0 - self.is_open = True - - -def frombuffer(object obj): - """ - Construct an Arrow buffer from a Python bytes object - """ - cdef shared_ptr[CBuffer] buf - try: - memoryview(obj) - buf.reset(new pyarrow.PyBuffer(obj)) - return wrap_buffer(buf) - except TypeError: - raise ValueError('Must pass object that implements buffer protocol') - - - -cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf): - cdef Buffer result = Buffer() - result.init(buf) - return result - - -cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader): - cdef NativeFile nf - - if isinstance(source, six.string_types): - source = memory_map(source, mode='r') - elif isinstance(source, Buffer): - source = BufferReader(source) - elif not isinstance(source, NativeFile) and hasattr(source, 'read'): - # Optimistically hope this is file-like - source = PythonFileInterface(source, mode='r') - - if isinstance(source, NativeFile): - nf = source - - # TODO: what about read-write sources (e.g. 
memory maps) - if not nf.is_readable: - raise IOError('Native file is not readable') - - nf.read_handle(reader) - else: - raise TypeError('Unable to read from object of type: {0}' - .format(type(source))) - - -cdef get_writer(object source, shared_ptr[OutputStream]* writer): - cdef NativeFile nf - - if isinstance(source, six.string_types): - source = OSFile(source, mode='w') - elif not isinstance(source, NativeFile) and hasattr(source, 'write'): - # Optimistically hope this is file-like - source = PythonFileInterface(source, mode='w') - - if isinstance(source, NativeFile): - nf = source - - if nf.is_readable: - raise IOError('Native file is not writeable') - - nf.write_handle(writer) - else: - raise TypeError('Unable to read from object of type: {0}' - .format(type(source))) - -# ---------------------------------------------------------------------- -# HDFS IO implementation - -_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)') - -try: - # Python 3 - from queue import Queue, Empty as QueueEmpty, Full as QueueFull -except ImportError: - from Queue import Queue, Empty as QueueEmpty, Full as QueueFull - - -def have_libhdfs(): - try: - check_status(HaveLibHdfs()) - return True - except: - return False - - -def have_libhdfs3(): - try: - check_status(HaveLibHdfs3()) - return True - except: - return False - - -def strip_hdfs_abspath(path): - m = _HDFS_PATH_RE.match(path) - if m: - return m.group(3) - else: - return path - - -cdef class _HdfsClient: - cdef: - shared_ptr[CHdfsClient] client - - cdef readonly: - bint is_open - - def __cinit__(self): - pass - - def _connect(self, host, port, user, kerb_ticket, driver): - cdef HdfsConnectionConfig conf - - if host is not None: - conf.host = tobytes(host) - conf.port = port - if user is not None: - conf.user = tobytes(user) - if kerb_ticket is not None: - conf.kerb_ticket = tobytes(kerb_ticket) - - if driver == 'libhdfs': - check_status(HaveLibHdfs()) - conf.driver = HdfsDriver_LIBHDFS - else: - check_status(HaveLibHdfs3()) - conf.driver = HdfsDriver_LIBHDFS3 - - with nogil: - check_status(CHdfsClient.Connect(&conf, &self.client)) - self.is_open = True - - @classmethod - def connect(cls, *args, **kwargs): - return cls(*args, **kwargs) - - def __dealloc__(self): - if self.is_open: - self.close() - - def close(self): - """ - Disconnect from the HDFS cluster - """ - self._ensure_client() - with nogil: - check_status(self.client.get().Disconnect()) - self.is_open = False - - cdef _ensure_client(self): - if self.client.get() == NULL: - raise IOError('HDFS client improperly initialized') - elif not self.is_open: - raise IOError('HDFS client is closed') - - def exists(self, path): - """ - Returns True if the path is known to the cluster, False if it does not - (or there is an RPC error) - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - cdef c_bool result - with nogil: - result = self.client.get().Exists(c_path) - return result - - def isdir(self, path): - cdef HdfsPathInfo info - self._path_info(path, &info) - return info.kind == ObjectType_DIRECTORY - - def isfile(self, path): - cdef HdfsPathInfo info - self._path_info(path, &info) - return info.kind == ObjectType_FILE - - cdef _path_info(self, path, HdfsPathInfo* info): - cdef c_string c_path = tobytes(path) - - with nogil: - check_status(self.client.get() - .GetPathInfo(c_path, info)) - - - def ls(self, path, bint full_info): - cdef: - c_string c_path = tobytes(path) - vector[HdfsPathInfo] listing - list results = [] - int i - - self._ensure_client() - - with nogil: - 
check_status(self.client.get() - .ListDirectory(c_path, &listing)) - - cdef const HdfsPathInfo* info - for i in range(<int> listing.size()): - info = &listing[i] - - # Try to trim off the hdfs://HOST:PORT piece - name = strip_hdfs_abspath(frombytes(info.name)) - - if full_info: - kind = ('file' if info.kind == ObjectType_FILE - else 'directory') - - results.append({ - 'kind': kind, - 'name': name, - 'owner': frombytes(info.owner), - 'group': frombytes(info.group), - 'list_modified_time': info.last_modified_time, - 'list_access_time': info.last_access_time, - 'size': info.size, - 'replication': info.replication, - 'block_size': info.block_size, - 'permissions': info.permissions - }) - else: - results.append(name) - - return results - - def mkdir(self, path): - """ - Create indicated directory and any necessary parent directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .CreateDirectory(c_path)) - - def delete(self, path, bint recursive=False): - """ - Delete the indicated file or directory - - Parameters - ---------- - path : string - recursive : boolean, default False - If True, also delete child paths for directories - """ - self._ensure_client() - - cdef c_string c_path = tobytes(path) - with nogil: - check_status(self.client.get() - .Delete(c_path, recursive)) - - def open(self, path, mode='rb', buffer_size=None, replication=None, - default_block_size=None): - """ - Parameters - ---------- - mode : string, 'rb', 'wb', 'ab' - """ - self._ensure_client() - - cdef HdfsFile out = HdfsFile() - - if mode not in ('rb', 'wb', 'ab'): - raise Exception("Mode must be 'rb' (read), " - "'wb' (write, new file), or 'ab' (append)") - - cdef c_string c_path = tobytes(path) - cdef c_bool append = False - - # 0 in libhdfs means "use the default" - cdef int32_t c_buffer_size = buffer_size or 0 - cdef int16_t c_replication = replication or 0 - cdef int64_t c_default_block_size = default_block_size or 0 - - cdef shared_ptr[HdfsOutputStream] wr_handle - cdef shared_ptr[HdfsReadableFile] rd_handle - - if mode in ('wb', 'ab'): - if mode == 'ab': - append = True - - with nogil: - check_status( - self.client.get() - .OpenWriteable(c_path, append, c_buffer_size, - c_replication, c_default_block_size, - &wr_handle)) - - out.wr_file = <shared_ptr[OutputStream]> wr_handle - - out.is_readable = False - out.is_writeable = 1 - else: - with nogil: - check_status(self.client.get() - .OpenReadable(c_path, &rd_handle)) - - out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle - out.is_readable = True - out.is_writeable = 0 - - if c_buffer_size == 0: - c_buffer_size = 2 ** 16 - - out.mode = mode - out.buffer_size = c_buffer_size - out.parent = _HdfsFileNanny(self, out) - out.is_open = True - out.own_file = True - - return out - - def download(self, path, stream, buffer_size=None): - with self.open(path, 'rb') as f: - f.download(stream, buffer_size=buffer_size) - - def upload(self, path, stream, buffer_size=None): - """ - Upload file-like object to HDFS path - """ - with self.open(path, 'wb') as f: - f.upload(stream, buffer_size=buffer_size) - - -# ARROW-404: Helper class to ensure that files are closed before the -# client. 
During deallocation of the extension class, the attributes are -# decref'd which can cause the client to get closed first if the file has the -# last remaining reference -cdef class _HdfsFileNanny: - cdef: - object client - object file_handle_ref - - def __cinit__(self, client, file_handle): - import weakref - self.client = client - self.file_handle_ref = weakref.ref(file_handle) - - def __dealloc__(self): - fh = self.file_handle_ref() - if fh: - fh.close() - # avoid cyclic GC - self.file_handle_ref = None - self.client = None - - -cdef class HdfsFile(NativeFile): - cdef readonly: - int32_t buffer_size - object mode - object parent - - cdef object __weakref__ - - def __dealloc__(self): - self.parent = None - -# ---------------------------------------------------------------------- -# File and stream readers and writers - -cdef class _StreamWriter: - cdef: - shared_ptr[CStreamWriter] writer - shared_ptr[OutputStream] sink - bint closed - - def __cinit__(self): - self.closed = True - - def __dealloc__(self): - if not self.closed: - self.close() - - def _open(self, sink, Schema schema): - get_writer(sink, &self.sink) - - with nogil: - check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema, - &self.writer)) - - self.closed = False - - def write_batch(self, RecordBatch batch): - with nogil: - check_status(self.writer.get() - .WriteRecordBatch(deref(batch.batch))) - - def close(self): - with nogil: - check_status(self.writer.get().Close()) - self.closed = True - - -cdef class _StreamReader: - cdef: - shared_ptr[CStreamReader] reader - - cdef readonly: - Schema schema - - def __cinit__(self): - pass - - def _open(self, source): - cdef: - shared_ptr[RandomAccessFile] reader - shared_ptr[InputStream] in_stream - - get_reader(source, &reader) - in_stream = <shared_ptr[InputStream]> reader - - with nogil: - check_status(CStreamReader.Open(in_stream, &self.reader)) - - self.schema = Schema() - self.schema.init_schema(self.reader.get().schema()) - - def get_next_batch(self): - """ - Read next RecordBatch from the stream. 
Raises StopIteration at end of - stream - """ - cdef shared_ptr[CRecordBatch] batch - - with nogil: - check_status(self.reader.get().GetNextRecordBatch(&batch)) - - if batch.get() == NULL: - raise StopIteration - - return batch_from_cbatch(batch) - - def read_all(self): - """ - Read all record batches as a pyarrow.Table - """ - cdef: - vector[shared_ptr[CRecordBatch]] batches - shared_ptr[CRecordBatch] batch - shared_ptr[CTable] table - - with nogil: - while True: - check_status(self.reader.get().GetNextRecordBatch(&batch)) - if batch.get() == NULL: - break - batches.push_back(batch) - - check_status(CTable.FromRecordBatches(batches, &table)) - - return table_from_ctable(table) - - -cdef class _FileWriter(_StreamWriter): - - def _open(self, sink, Schema schema): - cdef shared_ptr[CFileWriter] writer - get_writer(sink, &self.sink) - - with nogil: - check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema, - &writer)) - - # Cast to base class, because has same interface - self.writer = <shared_ptr[CStreamWriter]> writer - self.closed = False - - -cdef class _FileReader: - cdef: - shared_ptr[CFileReader] reader - - def __cinit__(self): - pass - - def _open(self, source, footer_offset=None): - cdef shared_ptr[RandomAccessFile] reader - get_reader(source, &reader) - - cdef int64_t offset = 0 - if footer_offset is not None: - offset = footer_offset - - with nogil: - if offset != 0: - check_status(CFileReader.Open2(reader, offset, &self.reader)) - else: - check_status(CFileReader.Open(reader, &self.reader)) - - property num_record_batches: - - def __get__(self): - return self.reader.get().num_record_batches() - - def get_batch(self, int i): - cdef shared_ptr[CRecordBatch] batch - - if i < 0 or i >= self.num_record_batches: - raise ValueError('Batch number {0} out of range'.format(i)) - - with nogil: - check_status(self.reader.get().GetRecordBatch(i, &batch)) - - return batch_from_cbatch(batch) - - # TODO(wesm): ARROW-503: Function was renamed. 
Remove after a period of - # time has passed - get_record_batch = get_batch - - def read_all(self): - """ - Read all record batches as a pyarrow.Table - """ - cdef: - vector[shared_ptr[CRecordBatch]] batches - shared_ptr[CTable] table - int i, nbatches - - nbatches = self.num_record_batches - - batches.resize(nbatches) - with nogil: - for i in range(nbatches): - check_status(self.reader.get().GetRecordBatch(i, &batches[i])) - check_status(CTable.FromRecordBatches(batches, &table)) - - return table_from_ctable(table) - - -#---------------------------------------------------------------------- -# Implement legacy Feather file format - - -class FeatherError(Exception): - pass - - -cdef class FeatherWriter: - cdef: - unique_ptr[CFeatherWriter] writer - - cdef public: - int64_t num_rows - - def __cinit__(self): - self.num_rows = -1 - - def open(self, object dest): - cdef shared_ptr[OutputStream] sink - get_writer(dest, &sink) - - with nogil: - check_status(CFeatherWriter.Open(sink, &self.writer)) - - def close(self): - if self.num_rows < 0: - self.num_rows = 0 - self.writer.get().SetNumRows(self.num_rows) - check_status(self.writer.get().Finalize()) - - def write_array(self, object name, object col, object mask=None): - cdef Array arr - - if self.num_rows >= 0: - if len(col) != self.num_rows: - raise ValueError('prior column had a different number of rows') - else: - self.num_rows = len(col) - - if isinstance(col, Array): - arr = col - else: - arr = Array.from_numpy(col, mask=mask) - - cdef c_string c_name = tobytes(name) - - with nogil: - check_status( - self.writer.get().Append(c_name, deref(arr.sp_array))) - - -cdef class FeatherReader: - cdef: - unique_ptr[CFeatherReader] reader - - def __cinit__(self): - pass - - def open(self, source): - cdef shared_ptr[RandomAccessFile] reader - get_reader(source, &reader) - - with nogil: - check_status(CFeatherReader.Open(reader, &self.reader)) - - property num_rows: - - def __get__(self): - return self.reader.get().num_rows() - - property num_columns: - - def __get__(self): - return self.reader.get().num_columns() - - def get_column_name(self, int i): - cdef c_string name = self.reader.get().GetColumnName(i) - return frombytes(name) - - def get_column(self, int i): - if i < 0 or i >= self.num_columns: - raise IndexError(i) - - cdef shared_ptr[CColumn] sp_column - with nogil: - check_status(self.reader.get() - .GetColumn(i, &sp_column)) - - cdef Column col = Column() - col.init(sp_column) - return col - - -def get_tensor_size(Tensor tensor): - """ - Return total size of serialized Tensor including metadata and padding - """ - cdef int64_t size - with nogil: - check_status(GetTensorSize(deref(tensor.tp), &size)) - return size - - -def get_record_batch_size(RecordBatch batch): - """ - Return total size of serialized RecordBatch including metadata and padding - """ - cdef int64_t size - with nogil: - check_status(GetRecordBatchSize(deref(batch.batch), &size)) - return size - - -def write_tensor(Tensor tensor, NativeFile dest): - """ - Write pyarrow.Tensor to pyarrow.NativeFile object its current position - - Parameters - ---------- - tensor : pyarrow.Tensor - dest : pyarrow.NativeFile - - Returns - ------- - bytes_written : int - Total number of bytes written to the file - """ - cdef: - int32_t metadata_length - int64_t body_length - - dest._assert_writeable() - - with nogil: - check_status( - WriteTensor(deref(tensor.tp), dest.wr_file.get(), - &metadata_length, &body_length)) - - return metadata_length + body_length - - -def read_tensor(NativeFile 
source): - """ - Read pyarrow.Tensor from pyarrow.NativeFile object from current - position. If the file source supports zero copy (e.g. a memory map), then - this operation does not allocate any memory - - Parameters - ---------- - source : pyarrow.NativeFile - - Returns - ------- - tensor : Tensor - """ - cdef: - shared_ptr[CTensor] sp_tensor - - source._assert_writeable() - - cdef int64_t offset = source.tell() - with nogil: - check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor)) - - return box_tensor(sp_tensor) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 5a56165..f96ead3 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -17,10 +17,10 @@ # Arrow file and stream reader/writer classes, and other messaging tools -import pyarrow.io as io +import pyarrow._io as _io -class StreamReader(io._StreamReader): +class StreamReader(_io._StreamReader): """ Reader for the Arrow streaming binary format @@ -37,7 +37,7 @@ class StreamReader(io._StreamReader): yield self.get_next_batch() -class StreamWriter(io._StreamWriter): +class StreamWriter(_io._StreamWriter): """ Writer for the Arrow streaming binary format @@ -52,7 +52,7 @@ class StreamWriter(io._StreamWriter): self._open(sink, schema) -class FileReader(io._FileReader): +class FileReader(_io._FileReader): """ Class for reading Arrow record batch data from the Arrow binary file format @@ -68,7 +68,7 @@ class FileReader(io._FileReader): self._open(source, footer_offset=footer_offset) -class FileWriter(io._FileWriter): +class FileWriter(_io._FileWriter): """ Writer to create the Arrow binary file format http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/jemalloc.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/jemalloc.pyx b/python/pyarrow/jemalloc.pyx deleted file mode 100644 index 97583f4..0000000 --- a/python/pyarrow/jemalloc.pyx +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool -from pyarrow.memory cimport MemoryPool - -def default_pool(): - cdef MemoryPool pool = MemoryPool() - pool.init(CJemallocMemoryPool.default_pool()) - return pool http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/memory.pxd b/python/pyarrow/memory.pxd deleted file mode 100644 index bb1af85..0000000 --- a/python/pyarrow/memory.pxd +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool - - -cdef class MemoryPool: - cdef: - CMemoryPool* pool - - cdef init(self, CMemoryPool* pool) - -cdef class LoggingMemoryPool(MemoryPool): - pass - -cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/memory.pyx b/python/pyarrow/memory.pyx deleted file mode 100644 index 98dbf66..0000000 --- a/python/pyarrow/memory.pyx +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool -from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool - -cdef class MemoryPool: - cdef init(self, CMemoryPool* pool): - self.pool = pool - - def bytes_allocated(self): - return self.pool.bytes_allocated() - -cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool): - if memory_pool is None: - return get_memory_pool() - else: - return memory_pool.pool - -cdef class LoggingMemoryPool(MemoryPool): - pass - -def default_pool(): - cdef: - MemoryPool pool = MemoryPool() - pool.init(get_memory_pool()) - return pool - -def set_default_pool(MemoryPool pool): - set_default_memory_pool(pool.pool) - -def total_allocated_bytes(): - cdef CMemoryPool* pool = get_memory_pool() - return pool.bytes_allocated() http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index f81b6c2..aaec43a 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -23,8 +23,8 @@ from pyarrow.filesystem import LocalFilesystem from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa RowGroupMetaData, Schema, ParquetWriter) import pyarrow._parquet as _parquet # noqa -import pyarrow.array as _array -import pyarrow.table as _table +import pyarrow._array as _array +import pyarrow._table as _table # ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/scalar.pxd b/python/pyarrow/scalar.pxd deleted file mode 100644 index 62a5664..0000000 --- a/python/pyarrow/scalar.pxd +++ /dev/null @@ -1,72 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.
- -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport * - -from pyarrow.schema cimport DataType - - -cdef class Scalar: - cdef readonly: - DataType type - - -cdef class NAType(Scalar): - pass - - -cdef class ArrayValue(Scalar): - cdef: - shared_ptr[CArray] sp_array - int64_t index - - cdef void init(self, DataType type, - const shared_ptr[CArray]& sp_array, int64_t index) - - cdef void _set_array(self, const shared_ptr[CArray]& sp_array) - - -cdef class Int8Value(ArrayValue): - pass - - -cdef class Int64Value(ArrayValue): - pass - - -cdef class ListValue(ArrayValue): - cdef readonly: - DataType value_type - - cdef: - CListArray* ap - - cdef getitem(self, int64_t i) - - -cdef class StringValue(ArrayValue): - pass - - -cdef class FixedSizeBinaryValue(ArrayValue): - pass - - -cdef object box_scalar(DataType type, - const shared_ptr[CArray]& sp_array, - int64_t index) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx deleted file mode 100644 index 2b6746a..0000000 --- a/python/pyarrow/scalar.pyx +++ /dev/null @@ -1,315 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.
- -from pyarrow.schema cimport DataType, box_data_type - -from pyarrow.compat import frombytes -import pyarrow.schema as schema -import decimal -import datetime - -cimport cpython as cp - -NA = None - - -cdef _pandas(): - import pandas as pd - return pd - - -cdef class NAType(Scalar): - - def __cinit__(self): - global NA - if NA is not None: - raise Exception('Cannot create multiple NAType instances') - - self.type = schema.null() - - def __repr__(self): - return 'NA' - - def as_py(self): - return None - -NA = NAType() - -cdef class ArrayValue(Scalar): - - cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array, - int64_t index): - self.type = type - self.index = index - self._set_array(sp_array) - - cdef void _set_array(self, const shared_ptr[CArray]& sp_array): - self.sp_array = sp_array - - def __repr__(self): - if hasattr(self, 'as_py'): - return repr(self.as_py()) - else: - return super(Scalar, self).__repr__() - - -cdef class BooleanValue(ArrayValue): - - def as_py(self): - cdef CBooleanArray* ap = <CBooleanArray*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class Int8Value(ArrayValue): - - def as_py(self): - cdef CInt8Array* ap = <CInt8Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class UInt8Value(ArrayValue): - - def as_py(self): - cdef CUInt8Array* ap = <CUInt8Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class Int16Value(ArrayValue): - - def as_py(self): - cdef CInt16Array* ap = <CInt16Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class UInt16Value(ArrayValue): - - def as_py(self): - cdef CUInt16Array* ap = <CUInt16Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class Int32Value(ArrayValue): - - def as_py(self): - cdef CInt32Array* ap = <CInt32Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class UInt32Value(ArrayValue): - - def as_py(self): - cdef CUInt32Array* ap = <CUInt32Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class Int64Value(ArrayValue): - - def as_py(self): - cdef CInt64Array* ap = <CInt64Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class UInt64Value(ArrayValue): - - def as_py(self): - cdef CUInt64Array* ap = <CUInt64Array*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class Date32Value(ArrayValue): - - def as_py(self): - cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get() - - # Convert days since the epoch to seconds since the epoch - return datetime.datetime.utcfromtimestamp( - int(ap.Value(self.index)) * 86400).date() - - -cdef class Date64Value(ArrayValue): - - def as_py(self): - cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get() - return datetime.datetime.utcfromtimestamp( - ap.Value(self.index) / 1000).date() - - -cdef class TimestampValue(ArrayValue): - - def as_py(self): - cdef: - CTimestampArray* ap = <CTimestampArray*> self.sp_array.get() - CTimestampType* dtype = <CTimestampType*>ap.type().get() - int64_t val = ap.Value(self.index) - - timezone = None - tzinfo = None - if dtype.timezone().size() > 0: - timezone = frombytes(dtype.timezone()) - import pytz - tzinfo = pytz.timezone(timezone) - - try: - pd = _pandas() - if dtype.unit() == TimeUnit_SECOND: - val = val * 1000000000 - elif dtype.unit() == TimeUnit_MILLI: - val = val * 1000000 - elif dtype.unit() == TimeUnit_MICRO: - val = val * 1000 - return pd.Timestamp(val, tz=tzinfo) - except ImportError: - if dtype.unit() == TimeUnit_SECOND: - result = datetime.datetime.utcfromtimestamp(val) - elif dtype.unit() ==
TimeUnit_MILLI: - result = datetime.datetime.utcfromtimestamp(float(val) / 1000) - elif dtype.unit() == TimeUnit_MICRO: - result = datetime.datetime.utcfromtimestamp( - float(val) / 1000000) - else: - # TimeUnit_NANO - raise NotImplementedError("Cannot convert nanosecond " - "timestamps without pandas") - if timezone is not None: - result = result.replace(tzinfo=tzinfo) - return result - - -cdef class FloatValue(ArrayValue): - - def as_py(self): - cdef CFloatArray* ap = <CFloatArray*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class DoubleValue(ArrayValue): - - def as_py(self): - cdef CDoubleArray* ap = <CDoubleArray*> self.sp_array.get() - return ap.Value(self.index) - - -cdef class DecimalValue(ArrayValue): - - def as_py(self): - cdef: - CDecimalArray* ap = <CDecimalArray*> self.sp_array.get() - c_string s = ap.FormatValue(self.index) - return decimal.Decimal(s.decode('utf8')) - - -cdef class StringValue(ArrayValue): - - def as_py(self): - cdef CStringArray* ap = <CStringArray*> self.sp_array.get() - return ap.GetString(self.index).decode('utf-8') - - -cdef class BinaryValue(ArrayValue): - - def as_py(self): - cdef: - const uint8_t* ptr - int32_t length - CBinaryArray* ap = <CBinaryArray*> self.sp_array.get() - - ptr = ap.GetValue(self.index, &length) - return cp.PyBytes_FromStringAndSize(<const char*>(ptr), length) - - -cdef class ListValue(ArrayValue): - - def __len__(self): - return self.ap.value_length(self.index) - - def __getitem__(self, i): - return self.getitem(i) - - def __iter__(self): - for i in range(len(self)): - yield self.getitem(i) - - cdef void _set_array(self, const shared_ptr[CArray]& sp_array): - self.sp_array = sp_array - self.ap = <CListArray*> sp_array.get() - self.value_type = box_data_type(self.ap.value_type()) - - cdef getitem(self, int64_t i): - cdef int64_t j = self.ap.value_offset(self.index) + i - return box_scalar(self.value_type, self.ap.values(), j) - - def as_py(self): - cdef: - int64_t j - list result = [] - - for j in range(len(self)): - result.append(self.getitem(j).as_py()) - - return result - - -cdef class FixedSizeBinaryValue(ArrayValue): - - def as_py(self): - cdef: - CFixedSizeBinaryArray* ap - CFixedSizeBinaryType* ap_type - int32_t length - const char* data - ap = <CFixedSizeBinaryArray*> self.sp_array.get() - ap_type = <CFixedSizeBinaryType*> ap.type().get() - length = ap_type.byte_width() - data = <const char*> ap.GetValue(self.index) - return cp.PyBytes_FromStringAndSize(data, length) - - - -cdef dict _scalar_classes = { - Type_BOOL: BooleanValue, - Type_UINT8: UInt8Value, - Type_UINT16: UInt16Value, - Type_UINT32: UInt32Value, - Type_UINT64: UInt64Value, - Type_INT8: Int8Value, - Type_INT16: Int16Value, - Type_INT32: Int32Value, - Type_INT64: Int64Value, - Type_DATE32: Date32Value, - Type_DATE64: Date64Value, - Type_TIMESTAMP: TimestampValue, - Type_FLOAT: FloatValue, - Type_DOUBLE: DoubleValue, - Type_LIST: ListValue, - Type_BINARY: BinaryValue, - Type_STRING: StringValue, - Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue, - Type_DECIMAL: DecimalValue, -} - -cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array, - int64_t index): - cdef ArrayValue val - if type.type.id() == Type_NA: - return NA - elif sp_array.get().IsNull(index): - return NA - else: - val = _scalar_classes[type.type.id()]() - val.init(type, sp_array, index) - return val http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/schema.pxd
---------------------------------------------------------------------- diff --git a/python/pyarrow/schema.pxd b/python/pyarrow/schema.pxd deleted file mode 100644 index eceedba..0000000 --- a/python/pyarrow/schema.pxd +++ /dev/null @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (CDataType, - CDictionaryType, - CTimestampType, - CFixedSizeBinaryType, - CDecimalType, - CField, CSchema) - -cdef class DataType: - cdef: - shared_ptr[CDataType] sp_type - CDataType* type - - cdef void init(self, const shared_ptr[CDataType]& type) - - -cdef class DictionaryType(DataType): - cdef: - const CDictionaryType* dict_type - - -cdef class TimestampType(DataType): - cdef: - const CTimestampType* ts_type - - -cdef class FixedSizeBinaryType(DataType): - cdef: - const CFixedSizeBinaryType* fixed_size_binary_type - - -cdef class DecimalType(FixedSizeBinaryType): - cdef: - const CDecimalType* decimal_type - - -cdef class Field: - cdef: - shared_ptr[CField] sp_field - CField* field - - cdef readonly: - DataType type - - cdef init(self, const shared_ptr[CField]& field) - - -cdef class Schema: - cdef: - shared_ptr[CSchema] sp_schema - CSchema* schema - - cdef init(self, const vector[shared_ptr[CField]]& fields) - cdef init_schema(self, const shared_ptr[CSchema]& schema) - - -cdef DataType box_data_type(const shared_ptr[CDataType]& type) -cdef Field box_field(const shared_ptr[CField]& field) -cdef Schema box_schema(const shared_ptr[CSchema]& schema)
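[Editor's note] The maybe_unbox_memory_pool helper in the deleted memory.pyx above implements a simple defaulting pattern: callers may pass an explicit MemoryPool, and None falls back to the process-wide default obtained from get_memory_pool(). A minimal pure-Python sketch of that pattern follows; the names Pool, _DEFAULT_POOL, and resolve_pool are hypothetical illustrations, not pyarrow API.

class Pool:
    """Toy stand-in for a memory pool that tracks bytes allocated."""
    def __init__(self):
        self._allocated = 0

    def allocate(self, nbytes):
        # Record the allocation size and hand back a buffer.
        self._allocated += nbytes
        return bytearray(nbytes)

    def bytes_allocated(self):
        return self._allocated

_DEFAULT_POOL = Pool()  # plays the role of the process-wide default pool

def resolve_pool(pool=None):
    # Mirrors maybe_unbox_memory_pool: an explicit pool wins,
    # None resolves to the shared default.
    return _DEFAULT_POOL if pool is None else pool

buf = resolve_pool().allocate(64)
assert _DEFAULT_POOL.bytes_allocated() == 64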
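[Editor's note] box_scalar in the deleted scalar.pyx above dispatches on the array's type id through the _scalar_classes table, short-circuiting null entries to the shared NA singleton before any typed wrapper is constructed. Below is a minimal pure-Python sketch of that dispatch pattern; TypeId, the list-backed arrays, and the class names here are illustrative stand-ins, not pyarrow API.

import enum

class TypeId(enum.Enum):
    NA = 0
    INT64 = 1
    STRING = 2

NA = object()  # stand-in for the NAType singleton

class ArrayValue:
    def __init__(self, values, index):
        self.values = values
        self.index = index

class Int64Value(ArrayValue):
    def as_py(self):
        return int(self.values[self.index])

class StringValue(ArrayValue):
    def as_py(self):
        return str(self.values[self.index])

# Type-id keyed dispatch table, as in _scalar_classes above.
_scalar_classes = {
    TypeId.INT64: Int64Value,
    TypeId.STRING: StringValue,
}

def box_scalar(type_id, values, index):
    # Nulls never reach a typed wrapper; they come back as the
    # NA singleton, exactly as in the Cython version.
    if type_id is TypeId.NA or values[index] is None:
        return NA
    return _scalar_classes[type_id](values, index)

assert box_scalar(TypeId.INT64, [1, None], 0).as_py() == 1
assert box_scalar(TypeId.INT64, [1, None], 1) is NA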