Repository: arrow
Updated Branches:
  refs/heads/master e93436503 -> 8b64a4fb2
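[Editor's note] The bulk of this commit removes pyarrow/schema.pyx and pyarrow/table.pyx (their contents move into the underscore-prefixed private Cython modules listed in the setup.py hunk at the end). For orientation, here is a minimal sketch of the type/schema factory API defined in the schema.pyx deleted below (field(), schema(), timestamp(), string(), int64(), ...), written against the top-level pyarrow namespace that this commit consolidates; the exact re-exports are an assumption based on the test changes further down, not part of this diff:

    import pyarrow as pa

    # field(name, type, nullable=True), schema(fields) and the primitive
    # factories below are all defined in the removed schema.pyx
    fields = [
        pa.field('id', pa.int64(), nullable=False),
        pa.field('name', pa.string()),
        pa.field('ts', pa.timestamp('ms', tz='UTC')),
    ]
    sch = pa.schema(fields)

    print(sch)                            # Schema.__str__ -> CSchema::ToString()
    print(sch.field_by_name('ts').type)   # e.g. timestamp[ms, tz=UTC]
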
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/schema.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx deleted file mode 100644 index 4749809..0000000 --- a/python/pyarrow/schema.pyx +++ /dev/null @@ -1,477 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -######################################## -# Data types, fields, schemas, and so forth - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from cython.operator cimport dereference as deref - -from pyarrow.compat import frombytes, tobytes -from pyarrow.array cimport Array -from pyarrow.error cimport check_status -from pyarrow.includes.libarrow cimport (CDataType, CStructType, CListType, - CFixedSizeBinaryType, - CDecimalType, - TimeUnit_SECOND, TimeUnit_MILLI, - TimeUnit_MICRO, TimeUnit_NANO, - Type, TimeUnit) -cimport pyarrow.includes.pyarrow as pyarrow -cimport pyarrow.includes.libarrow as la - -cimport cpython - -import six - - -cdef class DataType: - - def __cinit__(self): - pass - - cdef void init(self, const shared_ptr[CDataType]& type): - self.sp_type = type - self.type = type.get() - - def __str__(self): - return frombytes(self.type.ToString()) - - def __repr__(self): - return '{0.__class__.__name__}({0})'.format(self) - - def __richcmp__(DataType self, DataType other, int op): - if op == cpython.Py_EQ: - return self.type.Equals(deref(other.type)) - elif op == cpython.Py_NE: - return not self.type.Equals(deref(other.type)) - else: - raise TypeError('Invalid comparison') - - -cdef class DictionaryType(DataType): - - cdef void init(self, const shared_ptr[CDataType]& type): - DataType.init(self, type) - self.dict_type = <const CDictionaryType*> type.get() - - -cdef class TimestampType(DataType): - - cdef void init(self, const shared_ptr[CDataType]& type): - DataType.init(self, type) - self.ts_type = <const CTimestampType*> type.get() - - property unit: - - def __get__(self): - return timeunit_to_string(self.ts_type.unit()) - - property tz: - - def __get__(self): - if self.ts_type.timezone().size() > 0: - return frombytes(self.ts_type.timezone()) - else: - return None - - -cdef class FixedSizeBinaryType(DataType): - - cdef void init(self, const shared_ptr[CDataType]& type): - DataType.init(self, type) - self.fixed_size_binary_type = <const CFixedSizeBinaryType*> type.get() - - property byte_width: - - def __get__(self): - return self.fixed_size_binary_type.byte_width() - - -cdef class DecimalType(FixedSizeBinaryType): - - cdef void init(self, const shared_ptr[CDataType]& type): - DataType.init(self, type) - self.decimal_type = <const CDecimalType*> type.get() - - -cdef class Field: - - def __cinit__(self): - pass - - cdef init(self, 
const shared_ptr[CField]& field): - self.sp_field = field - self.field = field.get() - self.type = box_data_type(field.get().type()) - - @classmethod - def from_py(cls, object name, DataType type, bint nullable=True): - cdef Field result = Field() - result.type = type - result.sp_field.reset(new CField(tobytes(name), type.sp_type, - nullable)) - result.field = result.sp_field.get() - - return result - - def __repr__(self): - return 'Field({0!r}, type={1})'.format(self.name, str(self.type)) - - property nullable: - - def __get__(self): - return self.field.nullable() - - property name: - - def __get__(self): - if box_field(self.sp_field) is None: - raise ReferenceError( - 'Field not initialized (references NULL pointer)') - return frombytes(self.field.name()) - - -cdef class Schema: - - def __cinit__(self): - pass - - def __len__(self): - return self.schema.num_fields() - - def __getitem__(self, i): - if i < 0 or i >= len(self): - raise IndexError("{0} is out of bounds".format(i)) - - cdef Field result = Field() - result.init(self.schema.field(i)) - result.type = box_data_type(result.field.type()) - - return result - - cdef init(self, const vector[shared_ptr[CField]]& fields): - self.schema = new CSchema(fields) - self.sp_schema.reset(self.schema) - - cdef init_schema(self, const shared_ptr[CSchema]& schema): - self.schema = schema.get() - self.sp_schema = schema - - def equals(self, other): - """ - Test if this schema is equal to the other - """ - cdef Schema _other - _other = other - - return self.sp_schema.get().Equals(deref(_other.schema)) - - def field_by_name(self, name): - """ - Access a field by its name rather than the column index. - - Parameters - ---------- - name: str - - Returns - ------- - field: pyarrow.Field - """ - return box_field(self.schema.GetFieldByName(tobytes(name))) - - @classmethod - def from_fields(cls, fields): - cdef: - Schema result - Field field - vector[shared_ptr[CField]] c_fields - - c_fields.resize(len(fields)) - - for i in range(len(fields)): - field = fields[i] - c_fields[i] = field.sp_field - - result = Schema() - result.init(c_fields) - - return result - - def __str__(self): - return frombytes(self.schema.ToString()) - - def __repr__(self): - return self.__str__() - - -cdef dict _type_cache = {} - - -cdef DataType primitive_type(Type type): - if type in _type_cache: - return _type_cache[type] - - cdef DataType out = DataType() - out.init(pyarrow.GetPrimitiveType(type)) - - _type_cache[type] = out - return out - -#------------------------------------------------------------ -# Type factory functions - -def field(name, type, bint nullable=True): - return Field.from_py(name, type, nullable) - - -cdef set PRIMITIVE_TYPES = set([ - la.Type_NA, la.Type_BOOL, - la.Type_UINT8, la.Type_INT8, - la.Type_UINT16, la.Type_INT16, - la.Type_UINT32, la.Type_INT32, - la.Type_UINT64, la.Type_INT64, - la.Type_TIMESTAMP, la.Type_DATE32, - la.Type_DATE64, - la.Type_HALF_FLOAT, - la.Type_FLOAT, - la.Type_DOUBLE]) - - -def null(): - return primitive_type(la.Type_NA) - - -def bool_(): - return primitive_type(la.Type_BOOL) - - -def uint8(): - return primitive_type(la.Type_UINT8) - - -def int8(): - return primitive_type(la.Type_INT8) - - -def uint16(): - return primitive_type(la.Type_UINT16) - - -def int16(): - return primitive_type(la.Type_INT16) - - -def uint32(): - return primitive_type(la.Type_UINT32) - - -def int32(): - return primitive_type(la.Type_INT32) - - -def uint64(): - return primitive_type(la.Type_UINT64) - - -def int64(): - return primitive_type(la.Type_INT64) - - 
-cdef dict _timestamp_type_cache = {} - - -cdef timeunit_to_string(TimeUnit unit): - if unit == TimeUnit_SECOND: - return 's' - elif unit == TimeUnit_MILLI: - return 'ms' - elif unit == TimeUnit_MICRO: - return 'us' - elif unit == TimeUnit_NANO: - return 'ns' - - -def timestamp(unit_str, tz=None): - cdef: - TimeUnit unit - c_string c_timezone - - if unit_str == "s": - unit = TimeUnit_SECOND - elif unit_str == 'ms': - unit = TimeUnit_MILLI - elif unit_str == 'us': - unit = TimeUnit_MICRO - elif unit_str == 'ns': - unit = TimeUnit_NANO - else: - raise TypeError('Invalid TimeUnit string') - - cdef TimestampType out = TimestampType() - - if tz is None: - out.init(la.timestamp(unit)) - if unit in _timestamp_type_cache: - return _timestamp_type_cache[unit] - _timestamp_type_cache[unit] = out - else: - if not isinstance(tz, six.string_types): - tz = tz.zone - - c_timezone = tobytes(tz) - out.init(la.timestamp(unit, c_timezone)) - - return out - - -def date32(): - return primitive_type(la.Type_DATE32) - - -def date64(): - return primitive_type(la.Type_DATE64) - - -def float16(): - return primitive_type(la.Type_HALF_FLOAT) - - -def float32(): - return primitive_type(la.Type_FLOAT) - - -def float64(): - return primitive_type(la.Type_DOUBLE) - - -cpdef DataType decimal(int precision, int scale=0): - cdef shared_ptr[CDataType] decimal_type - decimal_type.reset(new CDecimalType(precision, scale)) - return box_data_type(decimal_type) - - -def string(): - """ - UTF8 string - """ - return primitive_type(la.Type_STRING) - - -def binary(int length=-1): - """Binary (PyBytes-like) type - - Parameters - ---------- - length : int, optional, default -1 - If length == -1 then return a variable length binary type. If length is - greater than or equal to 0 then return a fixed size binary type of - width `length`. 
- """ - if length == -1: - return primitive_type(la.Type_BINARY) - - cdef shared_ptr[CDataType] fixed_size_binary_type - fixed_size_binary_type.reset(new CFixedSizeBinaryType(length)) - return box_data_type(fixed_size_binary_type) - - -def list_(DataType value_type): - cdef DataType out = DataType() - cdef shared_ptr[CDataType] list_type - list_type.reset(new CListType(value_type.sp_type)) - out.init(list_type) - return out - - -def dictionary(DataType index_type, Array dictionary): - """ - Dictionary (categorical, or simply encoded) type - """ - cdef DictionaryType out = DictionaryType() - cdef shared_ptr[CDataType] dict_type - dict_type.reset(new CDictionaryType(index_type.sp_type, - dictionary.sp_array)) - out.init(dict_type) - return out - - -def struct(fields): - """ - - """ - cdef: - DataType out = DataType() - Field field - vector[shared_ptr[CField]] c_fields - cdef shared_ptr[CDataType] struct_type - - for field in fields: - c_fields.push_back(field.sp_field) - - struct_type.reset(new CStructType(c_fields)) - out.init(struct_type) - return out - - -def schema(fields): - return Schema.from_fields(fields) - - -cdef DataType box_data_type(const shared_ptr[CDataType]& type): - cdef: - DataType out - - if type.get() == NULL: - return None - - if type.get().id() == la.Type_DICTIONARY: - out = DictionaryType() - elif type.get().id() == la.Type_TIMESTAMP: - out = TimestampType() - elif type.get().id() == la.Type_FIXED_SIZE_BINARY: - out = FixedSizeBinaryType() - elif type.get().id() == la.Type_DECIMAL: - out = DecimalType() - else: - out = DataType() - - out.init(type) - return out - -cdef Field box_field(const shared_ptr[CField]& field): - if field.get() == NULL: - return None - cdef Field out = Field() - out.init(field) - return out - -cdef Schema box_schema(const shared_ptr[CSchema]& type): - cdef Schema out = Schema() - out.init_schema(type) - return out - - -def type_from_numpy_dtype(object dtype): - cdef shared_ptr[CDataType] c_type - with nogil: - check_status(pyarrow.NumPyDtypeToArrow(dtype, &c_type)) - - return box_data_type(c_type) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/table.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd deleted file mode 100644 index f564042..0000000 --- a/python/pyarrow/table.pxd +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from pyarrow.includes.common cimport shared_ptr -from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable, - CRecordBatch) - -from pyarrow.schema cimport Schema - - -cdef class ChunkedArray: - cdef: - shared_ptr[CChunkedArray] sp_chunked_array - CChunkedArray* chunked_array - - cdef init(self, const shared_ptr[CChunkedArray]& chunked_array) - cdef _check_nullptr(self) - - -cdef class Column: - cdef: - shared_ptr[CColumn] sp_column - CColumn* column - - cdef init(self, const shared_ptr[CColumn]& column) - cdef _check_nullptr(self) - - -cdef class Table: - cdef: - shared_ptr[CTable] sp_table - CTable* table - - cdef init(self, const shared_ptr[CTable]& table) - cdef _check_nullptr(self) - - -cdef class RecordBatch: - cdef: - shared_ptr[CRecordBatch] sp_batch - CRecordBatch* batch - Schema _schema - - cdef init(self, const shared_ptr[CRecordBatch]& table) - cdef _check_nullptr(self) - -cdef object box_column(const shared_ptr[CColumn]& ccolumn) -cdef api object table_from_ctable(const shared_ptr[CTable]& ctable) -cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx deleted file mode 100644 index 3972bda..0000000 --- a/python/pyarrow/table.pyx +++ /dev/null @@ -1,915 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from cython.operator cimport dereference as deref - -from pyarrow.includes.libarrow cimport * -from pyarrow.includes.common cimport * -cimport pyarrow.includes.pyarrow as pyarrow - -import pyarrow.config - -from pyarrow.array cimport Array, box_array, wrap_array_output -from pyarrow.error import ArrowException -from pyarrow.error cimport check_status -from pyarrow.schema cimport box_data_type, box_schema, DataType, Field - -from pyarrow.schema import field -from pyarrow.compat import frombytes, tobytes - -cimport cpython - -from collections import OrderedDict - - -cdef _pandas(): - import pandas as pd - return pd - - -cdef class ChunkedArray: - """ - Array backed via one or more memory chunks. - - Warning - ------- - Do not call this class's constructor directly. - """ - - def __cinit__(self): - self.chunked_array = NULL - - cdef init(self, const shared_ptr[CChunkedArray]& chunked_array): - self.sp_chunked_array = chunked_array - self.chunked_array = chunked_array.get() - - cdef _check_nullptr(self): - if self.chunked_array == NULL: - raise ReferenceError("ChunkedArray object references a NULL " - "pointer. 
Not initialized.") - - def length(self): - self._check_nullptr() - return self.chunked_array.length() - - def __len__(self): - return self.length() - - @property - def null_count(self): - """ - Number of null entires - - Returns - ------- - int - """ - self._check_nullptr() - return self.chunked_array.null_count() - - @property - def num_chunks(self): - """ - Number of underlying chunks - - Returns - ------- - int - """ - self._check_nullptr() - return self.chunked_array.num_chunks() - - def chunk(self, i): - """ - Select a chunk by its index - - Parameters - ---------- - i : int - - Returns - ------- - pyarrow.array.Array - """ - self._check_nullptr() - return box_array(self.chunked_array.chunk(i)) - - def iterchunks(self): - for i in range(self.num_chunks): - yield self.chunk(i) - - def to_pylist(self): - """ - Convert to a list of native Python objects. - """ - result = [] - for i in range(self.num_chunks): - result += self.chunk(i).to_pylist() - return result - - -cdef class Column: - """ - Named vector of elements of equal type. - - Warning - ------- - Do not call this class's constructor directly. - """ - - def __cinit__(self): - self.column = NULL - - cdef init(self, const shared_ptr[CColumn]& column): - self.sp_column = column - self.column = column.get() - - @staticmethod - def from_array(object field_or_name, Array arr): - cdef Field boxed_field - - if isinstance(field_or_name, Field): - boxed_field = field_or_name - else: - boxed_field = field(field_or_name, arr.type) - - cdef shared_ptr[CColumn] sp_column - sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array)) - return box_column(sp_column) - - def to_pandas(self): - """ - Convert the arrow::Column to a pandas.Series - - Returns - ------- - pandas.Series - """ - cdef: - PyObject* out - - check_status(pyarrow.ConvertColumnToPandas(self.sp_column, - <PyObject*> self, &out)) - - return _pandas().Series(wrap_array_output(out), name=self.name) - - def equals(self, Column other): - """ - Check if contents of two columns are equal - - Parameters - ---------- - other : pyarrow.Column - - Returns - ------- - are_equal : boolean - """ - cdef: - CColumn* my_col = self.column - CColumn* other_col = other.column - c_bool result - - self._check_nullptr() - other._check_nullptr() - - with nogil: - result = my_col.Equals(deref(other_col)) - - return result - - def to_pylist(self): - """ - Convert to a list of native Python objects. - """ - return self.data.to_pylist() - - cdef _check_nullptr(self): - if self.column == NULL: - raise ReferenceError("Column object references a NULL pointer." 
- "Not initialized.") - - def __len__(self): - self._check_nullptr() - return self.column.length() - - def length(self): - self._check_nullptr() - return self.column.length() - - @property - def shape(self): - """ - Dimensions of this columns - - Returns - ------- - (int,) - """ - self._check_nullptr() - return (self.length(),) - - @property - def null_count(self): - """ - Number of null entires - - Returns - ------- - int - """ - self._check_nullptr() - return self.column.null_count() - - @property - def name(self): - """ - Label of the column - - Returns - ------- - str - """ - return bytes(self.column.name()).decode('utf8') - - @property - def type(self): - """ - Type information for this column - - Returns - ------- - pyarrow.schema.DataType - """ - return box_data_type(self.column.type()) - - @property - def data(self): - """ - The underlying data - - Returns - ------- - pyarrow.table.ChunkedArray - """ - cdef ChunkedArray chunked_array = ChunkedArray() - chunked_array.init(self.column.data()) - return chunked_array - - -cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema): - cdef: - Array arr - Column col - c_string c_name - vector[shared_ptr[CField]] fields - cdef shared_ptr[CDataType] type_ - - cdef int K = len(arrays) - - fields.resize(K) - - if len(arrays) == 0: - raise ValueError('Must pass at least one array') - - if isinstance(arrays[0], Array): - if names is None: - raise ValueError('Must pass names when constructing ' - 'from Array objects') - for i in range(K): - arr = arrays[i] - type_ = arr.type.sp_type - c_name = tobytes(names[i]) - fields[i].reset(new CField(c_name, type_, True)) - elif isinstance(arrays[0], Column): - for i in range(K): - col = arrays[i] - type_ = col.sp_column.get().type() - c_name = tobytes(col.name) - fields[i].reset(new CField(c_name, type_, True)) - else: - raise TypeError(type(arrays[0])) - - schema.reset(new CSchema(fields)) - - - -cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema): - cdef: - list names = [] - list arrays = [] - DataType type = None - - for name in df.columns: - col = df[name] - if schema is not None: - type = schema.field_by_name(name).type - - arr = Array.from_numpy(col, type=type, - timestamps_to_ms=timestamps_to_ms) - names.append(name) - arrays.append(arr) - - return names, arrays - - -cdef class RecordBatch: - """ - Batch of rows of columns of equal length - - Warning - ------- - Do not call this class's constructor directly, use one of the ``from_*`` - methods instead. - """ - - def __cinit__(self): - self.batch = NULL - self._schema = None - - cdef init(self, const shared_ptr[CRecordBatch]& batch): - self.sp_batch = batch - self.batch = batch.get() - - cdef _check_nullptr(self): - if self.batch == NULL: - raise ReferenceError("Object not initialized") - - def __len__(self): - self._check_nullptr() - return self.batch.num_rows() - - @property - def num_columns(self): - """ - Number of columns - - Returns - ------- - int - """ - self._check_nullptr() - return self.batch.num_columns() - - @property - def num_rows(self): - """ - Number of rows - - Due to the definition of a RecordBatch, all columns have the same - number of rows. 
- - Returns - ------- - int - """ - return len(self) - - @property - def schema(self): - """ - Schema of the RecordBatch and its columns - - Returns - ------- - pyarrow.schema.Schema - """ - cdef Schema schema - self._check_nullptr() - if self._schema is None: - schema = Schema() - schema.init_schema(self.batch.schema()) - self._schema = schema - - return self._schema - - def __getitem__(self, i): - return box_array(self.batch.column(i)) - - def slice(self, offset=0, length=None): - """ - Compute zero-copy slice of this RecordBatch - - Parameters - ---------- - offset : int, default 0 - Offset from start of array to slice - length : int, default None - Length of slice (default is until end of batch starting from - offset) - - Returns - ------- - sliced : RecordBatch - """ - cdef shared_ptr[CRecordBatch] result - - if offset < 0: - raise IndexError('Offset must be non-negative') - - if length is None: - result = self.batch.Slice(offset) - else: - result = self.batch.Slice(offset, length) - - return batch_from_cbatch(result) - - def equals(self, RecordBatch other): - cdef: - CRecordBatch* my_batch = self.batch - CRecordBatch* other_batch = other.batch - c_bool result - - self._check_nullptr() - other._check_nullptr() - - with nogil: - result = my_batch.Equals(deref(other_batch)) - - return result - - def to_pydict(self): - """ - Converted the arrow::RecordBatch to an OrderedDict - - Returns - ------- - OrderedDict - """ - entries = [] - for i in range(self.batch.num_columns()): - name = bytes(self.batch.column_name(i)).decode('utf8') - column = self[i].to_pylist() - entries.append((name, column)) - return OrderedDict(entries) - - - def to_pandas(self, nthreads=None): - """ - Convert the arrow::RecordBatch to a pandas DataFrame - - Returns - ------- - pandas.DataFrame - """ - return Table.from_batches([self]).to_pandas(nthreads=nthreads) - - @classmethod - def from_pandas(cls, df, schema=None): - """ - Convert pandas.DataFrame to an Arrow RecordBatch - - Parameters - ---------- - df: pandas.DataFrame - schema: pyarrow.Schema (optional) - The expected schema of the RecordBatch. This can be used to - indicate the type of columns if we cannot infer it automatically. 
- - Returns - ------- - pyarrow.table.RecordBatch - """ - names, arrays = _dataframe_to_arrays(df, False, schema) - return cls.from_arrays(arrays, names) - - @staticmethod - def from_arrays(arrays, names): - """ - Construct a RecordBatch from multiple pyarrow.Arrays - - Parameters - ---------- - arrays: list of pyarrow.Array - column-wise data vectors - names: list of str - Labels for the columns - - Returns - ------- - pyarrow.table.RecordBatch - """ - cdef: - Array arr - c_string c_name - shared_ptr[CSchema] schema - shared_ptr[CRecordBatch] batch - vector[shared_ptr[CArray]] c_arrays - int64_t num_rows - - if len(arrays) == 0: - raise ValueError('Record batch cannot contain no arrays (for now)') - - num_rows = len(arrays[0]) - _schema_from_arrays(arrays, names, &schema) - - for i in range(len(arrays)): - arr = arrays[i] - c_arrays.push_back(arr.sp_array) - - batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) - return batch_from_cbatch(batch) - - -cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): - cdef: - PyObject* result_obj - CColumn* col - int i - - import pandas.core.internals as _int - from pandas import RangeIndex, Categorical - from pyarrow.compat import DatetimeTZDtype - - with nogil: - check_status(pyarrow.ConvertTableToPandas(table, nthreads, - &result_obj)) - - result = PyObject_to_object(result_obj) - - blocks = [] - for item in result: - block_arr = item['block'] - placement = item['placement'] - if 'dictionary' in item: - cat = Categorical(block_arr, - categories=item['dictionary'], - ordered=False, fastpath=True) - block = _int.make_block(cat, placement=placement, - klass=_int.CategoricalBlock, - fastpath=True) - elif 'timezone' in item: - dtype = DatetimeTZDtype('ns', tz=item['timezone']) - block = _int.make_block(block_arr, placement=placement, - klass=_int.DatetimeTZBlock, - dtype=dtype, fastpath=True) - else: - block = _int.make_block(block_arr, placement=placement) - blocks.append(block) - - names = [] - for i in range(table.get().num_columns()): - col = table.get().column(i).get() - names.append(frombytes(col.name())) - - axes = [names, RangeIndex(table.get().num_rows())] - return _int.BlockManager(blocks, axes) - - -cdef class Table: - """ - A collection of top-level named, equal length Arrow arrays. - - Warning - ------- - Do not call this class's constructor directly, use one of the ``from_*`` - methods instead. - """ - - def __cinit__(self): - self.table = NULL - - def __repr__(self): - return 'pyarrow.Table\n{0}'.format(str(self.schema)) - - cdef init(self, const shared_ptr[CTable]& table): - self.sp_table = table - self.table = table.get() - - cdef _check_nullptr(self): - if self.table == NULL: - raise ReferenceError("Table object references a NULL pointer." - "Not initialized.") - - def equals(self, Table other): - """ - Check if contents of two tables are equal - - Parameters - ---------- - other : pyarrow.Table - - Returns - ------- - are_equal : boolean - """ - cdef: - CTable* my_table = self.table - CTable* other_table = other.table - c_bool result - - self._check_nullptr() - other._check_nullptr() - - with nogil: - result = my_table.Equals(deref(other_table)) - - return result - - @classmethod - def from_pandas(cls, df, timestamps_to_ms=False, schema=None): - """ - Convert pandas.DataFrame to an Arrow Table - - Parameters - ---------- - df: pandas.DataFrame - - timestamps_to_ms: bool - Convert datetime columns to ms resolution. 
This is needed for - compability with other functionality like Parquet I/O which - only supports milliseconds. - - schema: pyarrow.Schema (optional) - The expected schema of the Arrow Table. This can be used to - indicate the type of columns if we cannot infer it automatically. - - Returns - ------- - pyarrow.table.Table - - Examples - -------- - - >>> import pandas as pd - >>> import pyarrow as pa - >>> df = pd.DataFrame({ - ... 'int': [1, 2], - ... 'str': ['a', 'b'] - ... }) - >>> pa.Table.from_pandas(df) - <pyarrow.table.Table object at 0x7f05d1fb1b40> - """ - names, arrays = _dataframe_to_arrays(df, - timestamps_to_ms=timestamps_to_ms, - schema=schema) - return cls.from_arrays(arrays, names=names) - - @staticmethod - def from_arrays(arrays, names=None): - """ - Construct a Table from Arrow arrays or columns - - Parameters - ---------- - arrays: list of pyarrow.Array or pyarrow.Column - Equal-length arrays that should form the table. - names: list of str, optional - Names for the table columns. If Columns passed, will be - inferred. If Arrays passed, this argument is required - - Returns - ------- - pyarrow.table.Table - - """ - cdef: - vector[shared_ptr[CField]] fields - vector[shared_ptr[CColumn]] columns - shared_ptr[CSchema] schema - shared_ptr[CTable] table - - _schema_from_arrays(arrays, names, &schema) - - cdef int K = len(arrays) - columns.resize(K) - - for i in range(K): - if isinstance(arrays[i], Array): - columns[i].reset(new CColumn(schema.get().field(i), - (<Array> arrays[i]).sp_array)) - elif isinstance(arrays[i], Column): - columns[i] = (<Column> arrays[i]).sp_column - else: - raise ValueError(type(arrays[i])) - - table.reset(new CTable(schema, columns)) - return table_from_ctable(table) - - @staticmethod - def from_batches(batches): - """ - Construct a Table from a list of Arrow RecordBatches - - Parameters - ---------- - - batches: list of RecordBatch - RecordBatch list to be converted, schemas must be equal - """ - cdef: - vector[shared_ptr[CRecordBatch]] c_batches - shared_ptr[CTable] c_table - RecordBatch batch - - for batch in batches: - c_batches.push_back(batch.sp_batch) - - with nogil: - check_status(CTable.FromRecordBatches(c_batches, &c_table)) - - return table_from_ctable(c_table) - - def to_pandas(self, nthreads=None): - """ - Convert the arrow::Table to a pandas DataFrame - - Parameters - ---------- - nthreads : int, default max(1, multiprocessing.cpu_count() / 2) - For the default, we divide the CPU count by 2 because most modern - computers have hyperthreading turned on, so doubling the CPU count - beyond the number of physical cores does not help - - Returns - ------- - pandas.DataFrame - """ - if nthreads is None: - nthreads = pyarrow.config.cpu_count() - - mgr = table_to_blockmanager(self.sp_table, nthreads) - return _pandas().DataFrame(mgr) - - def to_pydict(self): - """ - Converted the arrow::Table to an OrderedDict - - Returns - ------- - OrderedDict - """ - entries = [] - for i in range(self.table.num_columns()): - name = self.column(i).name - column = self.column(i).to_pylist() - entries.append((name, column)) - return OrderedDict(entries) - - @property - def schema(self): - """ - Schema of the table and its columns - - Returns - ------- - pyarrow.schema.Schema - """ - return box_schema(self.table.schema()) - - def column(self, index): - """ - Select a column by its numeric index. 
- - Parameters - ---------- - index: int - - Returns - ------- - pyarrow.table.Column - """ - self._check_nullptr() - cdef Column column = Column() - column.init(self.table.column(index)) - return column - - def __getitem__(self, i): - return self.column(i) - - def itercolumns(self): - """ - Iterator over all columns in their numerical order - """ - for i in range(self.num_columns): - yield self.column(i) - - @property - def num_columns(self): - """ - Number of columns in this table - - Returns - ------- - int - """ - self._check_nullptr() - return self.table.num_columns() - - @property - def num_rows(self): - """ - Number of rows in this table. - - Due to the definition of a table, all columns have the same number of rows. - - Returns - ------- - int - """ - self._check_nullptr() - return self.table.num_rows() - - def __len__(self): - return self.num_rows - - @property - def shape(self): - """ - Dimensions of the table: (#rows, #columns) - - Returns - ------- - (int, int) - """ - return (self.num_rows, self.num_columns) - - def add_column(self, int i, Column column): - """ - Add column to Table at position. Returns new table - """ - cdef: - shared_ptr[CTable] c_table - - with nogil: - check_status(self.table.AddColumn(i, column.sp_column, &c_table)) - - return table_from_ctable(c_table) - - def append_column(self, Column column): - """ - Append column at end of columns. Returns new table - """ - return self.add_column(self.num_columns, column) - - def remove_column(self, int i): - """ - Create new Table with the indicated column removed - """ - cdef shared_ptr[CTable] c_table - - with nogil: - check_status(self.table.RemoveColumn(i, &c_table)) - - return table_from_ctable(c_table) - - -def concat_tables(tables): - """ - Perform zero-copy concatenation of pyarrow.Table objects. 
Raises exception - if all of the Table schemas are not the same - - Parameters - ---------- - tables : iterable of pyarrow.Table objects - output_name : string, default None - A name for the output table, if any - """ - cdef: - vector[shared_ptr[CTable]] c_tables - shared_ptr[CTable] c_result - Table table - - for table in tables: - c_tables.push_back(table.sp_table) - - with nogil: - check_status(ConcatenateTables(c_tables, &c_result)) - - return table_from_ctable(c_result) - - -cdef object box_column(const shared_ptr[CColumn]& ccolumn): - cdef Column column = Column() - column.init(ccolumn) - return column - - -cdef api object table_from_ctable(const shared_ptr[CTable]& ctable): - cdef Table table = Table() - table.init(ctable) - return table - - -cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch): - cdef RecordBatch batch = RecordBatch() - batch.init(cbatch) - return batch http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py index cba9464..6f8040f 100644 --- a/python/pyarrow/tests/test_feather.py +++ b/python/pyarrow/tests/test_feather.py @@ -25,7 +25,7 @@ import pyarrow as pa from pyarrow.compat import guid from pyarrow.feather import (read_feather, write_feather, FeatherReader) -from pyarrow.io import FeatherWriter +from pyarrow._io import FeatherWriter def random_path(): http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py index b8f7e25..d2a5479 100644 --- a/python/pyarrow/tests/test_hdfs.py +++ b/python/pyarrow/tests/test_hdfs.py @@ -26,8 +26,6 @@ import pandas.util.testing as pdt import pytest from pyarrow.compat import guid -from pyarrow.filesystem import HdfsClient -import pyarrow.io as io import pyarrow as pa import pyarrow.tests.test_parquet as test_parquet @@ -45,7 +43,7 @@ def hdfs_test_client(driver='libhdfs'): raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' 'an integer') - return HdfsClient(host, port, user, driver=driver) + return pa.HdfsClient(host, port, user, driver=driver) @pytest.mark.hdfs @@ -190,7 +188,7 @@ class TestLibHdfs(HdfsTestCases, unittest.TestCase): @classmethod def check_driver(cls): - if not io.have_libhdfs(): + if not pa.have_libhdfs(): pytest.fail('No libhdfs available on system') def test_hdfs_orphaned_file(self): @@ -209,5 +207,5 @@ class TestLibHdfs3(HdfsTestCases, unittest.TestCase): @classmethod def check_driver(cls): - if not io.have_libhdfs3(): + if not pa.have_libhdfs3(): pytest.fail('No libhdfs3 available on system') http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_io.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index beb6113..c5d3708 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -24,7 +24,6 @@ import numpy as np from pyarrow.compat import u, guid import pyarrow as pa -import pyarrow.io as io # ---------------------------------------------------------------------- # Python file-like objects @@ -33,7 +32,7 @@ import pyarrow.io as io def test_python_file_write(): buf = BytesIO() - f = io.PythonFileInterface(buf) + f = 
pa.PythonFileInterface(buf) assert f.tell() == 0 @@ -57,7 +56,7 @@ def test_python_file_read(): data = b'some sample data' buf = BytesIO(data) - f = io.PythonFileInterface(buf, mode='r') + f = pa.PythonFileInterface(buf, mode='r') assert f.size() == len(data) @@ -82,7 +81,7 @@ def test_python_file_read(): def test_bytes_reader(): # Like a BytesIO, but zero-copy underneath for C++ consumers data = b'some sample data' - f = io.BufferReader(data) + f = pa.BufferReader(data) assert f.tell() == 0 assert f.size() == len(data) @@ -103,7 +102,7 @@ def test_bytes_reader(): def test_bytes_reader_non_bytes(): with pytest.raises(ValueError): - io.BufferReader(u('some sample data')) + pa.BufferReader(u('some sample data')) def test_bytes_reader_retains_parent_reference(): @@ -112,7 +111,7 @@ def test_bytes_reader_retains_parent_reference(): # ARROW-421 def get_buffer(): data = b'some sample data' * 1000 - reader = io.BufferReader(data) + reader = pa.BufferReader(data) reader.seek(5) return reader.read_buffer(6) @@ -129,7 +128,7 @@ def test_buffer_bytes(): val = b'some data' buf = pa.frombuffer(val) - assert isinstance(buf, io.Buffer) + assert isinstance(buf, pa.Buffer) result = buf.to_pybytes() @@ -140,7 +139,7 @@ def test_buffer_memoryview(): val = b'some data' buf = pa.frombuffer(val) - assert isinstance(buf, io.Buffer) + assert isinstance(buf, pa.Buffer) result = memoryview(buf) @@ -151,7 +150,7 @@ def test_buffer_bytearray(): val = bytearray(b'some data') buf = pa.frombuffer(val) - assert isinstance(buf, io.Buffer) + assert isinstance(buf, pa.Buffer) result = bytearray(buf) @@ -162,7 +161,7 @@ def test_buffer_memoryview_is_immutable(): val = b'some data' buf = pa.frombuffer(val) - assert isinstance(buf, io.Buffer) + assert isinstance(buf, pa.Buffer) result = memoryview(buf) @@ -180,7 +179,7 @@ def test_memory_output_stream(): # 10 bytes val = b'dataabcdef' - f = io.InMemoryOutputStream() + f = pa.InMemoryOutputStream() K = 1000 for i in range(K): @@ -193,7 +192,7 @@ def test_memory_output_stream(): def test_inmemory_write_after_closed(): - f = io.InMemoryOutputStream() + f = pa.InMemoryOutputStream() f.write(b'ok') f.get_result() @@ -213,7 +212,7 @@ def test_buffer_protocol_ref_counting(): def test_nativefile_write_memoryview(): - f = io.InMemoryOutputStream() + f = pa.InMemoryOutputStream() data = b'ok' arr = np.frombuffer(data, dtype='S1') @@ -289,7 +288,7 @@ def test_memory_map_retain_buffer_reference(sample_disk_data): def test_os_file_reader(sample_disk_data): - _check_native_file_reader(io.OSFile, sample_disk_data) + _check_native_file_reader(pa.OSFile, sample_disk_data) def _try_delete(path): @@ -354,10 +353,10 @@ def test_os_file_writer(): f.write(data) # Truncates file - f2 = io.OSFile(path, mode='w') + f2 = pa.OSFile(path, mode='w') f2.write('foo') - with io.OSFile(path) as f3: + with pa.OSFile(path) as f3: assert f3.size() == 3 with pytest.raises(IOError): http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index de1b148..a5c70aa 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -24,7 +24,6 @@ import pytest from pyarrow.compat import guid, u from pyarrow.filesystem import LocalFilesystem import pyarrow as pa -import pyarrow.io as paio from .pandas_examples import dataframe_with_arrays, dataframe_with_lists import numpy as np @@ -180,10 
+179,10 @@ def _test_dataframe(size=10000, seed=0): def test_pandas_parquet_native_file_roundtrip(tmpdir): df = _test_dataframe(10000) arrow_table = pa.Table.from_pandas(df) - imos = paio.InMemoryOutputStream() + imos = pa.InMemoryOutputStream() pq.write_table(arrow_table, imos, version="2.0") buf = imos.get_result() - reader = paio.BufferReader(buf) + reader = pa.BufferReader(buf) df_read = pq.read_table(reader).to_pandas() tm.assert_frame_equal(df, df_read) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_schema.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index 5588840..53b6b68 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -16,13 +16,9 @@ # under the License. import pytest - -import pyarrow as pa - import numpy as np -# XXX: pyarrow.schema.schema masks the module on imports -sch = pa._schema +import pyarrow as pa def test_type_integers(): @@ -62,7 +58,7 @@ def test_type_from_numpy_dtype_timestamps(): ] for dt, pt in cases: - result = sch.type_from_numpy_dtype(dt) + result = pa.from_numpy_dtype(dt) assert result == pt http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 99bac15..3991856 100644 --- a/python/setup.py +++ b/python/setup.py @@ -99,16 +99,14 @@ class build_ext(_build_ext): os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0')) CYTHON_MODULE_NAMES = [ - 'array', - 'config', - 'error', - 'io', - 'jemalloc', - 'memory', + '_array', + '_config', + '_error', + '_io', + '_jemalloc', + '_memory', '_parquet', - 'scalar', - 'schema', - 'table'] + '_table'] def _run_cmake(self): # The directory containing this setup.py @@ -261,7 +259,7 @@ class build_ext(_build_ext): def _failure_permitted(self, name): if name == '_parquet' and not self.with_parquet: return True - if name == 'jemalloc' and not self.with_jemalloc: + if name == '_jemalloc' and not self.with_jemalloc: return True return False
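
[Editor's note] Hedged before/after sketch of the user-facing effect of this refactor, based only on calls exercised by the updated tests above: helpers that previously lived in pyarrow.io / pyarrow.schema are now reached through the top-level pyarrow namespace, while the Cython extension modules themselves become private (_io, _table, ...):

    import numpy as np
    import pyarrow as pa

    # was: pyarrow.io.InMemoryOutputStream() / pyarrow.io.BufferReader(...)
    sink = pa.InMemoryOutputStream()
    sink.write(b'some sample data')
    buf = sink.get_result()

    reader = pa.BufferReader(buf)
    assert reader.size() == len(b'some sample data')

    # was: pyarrow.schema.type_from_numpy_dtype(...)
    print(pa.from_numpy_dtype(np.dtype('int64')))  # int64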