http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 40079b4..863cf54 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -19,7 +19,6 @@ #include <Python.h> -#include "pyarrow/adapters/builtin.h" #include "pyarrow/adapters/pandas.h" #include "pyarrow/numpy_interop.h" @@ -34,120 +33,39 @@ #include <thread> #include <unordered_map> -#include "arrow/api.h" +#include "arrow/array.h" +#include "arrow/column.h" #include "arrow/loader.h" #include "arrow/status.h" +#include "arrow/table.h" #include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" #include "arrow/util/macros.h" +#include "pyarrow/adapters/builtin.h" #include "pyarrow/common.h" #include "pyarrow/config.h" +#include "pyarrow/type_traits.h" #include "pyarrow/util/datetime.h" -namespace pyarrow { - -using arrow::Array; -using arrow::ChunkedArray; -using arrow::Column; -using arrow::DictionaryType; -using arrow::Field; -using arrow::DataType; -using arrow::ListType; -using arrow::ListBuilder; -using arrow::Status; -using arrow::Table; -using arrow::Type; - -namespace BitUtil = arrow::BitUtil; +namespace arrow { +namespace py { // ---------------------------------------------------------------------- // Utility code -template <int TYPE> -struct npy_traits {}; - -template <> -struct npy_traits<NPY_BOOL> { - typedef uint8_t value_type; - using TypeClass = arrow::BooleanType; - using BuilderClass = arrow::BooleanBuilder; - - static constexpr bool supports_nulls = false; - static inline bool isnull(uint8_t v) { return false; } -}; - -#define NPY_INT_DECL(TYPE, CapType, T) \ - template <> \ - struct npy_traits<NPY_##TYPE> { \ - typedef T value_type; \ - using TypeClass = arrow::CapType##Type; \ - using BuilderClass = arrow::CapType##Builder; \ - \ - static constexpr bool supports_nulls = false; \ - static inline bool isnull(T v) { return false; } \ - }; - -NPY_INT_DECL(INT8, Int8, int8_t); -NPY_INT_DECL(INT16, Int16, int16_t); -NPY_INT_DECL(INT32, Int32, int32_t); -NPY_INT_DECL(INT64, Int64, int64_t); +int cast_npy_type_compat(int type_num) { +// Both LONGLONG and INT64 can be observed in the wild, which is buggy. We set +// U/LONGLONG to U/INT64 so things work properly. -NPY_INT_DECL(UINT8, UInt8, uint8_t); -NPY_INT_DECL(UINT16, UInt16, uint16_t); -NPY_INT_DECL(UINT32, UInt32, uint32_t); -NPY_INT_DECL(UINT64, UInt64, uint64_t); - -#if NPY_INT64 != NPY_LONGLONG -NPY_INT_DECL(LONGLONG, Int64, int64_t); -NPY_INT_DECL(ULONGLONG, UInt64, uint64_t); +#if (NPY_INT64 == NPY_LONGLONG) && (NPY_SIZEOF_LONGLONG == 8) + if (type_num == NPY_LONGLONG) { type_num = NPY_INT64; } + if (type_num == NPY_ULONGLONG) { type_num = NPY_UINT64; } #endif -template <> -struct npy_traits<NPY_FLOAT32> { - typedef float value_type; - using TypeClass = arrow::FloatType; - using BuilderClass = arrow::FloatBuilder; - - static constexpr bool supports_nulls = true; - - static inline bool isnull(float v) { return v != v; } -}; - -template <> -struct npy_traits<NPY_FLOAT64> { - typedef double value_type; - using TypeClass = arrow::DoubleType; - using BuilderClass = arrow::DoubleBuilder; - - static constexpr bool supports_nulls = true; - - static inline bool isnull(double v) { return v != v; } -}; - -template <> -struct npy_traits<NPY_DATETIME> { - typedef int64_t value_type; - using TypeClass = arrow::TimestampType; - using BuilderClass = arrow::TimestampBuilder; - - static constexpr bool supports_nulls = true; - - static inline bool isnull(int64_t v) { - // NaT = -2**63 - // = -0x8000000000000000 - // = -9223372036854775808; - // = std::numeric_limits<int64_t>::min() - return v == std::numeric_limits<int64_t>::min(); - } -}; - -template <> -struct npy_traits<NPY_OBJECT> { - typedef PyObject* value_type; - static constexpr bool supports_nulls = true; -}; + return type_num; +} static inline bool PyObject_is_null(const PyObject* obj) { return obj == Py_None || obj == numpy_nan; @@ -181,8 +99,24 @@ static int64_t ValuesToBitmap(const void* data, int64_t length, uint8_t* bitmap) return null_count; } +// Returns null count +static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) { + int64_t null_count = 0; + const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask)); + // TODO(wesm): strided null mask + for (int i = 0; i < length; ++i) { + if (mask_values[i]) { + ++null_count; + } else { + BitUtil::SetBit(bitmap, i); + } + } + return null_count; +} + template <int TYPE> -static int64_t ValuesToBytemap(const void* data, int64_t length, uint8_t* valid_bytes) { +static int64_t ValuesToValidBytes( + const void* data, int64_t length, uint8_t* valid_bytes) { typedef npy_traits<TYPE> traits; typedef typename traits::value_type T; @@ -214,7 +148,7 @@ Status CheckFlatNumpyArray(PyArrayObject* numpy_array, int np_type) { return Status::OK(); } -Status AppendObjectStrings(arrow::StringBuilder& string_builder, PyObject** objects, +Status AppendObjectStrings(StringBuilder& string_builder, PyObject** objects, int64_t objects_length, bool* have_bytes) { PyObject* obj; @@ -242,360 +176,561 @@ Status AppendObjectStrings(arrow::StringBuilder& string_builder, PyObject** obje return Status::OK(); } -template <int TYPE> -struct arrow_traits {}; +template <typename T> +struct WrapBytes {}; template <> -struct arrow_traits<Type::BOOL> { - static constexpr int npy_type = NPY_BOOL; - static constexpr bool supports_nulls = false; - static constexpr bool is_boolean = true; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; +struct WrapBytes<StringArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length); + } }; -#define INT_DECL(TYPE) \ - template <> \ - struct arrow_traits<Type::TYPE> { \ - static constexpr int npy_type = NPY_##TYPE; \ - static constexpr bool supports_nulls = false; \ - static constexpr double na_value = NAN; \ - static constexpr bool is_boolean = false; \ - static constexpr bool is_numeric_not_nullable = true; \ - static constexpr bool is_numeric_nullable = false; \ - typedef typename npy_traits<NPY_##TYPE>::value_type T; \ - }; - -INT_DECL(INT8); -INT_DECL(INT16); -INT_DECL(INT32); -INT_DECL(INT64); -INT_DECL(UINT8); -INT_DECL(UINT16); -INT_DECL(UINT32); -INT_DECL(UINT64); - template <> -struct arrow_traits<Type::FLOAT> { - static constexpr int npy_type = NPY_FLOAT32; - static constexpr bool supports_nulls = true; - static constexpr float na_value = NAN; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_FLOAT32>::value_type T; +struct WrapBytes<BinaryArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length); + } }; -template <> -struct arrow_traits<Type::DOUBLE> { - static constexpr int npy_type = NPY_FLOAT64; - static constexpr bool supports_nulls = true; - static constexpr double na_value = NAN; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_FLOAT64>::value_type T; -}; +static inline bool ListTypeSupported(const Type::type type_id) { + switch (type_id) { + case Type::UINT8: + case Type::INT8: + case Type::UINT16: + case Type::INT16: + case Type::UINT32: + case Type::INT32: + case Type::INT64: + case Type::UINT64: + case Type::FLOAT: + case Type::DOUBLE: + case Type::STRING: + case Type::TIMESTAMP: + // The above types are all supported. + return true; + default: + break; + } + return false; +} -static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min(); +// ---------------------------------------------------------------------- +// Conversion from NumPy-in-Pandas to Arrow -template <> -struct arrow_traits<Type::TIMESTAMP> { - static constexpr int npy_type = NPY_DATETIME; - static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = kPandasTimestampNull; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_DATETIME>::value_type T; -}; +class PandasConverter : public TypeVisitor { + public: + PandasConverter( + MemoryPool* pool, PyObject* ao, PyObject* mo, const std::shared_ptr<DataType>& type) + : pool_(pool), + type_(type), + arr_(reinterpret_cast<PyArrayObject*>(ao)), + mask_(nullptr) { + if (mo != nullptr and mo != Py_None) { mask_ = reinterpret_cast<PyArrayObject*>(mo); } + length_ = PyArray_SIZE(arr_); + } -template <> -struct arrow_traits<Type::DATE> { - static constexpr int npy_type = NPY_DATETIME; - static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = kPandasTimestampNull; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_DATETIME>::value_type T; -}; + bool is_strided() const { + npy_intp* astrides = PyArray_STRIDES(arr_); + return astrides[0] != PyArray_DESCR(arr_)->elsize; + } -template <> -struct arrow_traits<Type::STRING> { - static constexpr int npy_type = NPY_OBJECT; - static constexpr bool supports_nulls = true; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; -}; + Status InitNullBitmap() { + int null_bytes = BitUtil::BytesForBits(length_); -template <> -struct arrow_traits<Type::BINARY> { - static constexpr int npy_type = NPY_OBJECT; - static constexpr bool supports_nulls = true; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; -}; + null_bitmap_ = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(null_bitmap_->Resize(null_bytes)); -template <typename T> -struct WrapBytes {}; + null_bitmap_data_ = null_bitmap_->mutable_data(); + memset(null_bitmap_data_, 0, null_bytes); -template <> -struct WrapBytes<arrow::StringArray> { - static inline PyObject* Wrap(const uint8_t* data, int64_t length) { - return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length); + return Status::OK(); } -}; -template <> -struct WrapBytes<arrow::BinaryArray> { - static inline PyObject* Wrap(const uint8_t* data, int64_t length) { - return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length); - } -}; + // ---------------------------------------------------------------------- + // Traditional visitor conversion for non-object arrays -inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { - if (type == NPY_DATETIME) { - PyArray_Descr* descr = PyArray_DESCR(out); - auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); - if (datatype->type == Type::TIMESTAMP) { - auto timestamp_type = static_cast<arrow::TimestampType*>(datatype); + template <typename ArrowType> + Status ConvertData(std::shared_ptr<Buffer>* data); - switch (timestamp_type->unit) { - case arrow::TimestampType::Unit::SECOND: - date_dtype->meta.base = NPY_FR_s; - break; - case arrow::TimestampType::Unit::MILLI: - date_dtype->meta.base = NPY_FR_ms; - break; - case arrow::TimestampType::Unit::MICRO: - date_dtype->meta.base = NPY_FR_us; - break; - case arrow::TimestampType::Unit::NANO: - date_dtype->meta.base = NPY_FR_ns; - break; - } - } else { - // datatype->type == Type::DATE - date_dtype->meta.base = NPY_FR_D; + template <typename ArrowType> + Status VisitNative() { + using traits = arrow_traits<ArrowType::type_id>; + + if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } + + std::shared_ptr<Buffer> data; + RETURN_NOT_OK(ConvertData<ArrowType>(&data)); + + int64_t null_count = 0; + if (mask_ != nullptr) { + null_count = MaskToBitmap(mask_, length_, null_bitmap_data_); + } else if (traits::supports_nulls) { + // TODO(wesm): this presumes the NumPy C type and arrow C type are the + // same + null_count = ValuesToBitmap<traits::npy_type>( + PyArray_DATA(arr_), length_, null_bitmap_data_); } + + std::vector<FieldMetadata> fields(1); + fields[0].length = length_; + fields[0].null_count = null_count; + fields[0].offset = 0; + + return LoadArray(type_, fields, {null_bitmap_, data}, &out_); } -} -template <typename T> -inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - // Upcast to double, set NaN as appropriate +#define VISIT_NATIVE(TYPE) \ + Status Visit(const TYPE& type) override { return VisitNative<TYPE>(); } - for (int i = 0; i < arr->length(); ++i) { - *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; + VISIT_NATIVE(BooleanType); + VISIT_NATIVE(Int8Type); + VISIT_NATIVE(Int16Type); + VISIT_NATIVE(Int32Type); + VISIT_NATIVE(Int64Type); + VISIT_NATIVE(UInt8Type); + VISIT_NATIVE(UInt16Type); + VISIT_NATIVE(UInt32Type); + VISIT_NATIVE(UInt64Type); + VISIT_NATIVE(FloatType); + VISIT_NATIVE(DoubleType); + VISIT_NATIVE(TimestampType); + +#undef VISIT_NATIVE + + Status Convert(std::shared_ptr<Array>* out) { + if (PyArray_NDIM(arr_) != 1) { + return Status::Invalid("only handle 1-dimensional arrays"); } + // TODO(wesm): strided arrays + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } + + if (type_ == nullptr) { return Status::Invalid("Must pass data type"); } + + // Visit the type to perform conversion + RETURN_NOT_OK(type_->Accept(this)); + + *out = out_; + return Status::OK(); } -} -template <typename T> -inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); + // ---------------------------------------------------------------------- + // Conversion logic for various object dtype arrays + + template <int ITEM_TYPE, typename ArrowType> + Status ConvertTypedLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + + Status ConvertObjectStrings(std::shared_ptr<Array>* out); + Status ConvertBooleans(std::shared_ptr<Array>* out); + Status ConvertDates(std::shared_ptr<Array>* out); + Status ConvertLists(const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + Status ConvertObjects(std::shared_ptr<Array>* out); + + protected: + MemoryPool* pool_; + std::shared_ptr<DataType> type_; + PyArrayObject* arr_; + PyArrayObject* mask_; + int64_t length_; + + // Used in visitor pattern + std::shared_ptr<Array> out_; + + std::shared_ptr<ResizableBuffer> null_bitmap_; + uint8_t* null_bitmap_data_; +}; + +template <typename ArrowType> +inline Status PandasConverter::ConvertData(std::shared_ptr<Buffer>* data) { + using traits = arrow_traits<ArrowType::type_id>; + + // Handle LONGLONG->INT64 and other fun things + int type_num_compat = cast_npy_type_compat(PyArray_DESCR(arr_)->type_num); + + if (traits::npy_type != type_num_compat) { + return Status::NotImplemented("NumPy type casts not yet implemented"); } + + *data = std::make_shared<NumPyBuffer>(arr_); + return Status::OK(); } -template <typename InType, typename OutType> -inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values = in_values[i]; - } +template <> +inline Status PandasConverter::ConvertData<BooleanType>(std::shared_ptr<Buffer>* data) { + int nbytes = BitUtil::BytesForBits(length_); + auto buffer = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); + + const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); + + uint8_t* bitmap = buffer->mutable_data(); + + memset(bitmap, 0, nbytes); + for (int i = 0; i < length_; ++i) { + if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } } + + *data = buffer; + return Status::OK(); } -static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { +Status PandasConverter::ConvertDates(std::shared_ptr<Array>* out) { PyAcquireGIL lock; - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values++ = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - *out_values++ = Py_True; - } else { - // False - Py_INCREF(Py_False); - *out_values++ = Py_False; - } + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + DateBuilder date_builder(pool_); + RETURN_NOT_OK(date_builder.Resize(length_)); + + Status s; + PyObject* obj; + for (int64_t i = 0; i < length_; ++i) { + obj = objects[i]; + if (PyDate_CheckExact(obj)) { + PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj); + date_builder.Append(PyDate_to_ms(pydate)); + } else { + date_builder.AppendNull(); } } - return Status::OK(); + return date_builder.Finish(out); } -static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = static_cast<uint8_t>(bool_arr->Value(i)); - } +Status PandasConverter::ConvertObjectStrings(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + + // The output type at this point is inconclusive because there may be bytes + // and unicode mixed in the object array + + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + StringBuilder string_builder(pool_); + RETURN_NOT_OK(string_builder.Resize(length_)); + + Status s; + bool have_bytes = false; + RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes)); + RETURN_NOT_OK(string_builder.Finish(out)); + + if (have_bytes) { + const auto& arr = static_cast<const StringArray&>(*out->get()); + *out = std::make_shared<BinaryArray>(arr.length(), arr.value_offsets(), arr.data(), + arr.null_bitmap(), arr.null_count()); } + return Status::OK(); } -template <typename ArrayType> -inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) { +Status PandasConverter::ConvertBooleans(std::shared_ptr<Array>* out) { PyAcquireGIL lock; - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = static_cast<ArrayType*>(data.chunk(c).get()); - const uint8_t* data_ptr; - int32_t length; - const bool has_nulls = data.null_count() > 0; - for (int64_t i = 0; i < arr->length(); ++i) { - if (has_nulls && arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; - } else { - data_ptr = arr->GetValue(i, &length); - *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length); - if (*out_values == nullptr) { - PyErr_Clear(); - std::stringstream ss; - ss << "Wrapping " - << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed"; - return Status::UnknownError(ss.str()); - } - } - ++out_values; + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + + int nbytes = BitUtil::BytesForBits(length_); + auto data = std::make_shared<PoolBuffer>(pool_); + RETURN_NOT_OK(data->Resize(nbytes)); + uint8_t* bitmap = data->mutable_data(); + memset(bitmap, 0, nbytes); + + int64_t null_count = 0; + for (int64_t i = 0; i < length_; ++i) { + if (objects[i] == Py_True) { + BitUtil::SetBit(bitmap, i); + BitUtil::SetBit(null_bitmap_data_, i); + } else if (objects[i] != Py_False) { + ++null_count; + } else { + BitUtil::SetBit(null_bitmap_data_, i); } } + + *out = std::make_shared<BooleanArray>(length_, data, null_bitmap_, null_count); + return Status::OK(); } -template <typename ArrowType> -inline Status ConvertListsLike( - const std::shared_ptr<Column>& col, PyObject** out_values) { - const ChunkedArray& data = *col->data().get(); - auto list_type = std::static_pointer_cast<ListType>(col->type()); +Status PandasConverter::ConvertObjects(std::shared_ptr<Array>* out) { + // Python object arrays are annoying, since we could have one of: + // + // * Strings + // * Booleans with nulls + // * Mixed type (not supported at the moment by arrow format) + // + // Additionally, nulls may be encoded either as np.nan or None. So we have to + // do some type inference and conversion - // Get column of underlying value arrays - std::vector<std::shared_ptr<Array>> value_arrays; - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); - value_arrays.emplace_back(arr->values()); - } - auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays); - // TODO(ARROW-489): Currently we don't have a Python reference for single columns. - // Storing a reference to the whole Array would be to expensive. - PyObject* numpy_array; - RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array)); + RETURN_NOT_OK(InitNullBitmap()); - PyAcquireGIL lock; + // TODO: mask not supported here + if (mask_ != nullptr) { + return Status::NotImplemented("mask not supported in object conversions yet"); + } - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); + const PyObject** objects; + { + PyAcquireGIL lock; + objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_)); + PyDateTime_IMPORT; + } - const uint8_t* data_ptr; - const bool has_nulls = data.null_count() > 0; - for (int64_t i = 0; i < arr->length(); ++i) { - if (has_nulls && arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; + if (type_) { + switch (type_->type) { + case Type::STRING: + return ConvertObjectStrings(out); + case Type::BOOL: + return ConvertBooleans(out); + case Type::DATE: + return ConvertDates(out); + case Type::LIST: { + const auto& list_field = static_cast<const ListType&>(*type_); + return ConvertLists(list_field.value_field()->type, out); + } + default: + return Status::TypeError("No known conversion to Arrow type"); + } + } else { + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + continue; + } else if (PyObject_is_string(objects[i])) { + return ConvertObjectStrings(out); + } else if (PyBool_Check(objects[i])) { + return ConvertBooleans(out); + } else if (PyDate_CheckExact(objects[i])) { + return ConvertDates(out); } else { - PyObject* start = PyLong_FromLong(arr->value_offset(i)); - PyObject* end = PyLong_FromLong(arr->value_offset(i + 1)); - PyObject* slice = PySlice_New(start, end, NULL); - *out_values = PyObject_GetItem(numpy_array, slice); - Py_DECREF(start); - Py_DECREF(end); - Py_DECREF(slice); + return Status::TypeError("unhandled python type"); } - ++out_values; } } - Py_XDECREF(numpy_array); - return Status::OK(); + return Status::TypeError("Unable to infer type of object array, were all null"); } -template <typename T> -inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); +template <int ITEM_TYPE, typename ArrowType> +inline Status PandasConverter::ConvertTypedLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + typedef npy_traits<ITEM_TYPE> traits; + typedef typename traits::value_type T; + typedef typename traits::BuilderClass BuilderT; - const uint8_t* valid_bits = arr->null_bitmap_data(); + PyAcquireGIL lock; - if (arr->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; + auto value_builder = std::make_shared<BuilderT>(pool_, type); + ListBuilder list_builder(pool_, value_builder); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + RETURN_NOT_OK(list_builder.AppendNull()); + } else if (PyArray_Check(objects[i])) { + auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); + RETURN_NOT_OK(list_builder.Append(true)); + + // TODO(uwe): Support more complex numpy array structures + RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE)); + + int64_t size = PyArray_DIM(numpy_array, 0); + auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_array)); + if (traits::supports_nulls) { + null_bitmap_->Resize(size, false); + // TODO(uwe): A bitmap would be more space-efficient but the Builder API doesn't + // currently support this. + // ValuesToBitmap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); + ValuesToValidBytes<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); + RETURN_NOT_OK(value_builder->Append(data, size, null_bitmap_->data())); + } else { + RETURN_NOT_OK(value_builder->Append(data, size)); + } + + } else if (PyList_Check(objects[i])) { + int64_t size; + std::shared_ptr<DataType> inferred_type; + RETURN_NOT_OK(list_builder.Append(true)); + RETURN_NOT_OK(InferArrowType(objects[i], &size, &inferred_type)); + if (inferred_type->type != type->type) { + std::stringstream ss; + ss << inferred_type->ToString() << " cannot be converted to " << type->ToString(); + return Status::TypeError(ss.str()); } + RETURN_NOT_OK(AppendPySequence(objects[i], type, value_builder)); } else { - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); + return Status::TypeError("Unsupported Python type for list items"); } } + return list_builder.Finish(out); } -template <typename InType, typename OutType> -inline void ConvertNumericNullableCast( - const ChunkedArray& data, OutType na_value, OutType* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); +template <> +inline Status PandasConverter::ConvertTypedLists<NPY_OBJECT, StringType>( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PyAcquireGIL lock; + // TODO: If there are bytes involed, convert to Binary representation + bool have_bytes = false; - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]); + auto value_builder = std::make_shared<StringBuilder>(pool_); + ListBuilder list_builder(pool_, value_builder); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + RETURN_NOT_OK(list_builder.AppendNull()); + } else if (PyArray_Check(objects[i])) { + auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); + RETURN_NOT_OK(list_builder.Append(true)); + + // TODO(uwe): Support more complex numpy array structures + RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, NPY_OBJECT)); + + int64_t size = PyArray_DIM(numpy_array, 0); + auto data = reinterpret_cast<PyObject**>(PyArray_DATA(numpy_array)); + RETURN_NOT_OK(AppendObjectStrings(*value_builder.get(), data, size, &have_bytes)); + } else if (PyList_Check(objects[i])) { + int64_t size; + std::shared_ptr<DataType> inferred_type; + RETURN_NOT_OK(list_builder.Append(true)); + RETURN_NOT_OK(InferArrowType(objects[i], &size, &inferred_type)); + if (inferred_type->type != Type::STRING) { + std::stringstream ss; + ss << inferred_type->ToString() << " cannot be converted to STRING."; + return Status::TypeError(ss.str()); + } + RETURN_NOT_OK(AppendPySequence(objects[i], inferred_type, value_builder)); + } else { + return Status::TypeError("Unsupported Python type for list items"); } } + return list_builder.Finish(out); } -template <typename T> -inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - - for (int64_t i = 0; i < arr->length(); ++i) { - // There are 1000 * 60 * 60 * 24 = 86400000ms in a day - *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; - } +#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType) \ + case Type::TYPE: { \ + return ConvertTypedLists<NUMPY_TYPE, ArrowType>(type, out); \ + } + +Status PandasConverter::ConvertLists( + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + switch (type->type) { + LIST_CASE(UINT8, NPY_UINT8, UInt8Type) + LIST_CASE(INT8, NPY_INT8, Int8Type) + LIST_CASE(UINT16, NPY_UINT16, UInt16Type) + LIST_CASE(INT16, NPY_INT16, Int16Type) + LIST_CASE(UINT32, NPY_UINT32, UInt32Type) + LIST_CASE(INT32, NPY_INT32, Int32Type) + LIST_CASE(UINT64, NPY_UINT64, UInt64Type) + LIST_CASE(INT64, NPY_INT64, Int64Type) + LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType) + LIST_CASE(FLOAT, NPY_FLOAT, FloatType) + LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType) + LIST_CASE(STRING, NPY_OBJECT, StringType) + default: + return Status::TypeError("Unknown list item type"); } + + return Status::TypeError("Unknown list type"); } -template <typename InType, int SHIFT> -inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); +Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PandasConverter converter(pool, ao, mo, type); + return converter.Convert(out); +} - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? kPandasTimestampNull - : (static_cast<int64_t>(in_values[i]) * SHIFT); +Status PandasObjectsToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out) { + PandasConverter converter(pool, ao, mo, type); + return converter.ConvertObjects(out); +} + +Status PandasDtypeToArrow(PyObject* dtype, std::shared_ptr<DataType>* out) { + PyArray_Descr* descr = reinterpret_cast<PyArray_Descr*>(dtype); + + int type_num = cast_npy_type_compat(descr->type_num); + +#define TO_ARROW_TYPE_CASE(NPY_NAME, FACTORY) \ + case NPY_##NPY_NAME: \ + *out = FACTORY(); \ + break; + + switch (type_num) { + TO_ARROW_TYPE_CASE(BOOL, boolean); + TO_ARROW_TYPE_CASE(INT8, int8); + TO_ARROW_TYPE_CASE(INT16, int16); + TO_ARROW_TYPE_CASE(INT32, int32); + TO_ARROW_TYPE_CASE(INT64, int64); +#if (NPY_INT64 != NPY_LONGLONG) + TO_ARROW_TYPE_CASE(LONGLONG, int64); +#endif + TO_ARROW_TYPE_CASE(UINT8, uint8); + TO_ARROW_TYPE_CASE(UINT16, uint16); + TO_ARROW_TYPE_CASE(UINT32, uint32); + TO_ARROW_TYPE_CASE(UINT64, uint64); +#if (NPY_UINT64 != NPY_ULONGLONG) + TO_ARROW_CASE(ULONGLONG); +#endif + TO_ARROW_TYPE_CASE(FLOAT32, float32); + TO_ARROW_TYPE_CASE(FLOAT64, float64); + case NPY_DATETIME: { + auto date_dtype = + reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + TimeUnit unit; + switch (date_dtype->meta.base) { + case NPY_FR_s: + unit = TimeUnit::SECOND; + break; + case NPY_FR_ms: + unit = TimeUnit::MILLI; + break; + case NPY_FR_us: + unit = TimeUnit::MICRO; + break; + case NPY_FR_ns: + unit = TimeUnit::NANO; + break; + default: + return Status::NotImplemented("Unsupported datetime64 time unit"); + } + *out = timestamp(unit); + } break; + default: { + std::stringstream ss; + ss << "Unsupported numpy type " << descr->type_num << std::endl; + return Status::NotImplemented(ss.str()); } } + +#undef TO_ARROW_TYPE_CASE + + return Status::OK(); } // ---------------------------------------------------------------------- // pandas 0.x DataFrame conversion internals +inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { + if (type == NPY_DATETIME) { + PyArray_Descr* descr = PyArray_DESCR(out); + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + if (datatype->type == Type::TIMESTAMP) { + auto timestamp_type = static_cast<TimestampType*>(datatype); + + switch (timestamp_type->unit) { + case TimestampType::Unit::SECOND: + date_dtype->meta.base = NPY_FR_s; + break; + case TimestampType::Unit::MILLI: + date_dtype->meta.base = NPY_FR_ms; + break; + case TimestampType::Unit::MICRO: + date_dtype->meta.base = NPY_FR_us; + break; + case TimestampType::Unit::NANO: + date_dtype->meta.base = NPY_FR_ns; + break; + } + } else { + // datatype->type == Type::DATE + date_dtype->meta.base = NPY_FR_D; + } + } +} + class PandasBlock { public: enum type { @@ -688,10 +823,219 @@ class PandasBlock { DISALLOW_COPY_AND_ASSIGN(PandasBlock); }; -#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum) \ - case Type::ArrowEnum: \ - RETURN_NOT_OK((ConvertListsLike<::arrow::ArrowType>(col, out_buffer))); \ - break; +template <typename T> +inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate + + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; + } + } +} + +template <typename T> +inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } +} + +template <typename InType, typename OutType> +inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values = in_values[i]; + } + } +} + +static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<BooleanArray*>(arr.get()); + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values++ = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + *out_values++ = Py_True; + } else { + // False + Py_INCREF(Py_False); + *out_values++ = Py_False; + } + } + } + return Status::OK(); +} + +static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<BooleanArray*>(arr.get()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = static_cast<uint8_t>(bool_arr->Value(i)); + } + } +} + +template <typename ArrayType> +inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = static_cast<ArrayType*>(data.chunk(c).get()); + + const uint8_t* data_ptr; + int32_t length; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = arr->GetValue(i, &length); + *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length); + if (*out_values == nullptr) { + PyErr_Clear(); + std::stringstream ss; + ss << "Wrapping " + << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed"; + return Status::UnknownError(ss.str()); + } + } + ++out_values; + } + } + return Status::OK(); +} + +template <typename ArrowType> +inline Status ConvertListsLike( + const std::shared_ptr<Column>& col, PyObject** out_values) { + const ChunkedArray& data = *col->data().get(); + auto list_type = std::static_pointer_cast<ListType>(col->type()); + + // Get column of underlying value arrays + std::vector<std::shared_ptr<Array>> value_arrays; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<ListArray>(data.chunk(c)); + value_arrays.emplace_back(arr->values()); + } + auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays); + // TODO(ARROW-489): Currently we don't have a Python reference for single columns. + // Storing a reference to the whole Array would be to expensive. + PyObject* numpy_array; + RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array)); + + PyAcquireGIL lock; + + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<ListArray>(data.chunk(c)); + + const uint8_t* data_ptr; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + PyObject* start = PyLong_FromLong(arr->value_offset(i)); + PyObject* end = PyLong_FromLong(arr->value_offset(i + 1)); + PyObject* slice = PySlice_New(start, end, NULL); + *out_values = PyObject_GetItem(numpy_array, slice); + Py_DECREF(start); + Py_DECREF(end); + Py_DECREF(slice); + } + ++out_values; + } + } + + Py_XDECREF(numpy_array); + return Status::OK(); +} + +template <typename T> +inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + + const uint8_t* valid_bits = arr->null_bitmap_data(); + + if (arr->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; + } + } else { + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } + } +} + +template <typename InType, typename OutType> +inline void ConvertNumericNullableCast( + const ChunkedArray& data, OutType na_value, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]); + } + } +} + +template <typename T> +inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + // There are 1000 * 60 * 60 * 24 = 86400000ms in a day + *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; + } + } +} + +template <typename InType, int SHIFT> +inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? kPandasTimestampNull + : (static_cast<int64_t>(in_values[i]) * SHIFT); + } + } +} + +#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum) \ + case Type::ArrowEnum: \ + RETURN_NOT_OK((ConvertListsLike<ArrowType>(col, out_buffer))); \ + break; class ObjectBlock : public PandasBlock { public: @@ -712,9 +1056,9 @@ class ObjectBlock : public PandasBlock { if (type == Type::BOOL) { RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer)); } else if (type == Type::BINARY) { - RETURN_NOT_OK(ConvertBinaryLike<arrow::BinaryArray>(data, out_buffer)); + RETURN_NOT_OK(ConvertBinaryLike<BinaryArray>(data, out_buffer)); } else if (type == Type::STRING) { - RETURN_NOT_OK(ConvertBinaryLike<arrow::StringArray>(data, out_buffer)); + RETURN_NOT_OK(ConvertBinaryLike<StringArray>(data, out_buffer)); } else if (type == Type::LIST) { auto list_type = std::static_pointer_cast<ListType>(col->type()); switch (list_type->value_type()->type) { @@ -880,8 +1224,8 @@ class DatetimeBlock : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME)); + Status AllocateDatetime(int ndim) { + RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME, ndim)); PyAcquireGIL lock; auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>( @@ -890,6 +1234,8 @@ class DatetimeBlock : public PandasBlock { return Status::OK(); } + Status Allocate() override { return AllocateDatetime(2); } + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, int64_t rel_placement) override { Type::type type = col->type()->type; @@ -904,15 +1250,15 @@ class DatetimeBlock : public PandasBlock { // TODO(wesm): Do we want to make sure to zero out the milliseconds? ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); } else if (type == Type::TIMESTAMP) { - auto ts_type = static_cast<arrow::TimestampType*>(col->type().get()); + auto ts_type = static_cast<TimestampType*>(col->type().get()); - if (ts_type->unit == arrow::TimeUnit::NANO) { + if (ts_type->unit == TimeUnit::NANO) { ConvertNumericNullable<int64_t>(data, kPandasTimestampNull, out_buffer); - } else if (ts_type->unit == arrow::TimeUnit::MICRO) { + } else if (ts_type->unit == TimeUnit::MICRO) { ConvertDatetimeNanos<int64_t, 1000L>(data, out_buffer); - } else if (ts_type->unit == arrow::TimeUnit::MILLI) { + } else if (ts_type->unit == TimeUnit::MILLI) { ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); - } else if (ts_type->unit == arrow::TimeUnit::SECOND) { + } else if (ts_type->unit == TimeUnit::SECOND) { ConvertDatetimeNanos<int64_t, 1000000000L>(data, out_buffer); } else { return Status::NotImplemented("Unsupported time unit"); @@ -931,6 +1277,9 @@ class DatetimeTZBlock : public DatetimeBlock { DatetimeTZBlock(const std::string& timezone, int64_t num_rows) : DatetimeBlock(num_rows, 1), timezone_(timezone) {} + // Like Categorical, the internal ndarray is 1-dimensional + Status Allocate() override { return AllocateDatetime(1); } + Status GetPyResult(PyObject** output) override { PyObject* result = PyDict_New(); RETURN_IF_PYERROR(); @@ -977,9 +1326,8 @@ class CategoricalBlock : public PandasBlock { for (int c = 0; c < data.num_chunks(); c++) { const std::shared_ptr<Array> arr = data.chunk(c); - const auto& dict_arr = static_cast<const arrow::DictionaryArray&>(*arr); - const auto& indices = - static_cast<const arrow::PrimitiveArray&>(*dict_arr.indices()); + const auto& dict_arr = static_cast<const DictionaryArray&>(*arr); + const auto& indices = static_cast<const PrimitiveArray&>(*dict_arr.indices()); auto in_values = reinterpret_cast<const T*>(indices.data()->data()); // Null is -1 in CategoricalBlock @@ -1046,28 +1394,6 @@ Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, return (*block)->Allocate(); } -static inline bool ListTypeSupported(const Type::type type_id) { - switch (type_id) { - case Type::UINT8: - case Type::INT8: - case Type::UINT16: - case Type::INT16: - case Type::UINT32: - case Type::INT32: - case Type::INT64: - case Type::UINT64: - case Type::FLOAT: - case Type::DOUBLE: - case Type::STRING: - case Type::TIMESTAMP: - // The above types are all supported. - return true; - default: - break; - } - return false; -} - static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& type, int64_t num_rows, std::shared_ptr<PandasBlock>* block) { // All categoricals become a block with a single column @@ -1168,7 +1494,7 @@ class DataFrameBlockCreator { output_type = PandasBlock::DATETIME; break; case Type::TIMESTAMP: { - const auto& ts_type = static_cast<const arrow::TimestampType&>(*col->type()); + const auto& ts_type = static_cast<const TimestampType&>(*col->type()); if (ts_type.timezone != "") { output_type = PandasBlock::DATETIME_WITH_TZ; } else { @@ -1182,636 +1508,165 @@ class DataFrameBlockCreator { ss << "Not implemented type for lists: " << list_type->value_type()->ToString(); return Status::NotImplemented(ss.str()); - } - output_type = PandasBlock::OBJECT; - } break; - case Type::DICTIONARY: - output_type = PandasBlock::CATEGORICAL; - break; - default: - return Status::NotImplemented(col->type()->ToString()); - } - - int block_placement = 0; - std::shared_ptr<PandasBlock> block; - if (output_type == PandasBlock::CATEGORICAL) { - RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); - categorical_blocks_[i] = block; - } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { - const auto& ts_type = static_cast<const arrow::TimestampType&>(*col->type()); - block = std::make_shared<DatetimeTZBlock>(ts_type.timezone, table_->num_rows()); - RETURN_NOT_OK(block->Allocate()); - datetimetz_blocks_[i] = block; - } else { - auto it = type_counts_.find(output_type); - if (it != type_counts_.end()) { - block_placement = it->second; - // Increment count - it->second += 1; - } else { - // Add key to map - type_counts_[output_type] = 1; - } - } - - column_types_[i] = output_type; - column_block_placement_[i] = block_placement; - } - - // Create normal non-categorical blocks - for (const auto& it : type_counts_) { - PandasBlock::type type = static_cast<PandasBlock::type>(it.first); - std::shared_ptr<PandasBlock> block; - RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block)); - blocks_[type] = block; - } - return Status::OK(); - } - - Status WriteTableToBlocks(int nthreads) { - auto WriteColumn = [this](int i) { - std::shared_ptr<Column> col = this->table_->column(i); - PandasBlock::type output_type = this->column_types_[i]; - - int rel_placement = this->column_block_placement_[i]; - - std::shared_ptr<PandasBlock> block; - if (output_type == PandasBlock::CATEGORICAL) { - auto it = this->categorical_blocks_.find(i); - if (it == this->blocks_.end()) { - return Status::KeyError("No categorical block allocated"); - } - block = it->second; - } else { - auto it = this->blocks_.find(output_type); - if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); } - block = it->second; - } - return block->Write(col, i, rel_placement); - }; - - nthreads = std::min<int>(nthreads, table_->num_columns()); - - if (nthreads == 1) { - for (int i = 0; i < table_->num_columns(); ++i) { - RETURN_NOT_OK(WriteColumn(i)); - } - } else { - std::vector<std::thread> thread_pool; - thread_pool.reserve(nthreads); - std::atomic<int> task_counter(0); - - std::mutex error_mtx; - bool error_occurred = false; - Status error; - - for (int thread_id = 0; thread_id < nthreads; ++thread_id) { - thread_pool.emplace_back( - [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() { - int column_num; - while (!error_occurred) { - column_num = task_counter.fetch_add(1); - if (column_num >= this->table_->num_columns()) { break; } - Status s = WriteColumn(column_num); - if (!s.ok()) { - std::lock_guard<std::mutex> lock(error_mtx); - error_occurred = true; - error = s; - break; - } - } - }); - } - for (auto&& thread : thread_pool) { - thread.join(); - } - - if (error_occurred) { return error; } - } - return Status::OK(); - } - - Status AppendBlocks(const BlockMap& blocks, PyObject* list) { - for (const auto& it : blocks) { - PyObject* item; - RETURN_NOT_OK(it.second->GetPyResult(&item)); - if (PyList_Append(list, item) < 0) { RETURN_IF_PYERROR(); } - } - return Status::OK(); - } - - Status GetResultList(PyObject** out) { - PyAcquireGIL lock; - - PyObject* result = PyList_New(0); - RETURN_IF_PYERROR(); - - RETURN_NOT_OK(AppendBlocks(blocks_, result)); - RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result)); - RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result)); - - *out = result; - return Status::OK(); - } - - private: - std::shared_ptr<Table> table_; - - // column num -> block type id - std::vector<PandasBlock::type> column_types_; - - // column num -> relative placement within internal block - std::vector<int> column_block_placement_; - - // block type -> type count - std::unordered_map<int, int> type_counts_; - - // block type -> block - BlockMap blocks_; - - // column number -> categorical block - BlockMap categorical_blocks_; - - // column number -> datetimetz block - BlockMap datetimetz_blocks_; -}; - -Status ConvertTableToPandas( - const std::shared_ptr<Table>& table, int nthreads, PyObject** out) { - DataFrameBlockCreator helper(table); - return helper.Convert(nthreads, out); -} - -// ---------------------------------------------------------------------- -// Serialization - -template <int TYPE> -class ArrowSerializer { - public: - ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask) - : pool_(pool), arr_(arr), mask_(mask) { - length_ = PyArray_SIZE(arr_); - } - - void IndicateType(const std::shared_ptr<Field> field) { field_indicator_ = field; } - - Status Convert(std::shared_ptr<Array>* out); - - int stride() const { return PyArray_STRIDES(arr_)[0]; } - - Status InitNullBitmap() { - int null_bytes = BitUtil::BytesForBits(length_); - - null_bitmap_ = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(null_bitmap_->Resize(null_bytes)); - - null_bitmap_data_ = null_bitmap_->mutable_data(); - memset(null_bitmap_data_, 0, null_bytes); - - return Status::OK(); - } - - bool is_strided() const { - npy_intp* astrides = PyArray_STRIDES(arr_); - return astrides[0] != PyArray_DESCR(arr_)->elsize; - } - - private: - Status ConvertData(); - - Status ConvertDates(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; - - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - arrow::DateBuilder date_builder(pool_); - RETURN_NOT_OK(date_builder.Resize(length_)); - - Status s; - PyObject* obj; - for (int64_t i = 0; i < length_; ++i) { - obj = objects[i]; - if (PyDate_CheckExact(obj)) { - PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj); - date_builder.Append(PyDate_to_ms(pydate)); - } else { - date_builder.AppendNull(); - } - } - return date_builder.Finish(out); - } - - Status ConvertObjectStrings(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; - - // The output type at this point is inconclusive because there may be bytes - // and unicode mixed in the object array - - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - arrow::StringBuilder string_builder(pool_); - RETURN_NOT_OK(string_builder.Resize(length_)); - - Status s; - bool have_bytes = false; - RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes)); - RETURN_NOT_OK(string_builder.Finish(out)); - - if (have_bytes) { - const auto& arr = static_cast<const arrow::StringArray&>(*out->get()); - *out = std::make_shared<arrow::BinaryArray>(arr.length(), arr.value_offsets(), - arr.data(), arr.null_bitmap(), arr.null_count()); - } - return Status::OK(); - } - - Status ConvertBooleans(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; - - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - - int nbytes = BitUtil::BytesForBits(length_); - auto data = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(data->Resize(nbytes)); - uint8_t* bitmap = data->mutable_data(); - memset(bitmap, 0, nbytes); - - int64_t null_count = 0; - for (int64_t i = 0; i < length_; ++i) { - if (objects[i] == Py_True) { - BitUtil::SetBit(bitmap, i); - BitUtil::SetBit(null_bitmap_data_, i); - } else if (objects[i] != Py_False) { - ++null_count; - } else { - BitUtil::SetBit(null_bitmap_data_, i); - } - } - - *out = std::make_shared<arrow::BooleanArray>(length_, data, null_bitmap_, null_count); - - return Status::OK(); - } - - template <int ITEM_TYPE, typename ArrowType> - Status ConvertTypedLists( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out); - -#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType) \ - case Type::TYPE: { \ - return ConvertTypedLists<NUMPY_TYPE, ::arrow::ArrowType>(field, out); \ - } - - Status ConvertLists(const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - switch (field->type->type) { - LIST_CASE(UINT8, NPY_UINT8, UInt8Type) - LIST_CASE(INT8, NPY_INT8, Int8Type) - LIST_CASE(UINT16, NPY_UINT16, UInt16Type) - LIST_CASE(INT16, NPY_INT16, Int16Type) - LIST_CASE(UINT32, NPY_UINT32, UInt32Type) - LIST_CASE(INT32, NPY_INT32, Int32Type) - LIST_CASE(UINT64, NPY_UINT64, UInt64Type) - LIST_CASE(INT64, NPY_INT64, Int64Type) - LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType) - LIST_CASE(FLOAT, NPY_FLOAT, FloatType) - LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType) - LIST_CASE(STRING, NPY_OBJECT, StringType) - default: - return Status::TypeError("Unknown list item type"); - } - - return Status::TypeError("Unknown list type"); - } - - Status MakeDataType(std::shared_ptr<DataType>* out); - - arrow::MemoryPool* pool_; - - PyArrayObject* arr_; - PyArrayObject* mask_; - - int64_t length_; - - std::shared_ptr<Field> field_indicator_; - std::shared_ptr<arrow::Buffer> data_; - std::shared_ptr<arrow::ResizableBuffer> null_bitmap_; - uint8_t* null_bitmap_data_; -}; - -// Returns null count -static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) { - int64_t null_count = 0; - const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask)); - // TODO(wesm): strided null mask - for (int i = 0; i < length; ++i) { - if (mask_values[i]) { - ++null_count; - } else { - BitUtil::SetBit(bitmap, i); - } - } - return null_count; -} - -template <int TYPE> -inline Status ArrowSerializer<TYPE>::MakeDataType(std::shared_ptr<DataType>* out) { - out->reset(new typename npy_traits<TYPE>::TypeClass()); - return Status::OK(); -} - -template <> -inline Status ArrowSerializer<NPY_DATETIME>::MakeDataType( - std::shared_ptr<DataType>* out) { - PyArray_Descr* descr = PyArray_DESCR(arr_); - auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); - arrow::TimestampType::Unit unit; - - switch (date_dtype->meta.base) { - case NPY_FR_s: - unit = arrow::TimestampType::Unit::SECOND; - break; - case NPY_FR_ms: - unit = arrow::TimestampType::Unit::MILLI; - break; - case NPY_FR_us: - unit = arrow::TimestampType::Unit::MICRO; - break; - case NPY_FR_ns: - unit = arrow::TimestampType::Unit::NANO; - break; - default: - return Status::Invalid("Unknown NumPy datetime unit"); - } - - out->reset(new arrow::TimestampType(unit)); - return Status::OK(); -} - -template <int TYPE> -inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) { - typedef npy_traits<TYPE> traits; - - if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } - - int64_t null_count = 0; - if (mask_ != nullptr) { - null_count = MaskToBitmap(mask_, length_, null_bitmap_data_); - } else if (traits::supports_nulls) { - null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_); - } - - RETURN_NOT_OK(ConvertData()); - std::shared_ptr<DataType> type; - RETURN_NOT_OK(MakeDataType(&type)); - - std::vector<arrow::FieldMetadata> fields(1); - fields[0].length = length_; - fields[0].null_count = null_count; - fields[0].offset = 0; - - return arrow::LoadArray(type, fields, {null_bitmap_, data_}, out); -} - -template <> -inline Status ArrowSerializer<NPY_OBJECT>::Convert(std::shared_ptr<Array>* out) { - // Python object arrays are annoying, since we could have one of: - // - // * Strings - // * Booleans with nulls - // * Mixed type (not supported at the moment by arrow format) - // - // Additionally, nulls may be encoded either as np.nan or None. So we have to - // do some type inference and conversion - - RETURN_NOT_OK(InitNullBitmap()); - - // TODO: mask not supported here - const PyObject** objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_)); - { - PyAcquireGIL lock; - PyDateTime_IMPORT; - } - - if (field_indicator_) { - switch (field_indicator_->type->type) { - case Type::STRING: - return ConvertObjectStrings(out); - case Type::BOOL: - return ConvertBooleans(out); - case Type::DATE: - return ConvertDates(out); - case Type::LIST: { - auto list_field = static_cast<ListType*>(field_indicator_->type.get()); - return ConvertLists(list_field->value_field(), out); + } + output_type = PandasBlock::OBJECT; + } break; + case Type::DICTIONARY: + output_type = PandasBlock::CATEGORICAL; + break; + default: + return Status::NotImplemented(col->type()->ToString()); } - default: - return Status::TypeError("No known conversion to Arrow type"); - } - } else { - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - continue; - } else if (PyObject_is_string(objects[i])) { - return ConvertObjectStrings(out); - } else if (PyBool_Check(objects[i])) { - return ConvertBooleans(out); - } else if (PyDate_CheckExact(objects[i])) { - return ConvertDates(out); + + int block_placement = 0; + std::shared_ptr<PandasBlock> block; + if (output_type == PandasBlock::CATEGORICAL) { + RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); + categorical_blocks_[i] = block; + } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { + const auto& ts_type = static_cast<const TimestampType&>(*col->type()); + block = std::make_shared<DatetimeTZBlock>(ts_type.timezone, table_->num_rows()); + RETURN_NOT_OK(block->Allocate()); + datetimetz_blocks_[i] = block; } else { - return Status::TypeError("unhandled python type"); + auto it = type_counts_.find(output_type); + if (it != type_counts_.end()) { + block_placement = it->second; + // Increment count + it->second += 1; + } else { + // Add key to map + type_counts_[output_type] = 1; + } } + + column_types_[i] = output_type; + column_block_placement_[i] = block_placement; + } + + // Create normal non-categorical blocks + for (const auto& it : type_counts_) { + PandasBlock::type type = static_cast<PandasBlock::type>(it.first); + std::shared_ptr<PandasBlock> block; + RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block)); + blocks_[type] = block; } + return Status::OK(); } - return Status::TypeError("Unable to infer type of object array, were all null"); -} + Status WriteTableToBlocks(int nthreads) { + auto WriteColumn = [this](int i) { + std::shared_ptr<Column> col = this->table_->column(i); + PandasBlock::type output_type = this->column_types_[i]; -template <int TYPE> -inline Status ArrowSerializer<TYPE>::ConvertData() { - // TODO(wesm): strided arrays - if (is_strided()) { return Status::Invalid("no support for strided data yet"); } + int rel_placement = this->column_block_placement_[i]; - data_ = std::make_shared<NumPyBuffer>(arr_); - return Status::OK(); -} + std::shared_ptr<PandasBlock> block; + if (output_type == PandasBlock::CATEGORICAL) { + auto it = this->categorical_blocks_.find(i); + if (it == this->blocks_.end()) { + return Status::KeyError("No categorical block allocated"); + } + block = it->second; + } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { + auto it = this->datetimetz_blocks_.find(i); + if (it == this->datetimetz_blocks_.end()) { + return Status::KeyError("No datetimetz block allocated"); + } + block = it->second; + } else { + auto it = this->blocks_.find(output_type); + if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); } + block = it->second; + } + return block->Write(col, i, rel_placement); + }; -template <> -inline Status ArrowSerializer<NPY_BOOL>::ConvertData() { - if (is_strided()) { return Status::Invalid("no support for strided data yet"); } + nthreads = std::min<int>(nthreads, table_->num_columns()); - int nbytes = BitUtil::BytesForBits(length_); - auto buffer = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(nbytes)); + if (nthreads == 1) { + for (int i = 0; i < table_->num_columns(); ++i) { + RETURN_NOT_OK(WriteColumn(i)); + } + } else { + std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); - const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); + std::mutex error_mtx; + bool error_occurred = false; + Status error; - uint8_t* bitmap = buffer->mutable_data(); + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() { + int column_num; + while (!error_occurred) { + column_num = task_counter.fetch_add(1); + if (column_num >= this->table_->num_columns()) { break; } + Status s = WriteColumn(column_num); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } - memset(bitmap, 0, nbytes); - for (int i = 0; i < length_; ++i) { - if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } + if (error_occurred) { return error; } + } + return Status::OK(); } - data_ = buffer; - - return Status::OK(); -} - -template <int TYPE> -template <int ITEM_TYPE, typename ArrowType> -inline Status ArrowSerializer<TYPE>::ConvertTypedLists( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - typedef npy_traits<ITEM_TYPE> traits; - typedef typename traits::value_type T; - typedef typename traits::BuilderClass BuilderT; - PyAcquireGIL lock; - - auto value_builder = std::make_shared<BuilderT>(pool_, field->type); - ListBuilder list_builder(pool_, value_builder); - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - RETURN_NOT_OK(list_builder.AppendNull()); - } else if (PyArray_Check(objects[i])) { - auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); - RETURN_NOT_OK(list_builder.Append(true)); - - // TODO(uwe): Support more complex numpy array structures - RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE)); - - int64_t size = PyArray_DIM(numpy_array, 0); - auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_array)); - if (traits::supports_nulls) { - null_bitmap_->Resize(size, false); - // TODO(uwe): A bitmap would be more space-efficient but the Builder API doesn't - // currently support this. - // ValuesToBitmap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); - ValuesToBytemap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); - RETURN_NOT_OK(value_builder->Append(data, size, null_bitmap_->data())); - } else { - RETURN_NOT_OK(value_builder->Append(data, size)); - } - } else if (PyList_Check(objects[i])) { - int64_t size; - std::shared_ptr<arrow::DataType> type; - RETURN_NOT_OK(list_builder.Append(true)); - RETURN_NOT_OK(InferArrowType(objects[i], &size, &type)); - if (type->type != field->type->type) { - std::stringstream ss; - ss << type->ToString() << " cannot be converted to " << field->type->ToString(); - return Status::TypeError(ss.str()); - } - RETURN_NOT_OK(AppendPySequence(objects[i], field->type, value_builder)); - } else { - return Status::TypeError("Unsupported Python type for list items"); + Status AppendBlocks(const BlockMap& blocks, PyObject* list) { + for (const auto& it : blocks) { + PyObject* item; + RETURN_NOT_OK(it.second->GetPyResult(&item)); + if (PyList_Append(list, item) < 0) { RETURN_IF_PYERROR(); } } + return Status::OK(); } - return list_builder.Finish(out); -} -template <> -template <> -inline Status -ArrowSerializer<NPY_OBJECT>::ConvertTypedLists<NPY_OBJECT, ::arrow::StringType>( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - // TODO: If there are bytes involed, convert to Binary representation - PyAcquireGIL lock; - bool have_bytes = false; + Status GetResultList(PyObject** out) { + PyAcquireGIL lock; - auto value_builder = std::make_shared<arrow::StringBuilder>(pool_); - ListBuilder list_builder(pool_, value_builder); - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - RETURN_NOT_OK(list_builder.AppendNull()); - } else if (PyArray_Check(objects[i])) { - auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); - RETURN_NOT_OK(list_builder.Append(true)); + PyObject* result = PyList_New(0); + RETURN_IF_PYERROR(); - // TODO(uwe): Support more complex numpy array structures - RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, NPY_OBJECT)); + RETURN_NOT_OK(AppendBlocks(blocks_, result)); + RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result)); + RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result)); - int64_t size = PyArray_DIM(numpy_array, 0); - auto data = reinterpret_cast<PyObject**>(PyArray_DATA(numpy_array)); - RETURN_NOT_OK(AppendObjectStrings(*value_builder.get(), data, size, &have_bytes)); - } else if (PyList_Check(objects[i])) { - int64_t size; - std::shared_ptr<arrow::DataType> type; - RETURN_NOT_OK(list_builder.Append(true)); - RETURN_NOT_OK(InferArrowType(objects[i], &size, &type)); - if (type->type != Type::STRING) { - std::stringstream ss; - ss << type->ToString() << " cannot be converted to STRING."; - return Status::TypeError(ss.str()); - } - RETURN_NOT_OK(AppendPySequence(objects[i], type, value_builder)); - } else { - return Status::TypeError("Unsupported Python type for list items"); - } + *out = result; + return Status::OK(); } - return list_builder.Finish(out); -} -template <> -inline Status ArrowSerializer<NPY_OBJECT>::ConvertData() { - return Status::TypeError("NYI"); -} - -#define TO_ARROW_CASE(TYPE) \ - case NPY_##TYPE: { \ - ArrowSerializer<NPY_##TYPE> converter(pool, arr, mask); \ - RETURN_NOT_OK(converter.Convert(out)); \ - } break; + private: + std::shared_ptr<Table> table_; -Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - PyArrayObject* arr = reinterpret_cast<PyArrayObject*>(ao); - PyArrayObject* mask = nullptr; + // column num -> block type id + std::vector<PandasBlock::type> column_types_; - if (mo != nullptr and mo != Py_None) { mask = reinterpret_cast<PyArrayObject*>(mo); } + // column num -> relative placement within internal block + std::vector<int> column_block_placement_; - if (PyArray_NDIM(arr) != 1) { - return Status::Invalid("only handle 1-dimensional arrays"); - } + // block type -> type count + std::unordered_map<int, int> type_counts_; - int type_num = PyArray_DESCR(arr)->type_num; + // block type -> block + BlockMap blocks_; -#if (NPY_INT64 == NPY_LONGLONG) && (NPY_SIZEOF_LONGLONG == 8) - // Both LONGLONG and INT64 can be observed in the wild, which is buggy. We set - // U/LONGLONG to U/INT64 so things work properly. - if (type_num == NPY_LONGLONG) { type_num = NPY_INT64; } - if (type_num == NPY_ULONGLONG) { type_num = NPY_UINT64; } -#endif + // column number -> categorical block + BlockMap categorical_blocks_; - switch (type_num) { - TO_ARROW_CASE(BOOL); - TO_ARROW_CASE(INT8); - TO_ARROW_CASE(INT16); - TO_ARROW_CASE(INT32); - TO_ARROW_CASE(INT64); -#if (NPY_INT64 != NPY_LONGLONG) - TO_ARROW_CASE(LONGLONG); -#endif - TO_ARROW_CASE(UINT8); - TO_ARROW_CASE(UINT16); - TO_ARROW_CASE(UINT32); - TO_ARROW_CASE(UINT64); -#if (NPY_UINT64 != NPY_ULONGLONG) - TO_ARROW_CASE(ULONGLONG); -#endif - TO_ARROW_CASE(FLOAT32); - TO_ARROW_CASE(FLOAT64); - TO_ARROW_CASE(DATETIME); - case NPY_OBJECT: { - ArrowSerializer<NPY_OBJECT> converter(pool, arr, mask); - converter.IndicateType(field); - RETURN_NOT_OK(converter.Convert(out)); - } break; - default: - std::stringstream ss; - ss << "Unsupported numpy type " << PyArray_DESCR(arr)->type_num << std::endl; - return Status::NotImplemented(ss.str()); - } - return Status::OK(); -} + // column number -> datetimetz block + BlockMap datetimetz_blocks_; +}; class ArrowDeserializer { public: @@ -1839,7 +1694,7 @@ class ArrowDeserializer { Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr<Array> arr) { typedef typename arrow_traits<TYPE>::T T; - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto prim_arr = static_cast<PrimitiveArray*>(arr.get()); auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); // Zero-Copy. We can pass the data pointer directly to NumPy. @@ -1988,19 +1843,19 @@ class ArrowDeserializer { inline typename std::enable_if<TYPE == Type::STRING, Status>::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - return ConvertBinaryLike<arrow::StringArray>(data_, out_values); + return ConvertBinaryLike<StringArray>(data_, out_values); } template <int T2> inline typename std::enable_if<T2 == Type::BINARY, Status>::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values); + return ConvertBinaryLike<BinaryArray>(data_, out_values); } #define CONVERTVALUES_LISTSLIKE_CASE(ArrowType, ArrowEnum) \ case Type::ArrowEnum: \ - return ConvertListsLike<::arrow::ArrowType>(col_, out_values); + return ConvertListsLike<ArrowType>(col_, out_values); template <int T2> inline typename std::enable_if<T2 == Type::LIST, Status>::type ConvertValues() { @@ -2051,7 +1906,7 @@ class ArrowDeserializer { private: std::shared_ptr<Column> col_; - const arrow::ChunkedArray& data_; + const ChunkedArray& data_; PyObject* py_ref_; PyArrayObject* arr_; PyObject* result_; @@ -2071,4 +1926,11 @@ Status ConvertColumnToPandas( return converter.Convert(out); } -} // namespace pyarrow +Status ConvertTableToPandas( + const std::shared_ptr<Table>& table, int nthreads, PyObject** out) { + DataFrameBlockCreator helper(table); + return helper.Convert(nthreads, out); +} + +} // namespace py +} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/adapters/pandas.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h index b548f93..6862339 100644 --- a/python/src/pyarrow/adapters/pandas.h +++ b/python/src/pyarrow/adapters/pandas.h @@ -25,28 +25,26 @@ #include <memory> -#include "pyarrow/visibility.h" +#include "arrow/util/visibility.h" namespace arrow { class Array; class Column; -class Field; +class DataType; class MemoryPool; class Status; class Table; -} // namespace arrow - -namespace pyarrow { +namespace py { -PYARROW_EXPORT -arrow::Status ConvertArrayToPandas( - const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref, PyObject** out); +ARROW_EXPORT +Status ConvertArrayToPandas( + const std::shared_ptr<Array>& arr, PyObject* py_ref, PyObject** out); -PYARROW_EXPORT -arrow::Status ConvertColumnToPandas( - const std::shared_ptr<arrow::Column>& col, PyObject* py_ref, PyObject** out); +ARROW_EXPORT +Status ConvertColumnToPandas( + const std::shared_ptr<Column>& col, PyObject* py_ref, PyObject** out); struct PandasOptions { bool strings_to_categorical; @@ -58,14 +56,24 @@ struct PandasOptions { // BlockManager structure of the pandas.DataFrame used as of pandas 0.19.x. // // tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2]) -PYARROW_EXPORT -arrow::Status ConvertTableToPandas( - const std::shared_ptr<arrow::Table>& table, int nthreads, PyObject** out); +ARROW_EXPORT +Status ConvertTableToPandas( + const std::shared_ptr<Table>& table, int nthreads, PyObject** out); + +ARROW_EXPORT +Status PandasDtypeToArrow(PyObject* dtype, std::shared_ptr<DataType>* out); -PYARROW_EXPORT -arrow::Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, - const std::shared_ptr<arrow::Field>& field, std::shared_ptr<arrow::Array>* out); +ARROW_EXPORT +Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); -} // namespace pyarrow +/// Convert dtype=object arrays. If target data type is not known, pass a type +/// with nullptr +ARROW_EXPORT +Status PandasObjectsToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, + const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* out); + +} // namespace py +} // namespace arrow #endif // PYARROW_ADAPTERS_PANDAS_H http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/common.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index d2f5291..c898f63 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -24,24 +24,23 @@ #include "arrow/memory_pool.h" #include "arrow/status.h" -using arrow::Status; - -namespace pyarrow { +namespace arrow { +namespace py { static std::mutex memory_pool_mutex; -static arrow::MemoryPool* default_pyarrow_pool = nullptr; +static MemoryPool* default_pyarrow_pool = nullptr; -void set_default_memory_pool(arrow::MemoryPool* pool) { +void set_default_memory_pool(MemoryPool* pool) { std::lock_guard<std::mutex> guard(memory_pool_mutex); default_pyarrow_pool = pool; } -arrow::MemoryPool* get_memory_pool() { +MemoryPool* get_memory_pool() { std::lock_guard<std::mutex> guard(memory_pool_mutex); if (default_pyarrow_pool) { return default_pyarrow_pool; } else { - return arrow::default_memory_pool(); + return default_memory_pool(); } } @@ -60,4 +59,5 @@ PyBytesBuffer::~PyBytesBuffer() { Py_DECREF(obj_); } -} // namespace pyarrow +} // namespace py +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/common.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index ad65ec7..0b4c6be 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -19,16 +19,16 @@ #define PYARROW_COMMON_H #include "pyarrow/config.h" -#include "pyarrow/visibility.h" #include "arrow/buffer.h" #include "arrow/util/macros.h" +#include "arrow/util/visibility.h" namespace arrow { + class MemoryPool; -} -namespace pyarrow { +namespace py { class PyAcquireGIL { public: @@ -98,10 +98,10 @@ struct PyObjectStringify { } // Return the common PyArrow memory pool -PYARROW_EXPORT void set_default_memory_pool(arrow::MemoryPool* pool); -PYARROW_EXPORT arrow::MemoryPool* get_memory_pool(); +ARROW_EXPORT void set_default_memory_pool(MemoryPool* pool); +ARROW_EXPORT MemoryPool* get_memory_pool(); -class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { +class ARROW_EXPORT NumPyBuffer : public Buffer { public: NumPyBuffer(PyArrayObject* arr) : Buffer(nullptr, 0) { arr_ = arr; @@ -118,7 +118,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { PyArrayObject* arr_; }; -class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { +class ARROW_EXPORT PyBytesBuffer : public Buffer { public: PyBytesBuffer(PyObject* obj); ~PyBytesBuffer(); @@ -127,6 +127,7 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { PyObject* obj_; }; -} // namespace pyarrow +} // namespace py +} // namespace arrow #endif // PYARROW_COMMON_H http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/config.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/config.cc b/python/src/pyarrow/config.cc index e1002bf..0be6d96 100644 --- a/python/src/pyarrow/config.cc +++ b/python/src/pyarrow/config.cc @@ -19,7 +19,8 @@ #include "pyarrow/config.h" -namespace pyarrow { +namespace arrow { +namespace py { void pyarrow_init() {} @@ -30,4 +31,5 @@ void pyarrow_set_numpy_nan(PyObject* obj) { numpy_nan = obj; } -} // namespace pyarrow +} // namespace py +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/config.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/config.h b/python/src/pyarrow/config.h index 386ee4b..87fc5c2 100644 --- a/python/src/pyarrow/config.h +++ b/python/src/pyarrow/config.h @@ -20,24 +20,27 @@ #include <Python.h> +#include "arrow/util/visibility.h" + #include "pyarrow/numpy_interop.h" -#include "pyarrow/visibility.h" #if PY_MAJOR_VERSION >= 3 #define PyString_Check PyUnicode_Check #endif -namespace pyarrow { +namespace arrow { +namespace py { -PYARROW_EXPORT +ARROW_EXPORT extern PyObject* numpy_nan; -PYARROW_EXPORT +ARROW_EXPORT void pyarrow_init(); -PYARROW_EXPORT +ARROW_EXPORT void pyarrow_set_numpy_nan(PyObject* obj); -} // namespace pyarrow +} // namespace py +} // namespace arrow #endif // PYARROW_CONFIG_H http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/helpers.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/helpers.cc b/python/src/pyarrow/helpers.cc index 78fad16..edebea6 100644 --- a/python/src/pyarrow/helpers.cc +++ b/python/src/pyarrow/helpers.cc @@ -19,9 +19,8 @@ #include <arrow/api.h> -using namespace arrow; - -namespace pyarrow { +namespace arrow { +namespace py { #define GET_PRIMITIVE_TYPE(NAME, FACTORY) \ case Type::NAME: \ @@ -51,4 +50,5 @@ std::shared_ptr<DataType> GetPrimitiveType(Type::type type) { } } -} // namespace pyarrow +} // namespace py +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/helpers.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/helpers.h b/python/src/pyarrow/helpers.h index 788c3ee..611e814 100644 --- a/python/src/pyarrow/helpers.h +++ b/python/src/pyarrow/helpers.h @@ -18,19 +18,18 @@ #ifndef PYARROW_HELPERS_H #define PYARROW_HELPERS_H -#include <arrow/api.h> #include <memory> -#include "pyarrow/visibility.h" +#include "arrow/type.h" +#include "arrow/util/visibility.h" -namespace pyarrow { +namespace arrow { +namespace py { -using arrow::DataType; -using arrow::Type; - -PYARROW_EXPORT +ARROW_EXPORT std::shared_ptr<DataType> GetPrimitiveType(Type::type type); -} // namespace pyarrow +} // namespace py +} // namespace arrow #endif // PYARROW_HELPERS_H http://git-wip-us.apache.org/repos/asf/arrow/blob/00df40ce/python/src/pyarrow/io.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index aa4cb7b..0aa61dc 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -26,9 +26,8 @@ #include "pyarrow/common.h" -using arrow::Status; - -namespace pyarrow { +namespace arrow { +namespace py { // ---------------------------------------------------------------------- // Python file @@ -151,7 +150,7 @@ Status PyReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { return Status::OK(); } -Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) { +Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { PyAcquireGIL lock; PyObject* bytes_obj; @@ -214,8 +213,9 @@ Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { // A readable file that is backed by a PyBytes PyBytesReader::PyBytesReader(PyObject* obj) - : arrow::io::BufferReader(std::make_shared<PyBytesBuffer>(obj)) {} + : io::BufferReader(std::make_shared<PyBytesBuffer>(obj)) {} PyBytesReader::~PyBytesReader() {} -} // namespace pyarrow +} // namespace py +} // namespace arrow
