http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/pandas_convert.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc new file mode 100644 index 0000000..f2c2415 --- /dev/null +++ b/cpp/src/arrow/python/pandas_convert.cc @@ -0,0 +1,1936 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Functions for pandas conversion via NumPy + +#include <Python.h> + +#include "arrow/python/numpy_interop.h" +#include "arrow/python/pandas_convert.h" + +#include <algorithm> +#include <atomic> +#include <cmath> +#include <cstdint> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <unordered_map> +#include <vector> + +#include "arrow/array.h" +#include "arrow/column.h" +#include "arrow/loader.h" +#include "arrow/python/builtin_convert.h" +#include "arrow/python/common.h" +#include "arrow/python/config.h" +#include "arrow/python/type_traits.h" +#include "arrow/python/util/datetime.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace py { + +// ---------------------------------------------------------------------- +// Utility code + +int cast_npy_type_compat(int type_num) { +// Both LONGLONG and INT64 can be observed in the wild, which is buggy. We set +// U/LONGLONG to U/INT64 so things work properly. 
+ +#if (NPY_INT64 == NPY_LONGLONG) && (NPY_SIZEOF_LONGLONG == 8) + if (type_num == NPY_LONGLONG) { type_num = NPY_INT64; } + if (type_num == NPY_ULONGLONG) { type_num = NPY_UINT64; } +#endif + + return type_num; +} + +static inline bool PyObject_is_null(const PyObject* obj) { + return obj == Py_None || obj == numpy_nan; +} + +static inline bool PyObject_is_string(const PyObject* obj) { +#if PY_MAJOR_VERSION >= 3 + return PyUnicode_Check(obj) || PyBytes_Check(obj); +#else + return PyString_Check(obj) || PyUnicode_Check(obj); +#endif +} + +template <int TYPE> +static int64_t ValuesToBitmap(const void* data, int64_t length, uint8_t* bitmap) { + typedef npy_traits<TYPE> traits; + typedef typename traits::value_type T; + + int64_t null_count = 0; + const T* values = reinterpret_cast<const T*>(data); + + // TODO(wesm): striding + for (int i = 0; i < length; ++i) { + if (traits::isnull(values[i])) { + ++null_count; + } else { + BitUtil::SetBit(bitmap, i); + } + } + + return null_count; +} + +// Returns null count +static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) { + int64_t null_count = 0; + const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask)); + // TODO(wesm): strided null mask + for (int i = 0; i < length; ++i) { + if (mask_values[i]) { + ++null_count; + } else { + BitUtil::SetBit(bitmap, i); + } + } + return null_count; +} + +template <int TYPE> +static int64_t ValuesToValidBytes( + const void* data, int64_t length, uint8_t* valid_bytes) { + typedef npy_traits<TYPE> traits; + typedef typename traits::value_type T; + + int64_t null_count = 0; + const T* values = reinterpret_cast<const T*>(data); + + // TODO(wesm): striding + for (int i = 0; i < length; ++i) { + valid_bytes[i] = !traits::isnull(values[i]); + if (traits::isnull(values[i])) null_count++; + } + + return null_count; +} + +Status CheckFlatNumpyArray(PyArrayObject* numpy_array, int np_type) { + if (PyArray_NDIM(numpy_array) != 1) { + return 
Status::Invalid("only handle 1-dimensional arrays"); + } + + if (PyArray_DESCR(numpy_array)->type_num != np_type) { + return Status::Invalid("can only handle exact conversions"); + } + + npy_intp* astrides = PyArray_STRIDES(numpy_array); + if (astrides[0] != PyArray_DESCR(numpy_array)->elsize) { + return Status::Invalid("No support for strided arrays in lists yet"); + } + return Status::OK(); +} + +Status AppendObjectStrings(StringBuilder& string_builder, PyObject** objects, + int64_t objects_length, bool* have_bytes) { + PyObject* obj; + + for (int64_t i = 0; i < objects_length; ++i) { + obj = objects[i]; + if (PyUnicode_Check(obj)) { + obj = PyUnicode_AsUTF8String(obj); + if (obj == NULL) { + PyErr_Clear(); + return Status::TypeError("failed converting unicode to UTF8"); + } + const int64_t length = PyBytes_GET_SIZE(obj); + Status s = string_builder.Append(PyBytes_AS_STRING(obj), length); + Py_DECREF(obj); + if (!s.ok()) { return s; } + } else if (PyBytes_Check(obj)) { + *have_bytes = true; + const int64_t length = PyBytes_GET_SIZE(obj); + RETURN_NOT_OK(string_builder.Append(PyBytes_AS_STRING(obj), length)); + } else { + string_builder.AppendNull(); + } + } + + return Status::OK(); +} + +template <typename T> +struct WrapBytes {}; + +template <> +struct WrapBytes<StringArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length); + } +}; + +template <> +struct WrapBytes<BinaryArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length); + } +}; + +static inline bool ListTypeSupported(const Type::type type_id) { + switch (type_id) { + case Type::UINT8: + case Type::INT8: + case Type::UINT16: + case Type::INT16: + case Type::UINT32: + case Type::INT32: + case Type::INT64: + case Type::UINT64: + case Type::FLOAT: + case Type::DOUBLE: + case Type::STRING: + case 
Type::TIMESTAMP: + // The above types are all supported. + return true; + default: + break; + } + return false; +} + +// ---------------------------------------------------------------------- +// Conversion from NumPy-in-Pandas to Arrow + +class PandasConverter : public TypeVisitor { + public: + PandasConverter( + MemoryPool* pool, PyObject* ao, PyObject* mo, const std::shared_ptr<DataType>& type) + : pool_(pool), + type_(type), + arr_(reinterpret_cast<PyArrayObject*>(ao)), + mask_(nullptr) { + if (mo != nullptr && mo != Py_None) { mask_ = reinterpret_cast<PyArrayObject*>(mo); } + length_ = PyArray_SIZE(arr_); + } + + bool is_strided() const { + npy_intp* astrides = PyArray_STRIDES(arr_); + return astrides[0] != PyArray_DESCR(arr_)->elsize; + } + + Status InitNullBitmap() { + int null_bytes = BitUtil::BytesForBits(length_); + + null_bitmap_ = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(null_bitmap_->Resize(null_bytes)); + + null_bitmap_data_ = null_bitmap_->mutable_data(); + memset(null_bitmap_data_, 0, null_bytes); + + return Status::OK(); + } + + // ---------------------------------------------------------------------- + // Traditional visitor conversion for non-object arrays + + template <typename ArrowType> + Status ConvertData(std::shared_ptr<Buffer>* data); + + template <typename ArrowType> + Status VisitNative() { + using traits = arrow_traits<ArrowType::type_id>; + + if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } + + std::shared_ptr<Buffer> data; + RETURN_NOT_OK(ConvertData<ArrowType>(&data)); + + int64_t null_count = 0; + if (mask_ != nullptr) { + null_count = MaskToBitmap(mask_, length_, null_bitmap_data_); + } else if (traits::supports_nulls) { + // TODO(wesm): this presumes the NumPy C type and arrow C type are the + // same + null_count = ValuesToBitmap<traits::npy_type>( + PyArray_DATA(arr_), length_, null_bitmap_data_); + } + + std::vector<FieldMetadata> fields(1); + fields[0].length = length_; + 
fields[0].null_count = null_count; + fields[0].offset = 0; + + return LoadArray(type_, fields, {null_bitmap_, data}, &out_); + } + +#define VISIT_NATIVE(TYPE) \ + Status Visit(const TYPE& type) override { return VisitNative<TYPE>(); } + + VISIT_NATIVE(BooleanType); + VISIT_NATIVE(Int8Type); + VISIT_NATIVE(Int16Type); + VISIT_NATIVE(Int32Type); + VISIT_NATIVE(Int64Type); + VISIT_NATIVE(UInt8Type); + VISIT_NATIVE(UInt16Type); + VISIT_NATIVE(UInt32Type); + VISIT_NATIVE(UInt64Type); + VISIT_NATIVE(FloatType); + VISIT_NATIVE(DoubleType); + VISIT_NATIVE(TimestampType); + +#undef VISIT_NATIVE + + Status Convert(std::shared_ptr<Array>* out) { + if (PyArray_NDIM(arr_) != 1) { + return Status::Invalid("only handle 1-dimensional arrays"); + } + // TODO(wesm): strided arrays + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } + + if (type_ == nullptr) { return Status::Invalid("Must pass data type"); } + + // Visit the type to perform conversion + RETURN_NOT_OK(type_->Accept(this)); + + *out = out_; + return Status::OK(); + } + + // ---------------------------------------------------------------------- + // Conversion logic for various object dtype arrays + + template <int ITEM_TYPE, typename ArrowType> + Status ConvertTypedLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + + Status ConvertObjectStrings(std::shared_ptr<Array>* out); + Status ConvertBooleans(std::shared_ptr<Array>* out); + Status ConvertDates(std::shared_ptr<Array>* out); + Status ConvertLists(const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + Status ConvertObjects(std::shared_ptr<Array>* out); + + protected: + MemoryPool* pool_; + std::shared_ptr<DataType> type_; + PyArrayObject* arr_; + PyArrayObject* mask_; + int64_t length_; + + // Used in visitor pattern + std::shared_ptr<Array> out_; + + std::shared_ptr<ResizableBuffer> null_bitmap_; + uint8_t* null_bitmap_data_; +}; + +template <typename ArrowType> +inline Status 
PandasConverter::ConvertData(std::shared_ptr<Buffer>* data) { + using traits = arrow_traits<ArrowType::type_id>; + + // Handle LONGLONG->INT64 and other fun things + int type_num_compat = cast_npy_type_compat(PyArray_DESCR(arr_)->type_num); + + if (traits::npy_type != type_num_compat) { + return Status::NotImplemented("NumPy type casts not yet implemented"); + } + + *data = std::make_shared<NumPyBuffer>(arr_); + return Status::OK(); +} + +template <> +inline Status PandasConverter::ConvertData<BooleanType>(std::shared_ptr<Buffer>* data) { + int nbytes = BitUtil::BytesForBits(length_); + auto buffer = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); + + const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); + + uint8_t* bitmap = buffer->mutable_data(); + + memset(bitmap, 0, nbytes); + for (int i = 0; i < length_; ++i) { + if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } + } + + *data = buffer; + return Status::OK(); +} + +Status PandasConverter::ConvertDates(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + Date64Builder date_builder(pool_); + RETURN_NOT_OK(date_builder.Resize(length_)); + + Status s; + PyObject* obj; + for (int64_t i = 0; i < length_; ++i) { + obj = objects[i]; + if (PyDate_CheckExact(obj)) { + PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj); + date_builder.Append(PyDate_to_ms(pydate)); + } else { + date_builder.AppendNull(); + } + } + return date_builder.Finish(out); +} + +Status PandasConverter::ConvertObjectStrings(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + + // The output type at this point is inconclusive because there may be bytes + // and unicode mixed in the object array + + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + StringBuilder string_builder(pool_); + RETURN_NOT_OK(string_builder.Resize(length_)); + + Status s; + bool have_bytes = false; + 
RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes)); + RETURN_NOT_OK(string_builder.Finish(out)); + + if (have_bytes) { + const auto& arr = static_cast<const StringArray&>(*out->get()); + *out = std::make_shared<BinaryArray>(arr.length(), arr.value_offsets(), arr.data(), + arr.null_bitmap(), arr.null_count()); + } + return Status::OK(); +} + +Status PandasConverter::ConvertBooleans(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + + int nbytes = BitUtil::BytesForBits(length_); + auto data = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(data->Resize(nbytes)); + uint8_t* bitmap = data->mutable_data(); + memset(bitmap, 0, nbytes); + + int64_t null_count = 0; + for (int64_t i = 0; i < length_; ++i) { + if (objects[i] == Py_True) { + BitUtil::SetBit(bitmap, i); + BitUtil::SetBit(null_bitmap_data_, i); + } else if (objects[i] != Py_False) { + ++null_count; + } else { + BitUtil::SetBit(null_bitmap_data_, i); + } + } + + *out = std::make_shared<BooleanArray>(length_, data, null_bitmap_, null_count); + + return Status::OK(); +} + +Status PandasConverter::ConvertObjects(std::shared_ptr<Array>* out) { + // Python object arrays are annoying, since we could have one of: + // + // * Strings + // * Booleans with nulls + // * Mixed type (not supported at the moment by arrow format) + // + // Additionally, nulls may be encoded either as np.nan or None. 
So we have to + // do some type inference and conversion + + RETURN_NOT_OK(InitNullBitmap()); + + // TODO: mask not supported here + if (mask_ != nullptr) { + return Status::NotImplemented("mask not supported in object conversions yet"); + } + + const PyObject** objects; + { + PyAcquireGIL lock; + objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_)); + PyDateTime_IMPORT; + } + + if (type_) { + switch (type_->type) { + case Type::STRING: + return ConvertObjectStrings(out); + case Type::BOOL: + return ConvertBooleans(out); + case Type::DATE64: + return ConvertDates(out); + case Type::LIST: { + const auto& list_field = static_cast<const ListType&>(*type_); + return ConvertLists(list_field.value_field()->type, out); + } + default: + return Status::TypeError("No known conversion to Arrow type"); + } + } else { + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + continue; + } else if (PyObject_is_string(objects[i])) { + return ConvertObjectStrings(out); + } else if (PyBool_Check(objects[i])) { + return ConvertBooleans(out); + } else if (PyDate_CheckExact(objects[i])) { + return ConvertDates(out); + } else { + return Status::TypeError("unhandled python type"); + } + } + } + + return Status::TypeError("Unable to infer type of object array, were all null"); +} + +template <int ITEM_TYPE, typename ArrowType> +inline Status PandasConverter::ConvertTypedLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + typedef npy_traits<ITEM_TYPE> traits; + typedef typename traits::value_type T; + typedef typename traits::BuilderClass BuilderT; + + PyAcquireGIL lock; + + auto value_builder = std::make_shared<BuilderT>(pool_, type); + ListBuilder list_builder(pool_, value_builder); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + RETURN_NOT_OK(list_builder.AppendNull()); + } else if (PyArray_Check(objects[i])) { + 
auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); + RETURN_NOT_OK(list_builder.Append(true)); + + // TODO(uwe): Support more complex numpy array structures + RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE)); + + int64_t size = PyArray_DIM(numpy_array, 0); + auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_array)); + if (traits::supports_nulls) { + null_bitmap_->Resize(size, false); + // TODO(uwe): A bitmap would be more space-efficient but the Builder API doesn't + // currently support this. + // ValuesToBitmap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); + ValuesToValidBytes<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); + RETURN_NOT_OK(value_builder->Append(data, size, null_bitmap_->data())); + } else { + RETURN_NOT_OK(value_builder->Append(data, size)); + } + + } else if (PyList_Check(objects[i])) { + int64_t size; + std::shared_ptr<DataType> inferred_type; + RETURN_NOT_OK(list_builder.Append(true)); + RETURN_NOT_OK(InferArrowType(objects[i], &size, &inferred_type)); + if (inferred_type->type != type->type) { + std::stringstream ss; + ss << inferred_type->ToString() << " cannot be converted to " << type->ToString(); + return Status::TypeError(ss.str()); + } + RETURN_NOT_OK(AppendPySequence(objects[i], type, value_builder)); + } else { + return Status::TypeError("Unsupported Python type for list items"); + } + } + return list_builder.Finish(out); +} + +template <> +inline Status PandasConverter::ConvertTypedLists<NPY_OBJECT, StringType>( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + // TODO: If there are bytes involed, convert to Binary representation + bool have_bytes = false; + + auto value_builder = std::make_shared<StringBuilder>(pool_); + ListBuilder list_builder(pool_, value_builder); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + 
RETURN_NOT_OK(list_builder.AppendNull()); + } else if (PyArray_Check(objects[i])) { + auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); + RETURN_NOT_OK(list_builder.Append(true)); + + // TODO(uwe): Support more complex numpy array structures + RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, NPY_OBJECT)); + + int64_t size = PyArray_DIM(numpy_array, 0); + auto data = reinterpret_cast<PyObject**>(PyArray_DATA(numpy_array)); + RETURN_NOT_OK(AppendObjectStrings(*value_builder.get(), data, size, &have_bytes)); + } else if (PyList_Check(objects[i])) { + int64_t size; + std::shared_ptr<DataType> inferred_type; + RETURN_NOT_OK(list_builder.Append(true)); + RETURN_NOT_OK(InferArrowType(objects[i], &size, &inferred_type)); + if (inferred_type->type != Type::STRING) { + std::stringstream ss; + ss << inferred_type->ToString() << " cannot be converted to STRING."; + return Status::TypeError(ss.str()); + } + RETURN_NOT_OK(AppendPySequence(objects[i], inferred_type, value_builder)); + } else { + return Status::TypeError("Unsupported Python type for list items"); + } + } + return list_builder.Finish(out); +} + +#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType) \ + case Type::TYPE: { \ + return ConvertTypedLists<NUMPY_TYPE, ArrowType>(type, out); \ + } + +Status PandasConverter::ConvertLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + switch (type->type) { + LIST_CASE(UINT8, NPY_UINT8, UInt8Type) + LIST_CASE(INT8, NPY_INT8, Int8Type) + LIST_CASE(UINT16, NPY_UINT16, UInt16Type) + LIST_CASE(INT16, NPY_INT16, Int16Type) + LIST_CASE(UINT32, NPY_UINT32, UInt32Type) + LIST_CASE(INT32, NPY_INT32, Int32Type) + LIST_CASE(UINT64, NPY_UINT64, UInt64Type) + LIST_CASE(INT64, NPY_INT64, Int64Type) + LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType) + LIST_CASE(FLOAT, NPY_FLOAT, FloatType) + LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType) + LIST_CASE(STRING, NPY_OBJECT, StringType) + default: + return Status::TypeError("Unknown list item type"); + } + + return 
Status::TypeError("Unknown list type"); +} + +Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PandasConverter converter(pool, ao, mo, type); + return converter.Convert(out); +} + +Status PandasObjectsToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PandasConverter converter(pool, ao, mo, type); + return converter.ConvertObjects(out); +} + +Status PandasDtypeToArrow(PyObject* dtype, std::shared_ptr<DataType>* out) { + PyArray_Descr* descr = reinterpret_cast<PyArray_Descr*>(dtype); + + int type_num = cast_npy_type_compat(descr->type_num); + +#define TO_ARROW_TYPE_CASE(NPY_NAME, FACTORY) \ + case NPY_##NPY_NAME: \ + *out = FACTORY(); \ + break; + + switch (type_num) { + TO_ARROW_TYPE_CASE(BOOL, boolean); + TO_ARROW_TYPE_CASE(INT8, int8); + TO_ARROW_TYPE_CASE(INT16, int16); + TO_ARROW_TYPE_CASE(INT32, int32); + TO_ARROW_TYPE_CASE(INT64, int64); +#if (NPY_INT64 != NPY_LONGLONG) + TO_ARROW_TYPE_CASE(LONGLONG, int64); +#endif + TO_ARROW_TYPE_CASE(UINT8, uint8); + TO_ARROW_TYPE_CASE(UINT16, uint16); + TO_ARROW_TYPE_CASE(UINT32, uint32); + TO_ARROW_TYPE_CASE(UINT64, uint64); +#if (NPY_UINT64 != NPY_ULONGLONG) + TO_ARROW_CASE(ULONGLONG); +#endif + TO_ARROW_TYPE_CASE(FLOAT32, float32); + TO_ARROW_TYPE_CASE(FLOAT64, float64); + case NPY_DATETIME: { + auto date_dtype = + reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + TimeUnit unit; + switch (date_dtype->meta.base) { + case NPY_FR_s: + unit = TimeUnit::SECOND; + break; + case NPY_FR_ms: + unit = TimeUnit::MILLI; + break; + case NPY_FR_us: + unit = TimeUnit::MICRO; + break; + case NPY_FR_ns: + unit = TimeUnit::NANO; + break; + default: + return Status::NotImplemented("Unsupported datetime64 time unit"); + } + *out = timestamp(unit); + } break; + default: { + std::stringstream ss; + ss << "Unsupported numpy type " << descr->type_num << 
std::endl; + return Status::NotImplemented(ss.str()); + } + } + +#undef TO_ARROW_TYPE_CASE + + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// pandas 0.x DataFrame conversion internals + +inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { + if (type == NPY_DATETIME) { + PyArray_Descr* descr = PyArray_DESCR(out); + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + if (datatype->type == Type::TIMESTAMP) { + auto timestamp_type = static_cast<TimestampType*>(datatype); + + switch (timestamp_type->unit) { + case TimestampType::Unit::SECOND: + date_dtype->meta.base = NPY_FR_s; + break; + case TimestampType::Unit::MILLI: + date_dtype->meta.base = NPY_FR_ms; + break; + case TimestampType::Unit::MICRO: + date_dtype->meta.base = NPY_FR_us; + break; + case TimestampType::Unit::NANO: + date_dtype->meta.base = NPY_FR_ns; + break; + } + } else { + // datatype->type == Type::DATE64 + date_dtype->meta.base = NPY_FR_D; + } + } +} + +class PandasBlock { + public: + enum type { + OBJECT, + UINT8, + INT8, + UINT16, + INT16, + UINT32, + INT32, + UINT64, + INT64, + FLOAT, + DOUBLE, + BOOL, + DATETIME, + DATETIME_WITH_TZ, + CATEGORICAL + }; + + PandasBlock(int64_t num_rows, int num_columns) + : num_rows_(num_rows), num_columns_(num_columns) {} + virtual ~PandasBlock() {} + + virtual Status Allocate() = 0; + virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) = 0; + + PyObject* block_arr() const { return block_arr_.obj(); } + + virtual Status GetPyResult(PyObject** output) { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); + + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "placement", placement_arr_.obj()); + + *output = result; + + return Status::OK(); + } + + protected: + Status AllocateNDArray(int npy_type, int ndim = 2) { + PyAcquireGIL lock; + + PyObject* 
block_arr; + if (ndim == 2) { + npy_intp block_dims[2] = {num_columns_, num_rows_}; + block_arr = PyArray_SimpleNew(2, block_dims, npy_type); + } else { + npy_intp block_dims[1] = {num_rows_}; + block_arr = PyArray_SimpleNew(1, block_dims, npy_type); + } + + if (block_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } + + npy_intp placement_dims[1] = {num_columns_}; + PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64); + if (placement_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } + + block_arr_.reset(block_arr); + placement_arr_.reset(placement_arr); + + block_data_ = reinterpret_cast<uint8_t*>( + PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr))); + + placement_data_ = reinterpret_cast<int64_t*>( + PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr))); + + return Status::OK(); + } + + int64_t num_rows_; + int num_columns_; + + OwnedRef block_arr_; + uint8_t* block_data_; + + // ndarray<int32> + OwnedRef placement_arr_; + int64_t* placement_data_; + + private: + DISALLOW_COPY_AND_ASSIGN(PandasBlock); +}; + +template <typename T> +inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate + + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = prim_arr->IsNull(i) ? 
NAN : in_values[i]; + } + } +} + +template <typename T> +inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } +} + +template <typename InType, typename OutType> +inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values = in_values[i]; + } + } +} + +static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<BooleanArray*>(arr.get()); + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values++ = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + *out_values++ = Py_True; + } else { + // False + Py_INCREF(Py_False); + *out_values++ = Py_False; + } + } + } + return Status::OK(); +} + +static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<BooleanArray*>(arr.get()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = static_cast<uint8_t>(bool_arr->Value(i)); + } + } +} + +template <typename ArrayType> +inline Status ConvertBinaryLike(const ChunkedArray& data, 
PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = static_cast<ArrayType*>(data.chunk(c).get()); + + const uint8_t* data_ptr; + int32_t length; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = arr->GetValue(i, &length); + *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length); + if (*out_values == nullptr) { + PyErr_Clear(); + std::stringstream ss; + ss << "Wrapping " + << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed"; + return Status::UnknownError(ss.str()); + } + } + ++out_values; + } + } + return Status::OK(); +} + +template <typename ArrowType> +inline Status ConvertListsLike( + const std::shared_ptr<Column>& col, PyObject** out_values) { + const ChunkedArray& data = *col->data().get(); + auto list_type = std::static_pointer_cast<ListType>(col->type()); + + // Get column of underlying value arrays + std::vector<std::shared_ptr<Array>> value_arrays; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<ListArray>(data.chunk(c)); + value_arrays.emplace_back(arr->values()); + } + auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays); + // TODO(ARROW-489): Currently we don't have a Python reference for single columns. + // Storing a reference to the whole Array would be to expensive. 
+ PyObject* numpy_array; + RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array)); + + PyAcquireGIL lock; + + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<ListArray>(data.chunk(c)); + + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + PyObject* start = PyLong_FromLong(arr->value_offset(i)); + PyObject* end = PyLong_FromLong(arr->value_offset(i + 1)); + PyObject* slice = PySlice_New(start, end, NULL); + *out_values = PyObject_GetItem(numpy_array, slice); + Py_DECREF(start); + Py_DECREF(end); + Py_DECREF(slice); + } + ++out_values; + } + } + + Py_XDECREF(numpy_array); + return Status::OK(); +} + +template <typename T> +inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + + const uint8_t* valid_bits = arr->null_bitmap_data(); + + if (arr->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; + } + } else { + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } + } +} + +template <typename InType, typename OutType> +inline void ConvertNumericNullableCast( + const ChunkedArray& data, OutType na_value, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? 
na_value : static_cast<OutType>(in_values[i]); + } + } +} + +template <typename T> +inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + // There are 1000 * 60 * 60 * 24 = 86400000ms in a day + *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; + } + } +} + +template <typename InType, int SHIFT> +inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? 
kPandasTimestampNull + : (static_cast<int64_t>(in_values[i]) * SHIFT); + } + } +} + +#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum) \ + case Type::ArrowEnum: \ + RETURN_NOT_OK((ConvertListsLike<ArrowType>(col, out_buffer))); \ + break; + +class ObjectBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + virtual ~ObjectBlock() {} + + Status Allocate() override { return AllocateNDArray(NPY_OBJECT); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + PyObject** out_buffer = + reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + if (type == Type::BOOL) { + RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer)); + } else if (type == Type::BINARY) { + RETURN_NOT_OK(ConvertBinaryLike<BinaryArray>(data, out_buffer)); + } else if (type == Type::STRING) { + RETURN_NOT_OK(ConvertBinaryLike<StringArray>(data, out_buffer)); + } else if (type == Type::LIST) { + auto list_type = std::static_pointer_cast<ListType>(col->type()); + switch (list_type->value_type()->type) { + CONVERTLISTSLIKE_CASE(UInt8Type, UINT8) + CONVERTLISTSLIKE_CASE(Int8Type, INT8) + CONVERTLISTSLIKE_CASE(UInt16Type, UINT16) + CONVERTLISTSLIKE_CASE(Int16Type, INT16) + CONVERTLISTSLIKE_CASE(UInt32Type, UINT32) + CONVERTLISTSLIKE_CASE(Int32Type, INT32) + CONVERTLISTSLIKE_CASE(UInt64Type, UINT64) + CONVERTLISTSLIKE_CASE(Int64Type, INT64) + CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP) + CONVERTLISTSLIKE_CASE(FloatType, FLOAT) + CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE) + CONVERTLISTSLIKE_CASE(StringType, STRING) + default: { + std::stringstream ss; + ss << "Not implemented type for lists: " << list_type->value_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + } + } else { + std::stringstream ss; + ss << "Unsupported type for object array output: " << col->type()->ToString(); + return 
Status::NotImplemented(ss.str()); + } + + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +template <int ARROW_TYPE, typename C_TYPE> +class IntBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type); + } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + C_TYPE* out_buffer = + reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); } + + ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +using UInt8Block = IntBlock<Type::UINT8, uint8_t>; +using Int8Block = IntBlock<Type::INT8, int8_t>; +using UInt16Block = IntBlock<Type::UINT16, uint16_t>; +using Int16Block = IntBlock<Type::INT16, int16_t>; +using UInt32Block = IntBlock<Type::UINT32, uint32_t>; +using Int32Block = IntBlock<Type::INT32, int32_t>; +using UInt64Block = IntBlock<Type::UINT64, uint64_t>; +using Int64Block = IntBlock<Type::INT64, int64_t>; + +class Float32Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); } + + float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_; + + ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +class Float64Block : public PandasBlock { + public: 
+ using PandasBlock::PandasBlock; + + Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + double* out_buffer = + reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + +#define INTEGER_CASE(IN_TYPE) \ + ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \ + break; + + switch (type) { + case Type::UINT8: + INTEGER_CASE(uint8_t); + case Type::INT8: + INTEGER_CASE(int8_t); + case Type::UINT16: + INTEGER_CASE(uint16_t); + case Type::INT16: + INTEGER_CASE(int16_t); + case Type::UINT32: + INTEGER_CASE(uint32_t); + case Type::INT32: + INTEGER_CASE(int32_t); + case Type::UINT64: + INTEGER_CASE(uint64_t); + case Type::INT64: + INTEGER_CASE(int64_t); + case Type::FLOAT: + ConvertNumericNullableCast<float, double>(data, NAN, out_buffer); + break; + case Type::DOUBLE: + ConvertNumericNullable<double>(data, NAN, out_buffer); + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + +#undef INTEGER_CASE + + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +class BoolBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { return AllocateNDArray(NPY_BOOL); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); } + + uint8_t* out_buffer = + reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_; + + ConvertBooleanNoNulls(*col->data().get(), out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +class DatetimeBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status 
AllocateDatetime(int ndim) { + RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME, ndim)); + + PyAcquireGIL lock; + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>( + PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))->c_metadata); + date_dtype->meta.base = NPY_FR_ns; + return Status::OK(); + } + + Status Allocate() override { return AllocateDatetime(2); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + int64_t* out_buffer = + reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col.get()->data(); + + if (type == Type::DATE64) { + // Date64Type is millisecond timestamp stored as int64_t + // TODO(wesm): Do we want to make sure to zero out the milliseconds? + ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); + } else if (type == Type::TIMESTAMP) { + auto ts_type = static_cast<TimestampType*>(col->type().get()); + + if (ts_type->unit == TimeUnit::NANO) { + ConvertNumericNullable<int64_t>(data, kPandasTimestampNull, out_buffer); + } else if (ts_type->unit == TimeUnit::MICRO) { + ConvertDatetimeNanos<int64_t, 1000L>(data, out_buffer); + } else if (ts_type->unit == TimeUnit::MILLI) { + ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); + } else if (ts_type->unit == TimeUnit::SECOND) { + ConvertDatetimeNanos<int64_t, 1000000000L>(data, out_buffer); + } else { + return Status::NotImplemented("Unsupported time unit"); + } + } else { + return Status::NotImplemented(col->type()->ToString()); + } + + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } +}; + +class DatetimeTZBlock : public DatetimeBlock { + public: + DatetimeTZBlock(const std::string& timezone, int64_t num_rows) + : DatetimeBlock(num_rows, 1), timezone_(timezone) {} + + // Like Categorical, the internal ndarray is 1-dimensional + Status Allocate() override { return AllocateDatetime(1); } + + 
Status GetPyResult(PyObject** output) override { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); + + PyObject* py_tz = PyUnicode_FromStringAndSize( + timezone_.c_str(), static_cast<Py_ssize_t>(timezone_.size())); + RETURN_IF_PYERROR(); + + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "timezone", py_tz); + PyDict_SetItemString(result, "placement", placement_arr_.obj()); + + *output = result; + + return Status::OK(); + } + + private: + std::string timezone_; +}; + +template <int ARROW_INDEX_TYPE> +class CategoricalBlock : public PandasBlock { + public: + explicit CategoricalBlock(int64_t num_rows) : PandasBlock(num_rows, 1) {} + + Status Allocate() override { + constexpr int npy_type = arrow_traits<ARROW_INDEX_TYPE>::npy_type; + + if (!(npy_type == NPY_INT8 || npy_type == NPY_INT16 || npy_type == NPY_INT32 || + npy_type == NPY_INT64)) { + return Status::Invalid("Category indices must be signed integers"); + } + return AllocateNDArray(npy_type, 1); + } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + using T = typename arrow_traits<ARROW_INDEX_TYPE>::T; + + T* out_values = reinterpret_cast<T*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + const auto& dict_arr = static_cast<const DictionaryArray&>(*arr); + const auto& indices = static_cast<const PrimitiveArray&>(*dict_arr.indices()); + auto in_values = reinterpret_cast<const T*>(indices.data()->data()); + + // Null is -1 in CategoricalBlock + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = indices.IsNull(i) ? 
-1 : in_values[i]; + } + } + + placement_data_[rel_placement] = abs_placement; + + auto dict_type = static_cast<const DictionaryType*>(col->type().get()); + + PyObject* dict; + RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, &dict)); + dictionary_.reset(dict); + + return Status::OK(); + } + + Status GetPyResult(PyObject** output) override { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); + + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "dictionary", dictionary_.obj()); + PyDict_SetItemString(result, "placement", placement_arr_.obj()); + + *output = result; + + return Status::OK(); + } + + protected: + OwnedRef dictionary_; +}; + +Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, + std::shared_ptr<PandasBlock>* block) { +#define BLOCK_CASE(NAME, TYPE) \ + case PandasBlock::NAME: \ + *block = std::make_shared<TYPE>(num_rows, num_columns); \ + break; + + switch (type) { + BLOCK_CASE(OBJECT, ObjectBlock); + BLOCK_CASE(UINT8, UInt8Block); + BLOCK_CASE(INT8, Int8Block); + BLOCK_CASE(UINT16, UInt16Block); + BLOCK_CASE(INT16, Int16Block); + BLOCK_CASE(UINT32, UInt32Block); + BLOCK_CASE(INT32, Int32Block); + BLOCK_CASE(UINT64, UInt64Block); + BLOCK_CASE(INT64, Int64Block); + BLOCK_CASE(FLOAT, Float32Block); + BLOCK_CASE(DOUBLE, Float64Block); + BLOCK_CASE(BOOL, BoolBlock); + BLOCK_CASE(DATETIME, DatetimeBlock); + default: + return Status::NotImplemented("Unsupported block type"); + } + +#undef BLOCK_CASE + + return (*block)->Allocate(); +} + +static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& type, + int64_t num_rows, std::shared_ptr<PandasBlock>* block) { + // All categoricals become a block with a single column + auto dict_type = static_cast<const DictionaryType*>(type.get()); + switch (dict_type->index_type()->type) { + case Type::INT8: + *block = std::make_shared<CategoricalBlock<Type::INT8>>(num_rows); + break; + case Type::INT16: + *block = 
std::make_shared<CategoricalBlock<Type::INT16>>(num_rows); + break; + case Type::INT32: + *block = std::make_shared<CategoricalBlock<Type::INT32>>(num_rows); + break; + case Type::INT64: + *block = std::make_shared<CategoricalBlock<Type::INT64>>(num_rows); + break; + default: { + std::stringstream ss; + ss << "Categorical index type not implemented: " + << dict_type->index_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + } + return (*block)->Allocate(); +} + +using BlockMap = std::unordered_map<int, std::shared_ptr<PandasBlock>>; + +// Construct the exact pandas 0.x "BlockManager" memory layout +// +// * For each column determine the correct output pandas type +// * Allocate 2D blocks (ncols x nrows) for each distinct data type in output +// * Allocate block placement arrays +// * Write Arrow columns out into each slice of memory; populate block +// * placement arrays as we go +class DataFrameBlockCreator { + public: + explicit DataFrameBlockCreator(const std::shared_ptr<Table>& table) : table_(table) {} + + Status Convert(int nthreads, PyObject** output) { + column_types_.resize(table_->num_columns()); + column_block_placement_.resize(table_->num_columns()); + type_counts_.clear(); + blocks_.clear(); + + RETURN_NOT_OK(CreateBlocks()); + RETURN_NOT_OK(WriteTableToBlocks(nthreads)); + + return GetResultList(output); + } + + Status CreateBlocks() { + for (int i = 0; i < table_->num_columns(); ++i) { + std::shared_ptr<Column> col = table_->column(i); + PandasBlock::type output_type; + + Type::type column_type = col->type()->type; + switch (column_type) { + case Type::BOOL: + output_type = col->null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL; + break; + case Type::UINT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT8; + break; + case Type::INT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT8; + break; + case Type::UINT16: + output_type = col->null_count() > 0 ? 
PandasBlock::DOUBLE : PandasBlock::UINT16; + break; + case Type::INT16: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT16; + break; + case Type::UINT32: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT32; + break; + case Type::INT32: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT32; + break; + case Type::INT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT64; + break; + case Type::UINT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT64; + break; + case Type::FLOAT: + output_type = PandasBlock::FLOAT; + break; + case Type::DOUBLE: + output_type = PandasBlock::DOUBLE; + break; + case Type::STRING: + case Type::BINARY: + output_type = PandasBlock::OBJECT; + break; + case Type::DATE64: + output_type = PandasBlock::DATETIME; + break; + case Type::TIMESTAMP: { + const auto& ts_type = static_cast<const TimestampType&>(*col->type()); + if (ts_type.timezone != "") { + output_type = PandasBlock::DATETIME_WITH_TZ; + } else { + output_type = PandasBlock::DATETIME; + } + } break; + case Type::LIST: { + auto list_type = std::static_pointer_cast<ListType>(col->type()); + if (!ListTypeSupported(list_type->value_type()->type)) { + std::stringstream ss; + ss << "Not implemented type for lists: " + << list_type->value_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + output_type = PandasBlock::OBJECT; + } break; + case Type::DICTIONARY: + output_type = PandasBlock::CATEGORICAL; + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + + int block_placement = 0; + std::shared_ptr<PandasBlock> block; + if (output_type == PandasBlock::CATEGORICAL) { + RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); + categorical_blocks_[i] = block; + } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { + const auto& ts_type = static_cast<const 
TimestampType&>(*col->type()); + block = std::make_shared<DatetimeTZBlock>(ts_type.timezone, table_->num_rows()); + RETURN_NOT_OK(block->Allocate()); + datetimetz_blocks_[i] = block; + } else { + auto it = type_counts_.find(output_type); + if (it != type_counts_.end()) { + block_placement = it->second; + // Increment count + it->second += 1; + } else { + // Add key to map + type_counts_[output_type] = 1; + } + } + + column_types_[i] = output_type; + column_block_placement_[i] = block_placement; + } + + // Create normal non-categorical blocks + for (const auto& it : type_counts_) { + PandasBlock::type type = static_cast<PandasBlock::type>(it.first); + std::shared_ptr<PandasBlock> block; + RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block)); + blocks_[type] = block; + } + return Status::OK(); + } + + Status WriteTableToBlocks(int nthreads) { + auto WriteColumn = [this](int i) { + std::shared_ptr<Column> col = this->table_->column(i); + PandasBlock::type output_type = this->column_types_[i]; + + int rel_placement = this->column_block_placement_[i]; + + std::shared_ptr<PandasBlock> block; + if (output_type == PandasBlock::CATEGORICAL) { + auto it = this->categorical_blocks_.find(i); + if (it == this->blocks_.end()) { + return Status::KeyError("No categorical block allocated"); + } + block = it->second; + } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { + auto it = this->datetimetz_blocks_.find(i); + if (it == this->datetimetz_blocks_.end()) { + return Status::KeyError("No datetimetz block allocated"); + } + block = it->second; + } else { + auto it = this->blocks_.find(output_type); + if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); } + block = it->second; + } + return block->Write(col, i, rel_placement); + }; + + nthreads = std::min<int>(nthreads, table_->num_columns()); + + if (nthreads == 1) { + for (int i = 0; i < table_->num_columns(); ++i) { + RETURN_NOT_OK(WriteColumn(i)); + } + } else { + 
std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() { + int column_num; + while (!error_occurred) { + column_num = task_counter.fetch_add(1); + if (column_num >= this->table_->num_columns()) { break; } + Status s = WriteColumn(column_num); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + + if (error_occurred) { return error; } + } + return Status::OK(); + } + + Status AppendBlocks(const BlockMap& blocks, PyObject* list) { + for (const auto& it : blocks) { + PyObject* item; + RETURN_NOT_OK(it.second->GetPyResult(&item)); + if (PyList_Append(list, item) < 0) { RETURN_IF_PYERROR(); } + } + return Status::OK(); + } + + Status GetResultList(PyObject** out) { + PyAcquireGIL lock; + + PyObject* result = PyList_New(0); + RETURN_IF_PYERROR(); + + RETURN_NOT_OK(AppendBlocks(blocks_, result)); + RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result)); + RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result)); + + *out = result; + return Status::OK(); + } + + private: + std::shared_ptr<Table> table_; + + // column num -> block type id + std::vector<PandasBlock::type> column_types_; + + // column num -> relative placement within internal block + std::vector<int> column_block_placement_; + + // block type -> type count + std::unordered_map<int, int> type_counts_; + + // block type -> block + BlockMap blocks_; + + // column number -> categorical block + BlockMap categorical_blocks_; + + // column number -> datetimetz block + BlockMap datetimetz_blocks_; +}; + +class ArrowDeserializer { + public: + ArrowDeserializer(const std::shared_ptr<Column>& 
col, PyObject* py_ref) + : col_(col), data_(*col->data().get()), py_ref_(py_ref) {} + + Status AllocateOutput(int type) { + PyAcquireGIL lock; + + npy_intp dims[1] = {col_->length()}; + result_ = PyArray_SimpleNew(1, dims, type); + arr_ = reinterpret_cast<PyArrayObject*>(result_); + + if (arr_ == NULL) { + // Error occurred, trust that SimpleNew set the error state + return Status::OK(); + } + + set_numpy_metadata(type, col_->type().get(), arr_); + + return Status::OK(); + } + + template <int TYPE> + Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr<Array> arr) { + typedef typename arrow_traits<TYPE>::T T; + + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + + // Zero-Copy. We can pass the data pointer directly to NumPy. + void* data = const_cast<T*>(in_values); + + PyAcquireGIL lock; + + // Zero-Copy. We can pass the data pointer directly to NumPy. + npy_intp dims[1] = {col_->length()}; + result_ = PyArray_SimpleNewFromData(1, dims, npy_type, data); + arr_ = reinterpret_cast<PyArrayObject*>(result_); + + if (arr_ == NULL) { + // Error occurred, trust that SimpleNew set the error state + return Status::OK(); + } + + set_numpy_metadata(npy_type, col_->type().get(), arr_); + + if (PyArray_SetBaseObject(arr_, py_ref_) == -1) { + // Error occurred, trust that SetBaseObject set the error state + return Status::OK(); + } else { + // PyArray_SetBaseObject steals our reference to py_ref_ + Py_INCREF(py_ref_); + } + + // Arrow data is immutable. + PyArray_CLEARFLAGS(arr_, NPY_ARRAY_WRITEABLE); + + return Status::OK(); + } + + // ---------------------------------------------------------------------- + // Allocate new array and deserialize. 
Can do a zero copy conversion for some + // types + + Status Convert(PyObject** out) { +#define CONVERT_CASE(TYPE) \ + case Type::TYPE: { \ + RETURN_NOT_OK(ConvertValues<Type::TYPE>()); \ + } break; + + switch (col_->type()->type) { + CONVERT_CASE(BOOL); + CONVERT_CASE(INT8); + CONVERT_CASE(INT16); + CONVERT_CASE(INT32); + CONVERT_CASE(INT64); + CONVERT_CASE(UINT8); + CONVERT_CASE(UINT16); + CONVERT_CASE(UINT32); + CONVERT_CASE(UINT64); + CONVERT_CASE(FLOAT); + CONVERT_CASE(DOUBLE); + CONVERT_CASE(BINARY); + CONVERT_CASE(STRING); + CONVERT_CASE(DATE64); + CONVERT_CASE(TIMESTAMP); + CONVERT_CASE(DICTIONARY); + CONVERT_CASE(LIST); + default: { + std::stringstream ss; + ss << "Arrow type reading not implemented for " << col_->type()->ToString(); + return Status::NotImplemented(ss.str()); + } + } + +#undef CONVERT_CASE + + *out = result_; + return Status::OK(); + } + + template <int TYPE> + inline typename std::enable_if< + (TYPE != Type::DATE64) & arrow_traits<TYPE>::is_numeric_nullable, Status>::type + ConvertValues() { + typedef typename arrow_traits<TYPE>::T T; + int npy_type = arrow_traits<TYPE>::npy_type; + + if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) { + return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0)); + } + + RETURN_NOT_OK(AllocateOutput(npy_type)); + auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_)); + ConvertNumericNullable<T>(data_, arrow_traits<TYPE>::na_value, out_values); + + return Status::OK(); + } + + template <int TYPE> + inline typename std::enable_if<TYPE == Type::DATE64, Status>::type ConvertValues() { + typedef typename arrow_traits<TYPE>::T T; + + RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); + auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_)); + ConvertDates<T>(data_, arrow_traits<TYPE>::na_value, out_values); + return Status::OK(); + } + + // Integer specialization + template <int TYPE> + inline + typename std::enable_if<arrow_traits<TYPE>::is_numeric_not_nullable, 
Status>::type + ConvertValues() { + typedef typename arrow_traits<TYPE>::T T; + int npy_type = arrow_traits<TYPE>::npy_type; + + if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) { + return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0)); + } + + if (data_.null_count() > 0) { + RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + auto out_values = reinterpret_cast<double*>(PyArray_DATA(arr_)); + ConvertIntegerWithNulls<T>(data_, out_values); + } else { + RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); + auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_)); + ConvertIntegerNoNullsSameType<T>(data_, out_values); + } + + return Status::OK(); + } + + // Boolean specialization + template <int TYPE> + inline typename std::enable_if<arrow_traits<TYPE>::is_boolean, Status>::type + ConvertValues() { + if (data_.null_count() > 0) { + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + RETURN_NOT_OK(ConvertBooleanWithNulls(data_, out_values)); + } else { + RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); + auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(arr_)); + ConvertBooleanNoNulls(data_, out_values); + } + return Status::OK(); + } + + // UTF8 strings + template <int TYPE> + inline typename std::enable_if<TYPE == Type::STRING, Status>::type ConvertValues() { + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + return ConvertBinaryLike<StringArray>(data_, out_values); + } + + template <int T2> + inline typename std::enable_if<T2 == Type::BINARY, Status>::type ConvertValues() { + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + return ConvertBinaryLike<BinaryArray>(data_, out_values); + } + +#define CONVERTVALUES_LISTSLIKE_CASE(ArrowType, ArrowEnum) \ + case Type::ArrowEnum: \ + return ConvertListsLike<ArrowType>(col_, 
out_values); + + template <int T2> + inline typename std::enable_if<T2 == Type::LIST, Status>::type ConvertValues() { + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + auto list_type = std::static_pointer_cast<ListType>(col_->type()); + switch (list_type->value_type()->type) { + CONVERTVALUES_LISTSLIKE_CASE(UInt8Type, UINT8) + CONVERTVALUES_LISTSLIKE_CASE(Int8Type, INT8) + CONVERTVALUES_LISTSLIKE_CASE(UInt16Type, UINT16) + CONVERTVALUES_LISTSLIKE_CASE(Int16Type, INT16) + CONVERTVALUES_LISTSLIKE_CASE(UInt32Type, UINT32) + CONVERTVALUES_LISTSLIKE_CASE(Int32Type, INT32) + CONVERTVALUES_LISTSLIKE_CASE(UInt64Type, UINT64) + CONVERTVALUES_LISTSLIKE_CASE(Int64Type, INT64) + CONVERTVALUES_LISTSLIKE_CASE(TimestampType, TIMESTAMP) + CONVERTVALUES_LISTSLIKE_CASE(FloatType, FLOAT) + CONVERTVALUES_LISTSLIKE_CASE(DoubleType, DOUBLE) + CONVERTVALUES_LISTSLIKE_CASE(StringType, STRING) + default: { + std::stringstream ss; + ss << "Not implemented type for lists: " << list_type->value_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + } + } + + template <int TYPE> + inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type ConvertValues() { + std::shared_ptr<PandasBlock> block; + RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block)); + RETURN_NOT_OK(block->Write(col_, 0, 0)); + + auto dict_type = static_cast<const DictionaryType*>(col_->type().get()); + + PyAcquireGIL lock; + result_ = PyDict_New(); + RETURN_IF_PYERROR(); + + PyObject* dictionary; + RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, &dictionary)); + + PyDict_SetItemString(result_, "indices", block->block_arr()); + PyDict_SetItemString(result_, "dictionary", dictionary); + + return Status::OK(); + } + + private: + std::shared_ptr<Column> col_; + const ChunkedArray& data_; + PyObject* py_ref_; + PyArrayObject* arr_; + PyObject* result_; +}; + +Status ConvertArrayToPandas( + const 
std::shared_ptr<Array>& arr, PyObject* py_ref, PyObject** out) { + static std::string dummy_name = "dummy"; + auto field = std::make_shared<Field>(dummy_name, arr->type()); + auto col = std::make_shared<Column>(field, arr); + return ConvertColumnToPandas(col, py_ref, out); +} + +Status ConvertColumnToPandas( + const std::shared_ptr<Column>& col, PyObject* py_ref, PyObject** out) { + ArrowDeserializer converter(col, py_ref); + return converter.Convert(out); +} + +Status ConvertTableToPandas( + const std::shared_ptr<Table>& table, int nthreads, PyObject** out) { + DataFrameBlockCreator helper(table); + return helper.Convert(nthreads, out); +} + +} // namespace py +} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/pandas_convert.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/pandas_convert.h b/cpp/src/arrow/python/pandas_convert.h new file mode 100644 index 0000000..a33741e --- /dev/null +++ b/cpp/src/arrow/python/pandas_convert.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Functions for converting between pandas's NumPy-based data representation +// and Arrow data structures + +#ifndef ARROW_PYTHON_ADAPTERS_PANDAS_H +#define ARROW_PYTHON_ADAPTERS_PANDAS_H + +#include <Python.h> + +#include <memory> + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Column; +class DataType; +class MemoryPool; +class Status; +class Table; + +namespace py { + +ARROW_EXPORT +Status ConvertArrayToPandas( + const std::shared_ptr<Array>& arr, PyObject* py_ref, PyObject** out); + +ARROW_EXPORT +Status ConvertColumnToPandas( + const std::shared_ptr<Column>& col, PyObject* py_ref, PyObject** out); + +struct PandasOptions { + bool strings_to_categorical; +}; + +// Convert a whole table as efficiently as possible to a pandas.DataFrame. 
+// +// The returned Python object is a list of tuples consisting of the exact 2D +// BlockManager structure of the pandas.DataFrame used as of pandas 0.19.x. +// +// tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2]) +ARROW_EXPORT +Status ConvertTableToPandas( + const std::shared_ptr<Table>& table, int nthreads, PyObject** out); + +ARROW_EXPORT +Status PandasDtypeToArrow(PyObject* dtype, std::shared_ptr<DataType>* out); + +ARROW_EXPORT +Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + +/// Convert dtype=object arrays. If target data type is not known, pass a type +/// with nullptr +ARROW_EXPORT +Status PandasObjectsToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + +} // namespace py +} // namespace arrow + +#endif // ARROW_PYTHON_ADAPTERS_PANDAS_H http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/type_traits.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/type_traits.h b/cpp/src/arrow/python/type_traits.h new file mode 100644 index 0000000..f78dc36 --- /dev/null +++ b/cpp/src/arrow/python/type_traits.h @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <Python.h> + +#include <cstdint> +#include <limits> + +#include "arrow/python/numpy_interop.h" + +#include "arrow/builder.h" +#include "arrow/type.h" + +namespace arrow { +namespace py { + +template <int TYPE> +struct npy_traits {}; + +template <> +struct npy_traits<NPY_BOOL> { + typedef uint8_t value_type; + using TypeClass = BooleanType; + using BuilderClass = BooleanBuilder; + + static constexpr bool supports_nulls = false; + static inline bool isnull(uint8_t v) { return false; } +}; + +#define NPY_INT_DECL(TYPE, CapType, T) \ + template <> \ + struct npy_traits<NPY_##TYPE> { \ + typedef T value_type; \ + using TypeClass = CapType##Type; \ + using BuilderClass = CapType##Builder; \ + \ + static constexpr bool supports_nulls = false; \ + static inline bool isnull(T v) { return false; } \ + }; + +NPY_INT_DECL(INT8, Int8, int8_t); +NPY_INT_DECL(INT16, Int16, int16_t); +NPY_INT_DECL(INT32, Int32, int32_t); +NPY_INT_DECL(INT64, Int64, int64_t); + +NPY_INT_DECL(UINT8, UInt8, uint8_t); +NPY_INT_DECL(UINT16, UInt16, uint16_t); +NPY_INT_DECL(UINT32, UInt32, uint32_t); +NPY_INT_DECL(UINT64, UInt64, uint64_t); + +#if NPY_INT64 != NPY_LONGLONG +NPY_INT_DECL(LONGLONG, Int64, int64_t); +NPY_INT_DECL(ULONGLONG, UInt64, uint64_t); +#endif + +template <> +struct npy_traits<NPY_FLOAT32> { + typedef float value_type; + using TypeClass = FloatType; + using BuilderClass = FloatBuilder; + + static constexpr bool supports_nulls = true; + + static inline bool isnull(float v) { return v != v; } +}; + +template <> +struct npy_traits<NPY_FLOAT64> { + typedef 
double value_type; + using TypeClass = DoubleType; + using BuilderClass = DoubleBuilder; + + static constexpr bool supports_nulls = true; + + static inline bool isnull(double v) { return v != v; } +}; + +template <> +struct npy_traits<NPY_DATETIME> { + typedef int64_t value_type; + using TypeClass = TimestampType; + using BuilderClass = TimestampBuilder; + + static constexpr bool supports_nulls = true; + + static inline bool isnull(int64_t v) { + // NaT = -2**63 + // = -0x8000000000000000 + // = -9223372036854775808; + // = std::numeric_limits<int64_t>::min() + return v == std::numeric_limits<int64_t>::min(); + } +}; + +template <> +struct npy_traits<NPY_OBJECT> { + typedef PyObject* value_type; + static constexpr bool supports_nulls = true; +}; + +template <int TYPE> +struct arrow_traits {}; + +template <> +struct arrow_traits<Type::BOOL> { + static constexpr int npy_type = NPY_BOOL; + static constexpr bool supports_nulls = false; + static constexpr bool is_boolean = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; + +#define INT_DECL(TYPE) \ + template <> \ + struct arrow_traits<Type::TYPE> { \ + static constexpr int npy_type = NPY_##TYPE; \ + static constexpr bool supports_nulls = false; \ + static constexpr double na_value = NAN; \ + static constexpr bool is_boolean = false; \ + static constexpr bool is_numeric_not_nullable = true; \ + static constexpr bool is_numeric_nullable = false; \ + typedef typename npy_traits<NPY_##TYPE>::value_type T; \ + }; + +INT_DECL(INT8); +INT_DECL(INT16); +INT_DECL(INT32); +INT_DECL(INT64); +INT_DECL(UINT8); +INT_DECL(UINT16); +INT_DECL(UINT32); +INT_DECL(UINT64); + +template <> +struct arrow_traits<Type::FLOAT> { + static constexpr int npy_type = NPY_FLOAT32; + static constexpr bool supports_nulls = true; + static constexpr float na_value = NAN; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static 
constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_FLOAT32>::value_type T; +}; + +template <> +struct arrow_traits<Type::DOUBLE> { + static constexpr int npy_type = NPY_FLOAT64; + static constexpr bool supports_nulls = true; + static constexpr double na_value = NAN; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_FLOAT64>::value_type T; +}; + +static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min(); + +template <> +struct arrow_traits<Type::TIMESTAMP> { + static constexpr int npy_type = NPY_DATETIME; + static constexpr bool supports_nulls = true; + static constexpr int64_t na_value = kPandasTimestampNull; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_DATETIME>::value_type T; +}; + +template <> +struct arrow_traits<Type::DATE64> { + static constexpr int npy_type = NPY_DATETIME; + static constexpr bool supports_nulls = true; + static constexpr int64_t na_value = kPandasTimestampNull; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_DATETIME>::value_type T; +}; + +template <> +struct arrow_traits<Type::STRING> { + static constexpr int npy_type = NPY_OBJECT; + static constexpr bool supports_nulls = true; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; + +template <> +struct arrow_traits<Type::BINARY> { + static constexpr int npy_type = NPY_OBJECT; + static constexpr bool supports_nulls = true; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + 
static constexpr bool is_numeric_nullable = false; +}; + +} // namespace py +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/util/CMakeLists.txt b/cpp/src/arrow/python/util/CMakeLists.txt new file mode 100644 index 0000000..4cc20f6 --- /dev/null +++ b/cpp/src/arrow/python/util/CMakeLists.txt @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+#######################################
+# arrow_python_test_main
+#######################################
+
+# Static library holding the gtest entry point (test_main.cc), built only
+# when PYARROW_BUILD_TESTS is set.
+#
+# The target is named with an underscore rather than "arrow/python_test_main":
+# '/' is not a valid character in a CMake target name (policy CMP0037).
+if (PYARROW_BUILD_TESTS)
+  add_library(arrow_python_test_main STATIC
+    test_main.cc)
+
+  if (APPLE)
+    target_link_libraries(arrow_python_test_main
+      gtest
+      dl)
+    # Leave undefined symbols to be resolved when the final binary is loaded.
+    set_target_properties(arrow_python_test_main
+      PROPERTIES LINK_FLAGS "-undefined dynamic_lookup")
+  else()
+    target_link_libraries(arrow_python_test_main
+      gtest
+      pthread
+      dl
+      )
+  endif()
+endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/util/datetime.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/util/datetime.h b/cpp/src/arrow/python/util/datetime.h
new file mode 100644
index 0000000..f704a96
--- /dev/null
+++ b/cpp/src/arrow/python/util/datetime.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#ifndef PYARROW_UTIL_DATETIME_H +#define PYARROW_UTIL_DATETIME_H + +#include <Python.h> +#include <datetime.h> + +namespace arrow { +namespace py { + +inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { + struct tm date = {0}; + date.tm_year = PyDateTime_GET_YEAR(pydate) - 1900; + date.tm_mon = PyDateTime_GET_MONTH(pydate) - 1; + date.tm_mday = PyDateTime_GET_DAY(pydate); + struct tm epoch = {0}; + epoch.tm_year = 70; + epoch.tm_mday = 1; + // Milliseconds since the epoch + return lrint(difftime(mktime(&date), mktime(&epoch)) * 1000); +} + +} // namespace py +} // namespace arrow + +#endif // PYARROW_UTIL_DATETIME_H http://git-wip-us.apache.org/repos/asf/arrow/blob/3aac4ade/cpp/src/arrow/python/util/test_main.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/util/test_main.cc b/cpp/src/arrow/python/util/test_main.cc new file mode 100644 index 0000000..c83514d --- /dev/null +++ b/cpp/src/arrow/python/util/test_main.cc @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <Python.h>
+
+#include <gtest/gtest.h>
+
+#include "arrow/python/do_import_numpy.h"
+#include "arrow/python/numpy_interop.h"
+
+// Entry point for the Python-dependent C++ tests: googletest consumes its
+// command-line flags first, then the interpreter is started and
+// arrow::py::import_numpy() is called before any test runs. The interpreter
+// is shut down again once the suite has finished, and the suite's result
+// becomes the process exit code.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+
+  Py_Initialize();
+  arrow::py::import_numpy();
+
+  const int test_status = RUN_ALL_TESTS();
+
+  Py_Finalize();
+
+  return test_status;
+}