Repository: parquet-cpp
Updated Branches:
  refs/heads/master 05b00fa7e -> 6faff712d


PARQUET-1041: Support Arrow's NullArray

Closes #358. This only includes an Arrow version bump to pick up ARROW-1143

Author: Uwe L. Korn <u...@apache.org>
Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #360 from wesm/PARQUET-1041 and squashes the following commits:

3f8f0bc [Wes McKinney] Bump Arrow version to master
c134bd6 [Uwe L. Korn] Fix int conversion
1def8a4 [Uwe L. Korn] PARQUET-1041: Support Arrow's NullArray


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/6faff712
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/6faff712
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/6faff712

Branch: refs/heads/master
Commit: 6faff712d15d999c08678a5a23b9f689f2f085d8
Parents: 05b00fa
Author: Uwe L. Korn <u...@apache.org>
Authored: Fri Jun 23 12:20:36 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Fri Jun 23 12:20:36 2017 -0400

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake       |  2 +-
 src/parquet/arrow/arrow-reader-writer-test.cc | 21 ++++++++++++++++++
 src/parquet/arrow/reader.cc                   | 11 ++++++----
 src/parquet/arrow/schema.cc                   | 12 ++++++++---
 src/parquet/arrow/writer.cc                   | 25 +++++++++++++++++++++-
 src/parquet/file/metadata.cc                  |  1 +
 src/parquet/schema.cc                         |  3 +++
 src/parquet/types.h                           |  3 ++-
 8 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index f958620..2b24e93 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -520,7 +520,7 @@ if (NOT ARROW_FOUND)
 endif()
 
 if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-  set(ARROW_VERSION "a8f8ba0cbcf5f596f042e90b7a208e7a0c3925b7")
+  set(ARROW_VERSION "e209e5865ea58e57925cae24d4bf3f63d58ee21d")
 else()
   set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
 endif()


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 97bb19b..3beca35 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -802,6 +802,27 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
 }
 
+using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
+
+TEST_F(TestNullParquetIO, NullColumn) {
+  std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  std::unique_ptr<FileReader> reader;
+  this->ReaderFromSink(&reader);
+  this->ReadTableFromFile(std::move(reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
 template <typename T>
 using ParquetCDataType = typename ParquetDataType<T>::c_type;


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 7c1b381..ef9ac34 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -495,10 +495,6 @@ Status FileReader::Impl::ReadTable(
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
-  int num_fields = static_cast<int>(schema->num_fields());
-  int nthreads = std::min<int>(num_threads_, num_fields);
-  std::vector<std::shared_ptr<Column>> columns(num_fields);
-
   // We only need to read schema fields which have columns indicated
   // in the indices vector
   std::vector<int> field_indices;
@@ -507,6 +503,7 @@ Status FileReader::Impl::ReadTable(
     return Status::Invalid("Invalid column index");
   }
 
+  std::vector<std::shared_ptr<Column>> columns(field_indices.size());
   auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
     RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -514,6 +511,8 @@ Status FileReader::Impl::ReadTable(
     return Status::OK();
   };
 
+  int num_fields = static_cast<int>(field_indices.size());
+  int nthreads = std::min<int>(num_threads_, num_fields);
   if (nthreads == 1) {
     for (int i = 0; i < num_fields; i++) {
       RETURN_NOT_OK(ReadColumnFunc(i));
@@ -1262,6 +1261,10 @@ Status PrimitiveImpl::NextBatch(
   }
 
   switch (field_->type()->id()) {
+    case ::arrow::Type::NA:
+      *out = std::make_shared<::arrow::NullArray>(batch_size);
+      return Status::OK();
+      break;
     TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
     TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
     TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index a78a23b..2a4ddcd 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -166,6 +166,11 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
 }
 
 Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
+  if (primitive->logical_type() == LogicalType::NA) {
+    *out = ::arrow::null();
+    return Status::OK();
+  }
+
   switch (primitive->physical_type()) {
     case ParquetType::BOOLEAN:
       *out = ::arrow::boolean();
@@ -410,9 +415,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
   int length = -1;
 
   switch (field->type()->id()) {
-    // TODO:
-    // case ArrowType::NA:
-    // break;
+    case ArrowType::NA:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::NA;
+      break;
     case ArrowType::BOOL:
       type = ParquetType::BOOLEAN;
       break;


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 3344d1b..af4f754 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -62,6 +62,15 @@ class LevelBuilder {
 
   Status VisitInline(const Array& array);
 
+  Status Visit(const ::arrow::NullArray& array) {
+    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.length());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
+  }
+
   Status Visit(const ::arrow::PrimitiveArray& array) {
     array_offsets_.push_back(static_cast<int32_t>(array.offset()));
     valid_bitmaps_.push_back(array.null_bitmap_data());
@@ -98,7 +107,6 @@ class LevelBuilder {
         "Level generation for ArrowTypePrefix not supported yet"); \
   }
 
-  NOT_IMPLEMENTED_VISIT(Null)
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
   NOT_IMPLEMENTED_VISIT(Decimal)
@@ -141,6 +149,8 @@ class LevelBuilder {
         reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
     if (array.null_count() == 0) {
       std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
+    } else if (array.null_count() == array.length()) {
+      std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
     } else {
       const uint8_t* valid_bits = array.null_bitmap_data();
       INIT_BITSET(valid_bits, static_cast<int>(array.offset()));
@@ -510,6 +520,18 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
 }
 
 template <>
+Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
+
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+template <>
 Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
     ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels) {
@@ -639,6 +661,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
           column_writer, values_array, num_levels, def_levels, rep_levels);
     }
   }
+  WRITE_BATCH_CASE(NA, NullType, Int32Type)
   WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
   WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
   WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index aea7a74..b37ef4f 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -76,6 +76,7 @@ SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
     case LogicalType::BSON:
     case LogicalType::JSON:
       return SortOrder::UNSIGNED;
+    case LogicalType::NA:
    case LogicalType::DECIMAL:
     case LogicalType::LIST:
     case LogicalType::MAP:


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index 1209ad1..4efa0b2 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -190,6 +190,9 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
         throw ParquetException(ss.str());
       }
       break;
+    case LogicalType::NA:
+      // NA can annotate any type
+      break;
     default:
       ss << LogicalTypeToString(logical_type);
       ss << " can not be applied to a primitive type";


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 2b9b11f..8504f5d 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -81,7 +81,8 @@ struct LogicalType {
     INT_64,
     JSON,
     BSON,
-    INTERVAL
+    INTERVAL,
+    NA = 25
   };
 };
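
Note for readers following the change: the new TestNullParquetIO case above exercises the full round trip through the public parquet::arrow API. The snippet below is a minimal write-side sketch along the same lines, kept outside the test fixture. The WriteTable call, InMemoryOutputStream, and default_writer_properties() come straight from the test; the field/schema/Column/Table construction calls and the include paths are assumptions about the Arrow revision pinned by this commit, not something the patch itself adds.

// Sketch only: write a single all-null column through parquet::arrow.
// Include paths and the Column/Table constructors are era-specific
// assumptions; see TestNullParquetIO above for the authoritative usage.
#include <memory>
#include <vector>

#include "arrow/api.h"             // ::arrow::NullArray, Column, Table, field, schema
#include "parquet/arrow/writer.h"  // parquet::arrow::WriteTable
#include "parquet/util/memory.h"   // parquet::InMemoryOutputStream (path approximate)

::arrow::Status WriteAllNullColumn(int64_t num_rows,
                                   std::shared_ptr<parquet::InMemoryOutputStream>* sink) {
  // A NullArray carries only a length; every slot is null.
  auto values = std::make_shared<::arrow::NullArray>(num_rows);

  // One nullable column of Arrow type null(). On the Parquet side this is
  // stored as an INT32 column annotated with the new LogicalType::NA, and
  // only definition levels (all zero) are written, no values.
  auto field = ::arrow::field("nulls", ::arrow::null());
  auto schema = ::arrow::schema({field});
  auto column = std::make_shared<::arrow::Column>(field, values);
  auto table = std::make_shared<::arrow::Table>(
      schema, std::vector<std::shared_ptr<::arrow::Column>>{column});

  *sink = std::make_shared<parquet::InMemoryOutputStream>();
  return parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), *sink,
                                    /*chunk_size=*/num_rows,
                                    parquet::default_writer_properties());
}

Reading the file back through FileReader::ReadTable should then yield a single chunk that Equals() the original NullArray, as asserted in the test.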