[ https://issues.apache.org/jira/browse/ARROW-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295885#comment-16295885 ]
ASF GitHub Bot commented on ARROW-1920:
---------------------------------------

jcrist commented on a change in pull request #1418: ARROW-1920 [C++/Python] Add ORC Reader
URL: https://github.com/apache/arrow/pull/1418#discussion_r157631664


##########
File path: cpp/src/arrow/adapters/orc/adapter.cc
##########
@@ -0,0 +1,699 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/adapters/orc/adapter.h"
+#include "arrow/buffer.h"
+#include "arrow/builder.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/table_builder.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+#include "orc/OrcFile.hh"
+
+// alias to not interfere with nested orc namespace
+namespace liborc = orc;
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+
+#define ORC_THROW_NOT_OK(s) \
+  do { \
+    Status _s = (s); \
+    if (!_s.ok()) { \
+      std::stringstream ss; \
+      ss << "Arrow error: " << _s.ToString(); \
+      throw liborc::ParseError(ss.str()); \
+    } \
+  } while (0)
+
+class ArrowInputFile : public liborc::InputStream {
+ public:
+  explicit ArrowInputFile(const std::shared_ptr<io::ReadableFileInterface>& file)
+      : file_(file) {}
+
+  uint64_t getLength() const override {
+    int64_t size;
+    ORC_THROW_NOT_OK(file_->GetSize(&size));
+    return static_cast<uint64_t>(size);
+  }
+
+  uint64_t getNaturalReadSize() const override { return 128 * 1024; }
+
+  void read(void* buf, uint64_t length, uint64_t offset) override {
+    int64_t bytes_read;
+
+    ORC_THROW_NOT_OK(file_->ReadAt(offset, length, &bytes_read, buf));
+
+    if (static_cast<uint64_t>(bytes_read) != length) {
+      throw liborc::ParseError("Short read from arrow input file");
+    }
+  }
+
+  const std::string& getName() const override {
+    static const std::string filename("ArrowInputFile");
+    return filename;
+  }
+
+ private:
+  std::shared_ptr<io::ReadableFileInterface> file_;
+};
+
+struct StripeInformation {
+  uint64_t offset;
+  uint64_t length;
+  uint64_t num_rows;
+};
+
+Status get_dtype(const liborc::Type* type, std::shared_ptr<DataType>* out) {
+  if (type == nullptr) {
+    *out = null();
+    return Status::OK();
+  }
+  liborc::TypeKind kind = type->getKind();
+  switch (kind) {
+    case liborc::BOOLEAN:
+      *out = boolean();
+      break;
+    case liborc::BYTE:
+      *out = int8();
+      break;
+    case liborc::SHORT:
+      *out = int16();
+      break;
+    case liborc::INT:
+      *out = int32();
+      break;
+    case liborc::LONG:
+      *out = int64();
+      break;
+    case liborc::FLOAT:
+      *out = float32();
+      break;
+    case liborc::DOUBLE:
+      *out = float64();
+      break;
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      *out = utf8();
+      break;
+    case liborc::BINARY:
+      *out = binary();
+      break;
+    case liborc::CHAR:
+      *out = fixed_size_binary(type->getMaximumLength());
+      break;
+    case liborc::TIMESTAMP:
+      *out = timestamp(TimeUnit::NANO);
+      break;
+    case liborc::DATE:
+      *out = date64();
+      break;
+    case liborc::DECIMAL: {
+      if (type->getPrecision() == 0) {
+        // In HIVE 0.11/0.12 precision is set as 0, but means max precision
+        *out = decimal(38, 6);
+      } else {
+        *out = decimal(type->getPrecision(), type->getScale());
+      }
+      break;
+    }
+    case liborc::LIST: {
+      if (type->getSubtypeCount() != 1) {
+        return Status::Invalid("Invalid Orc List type");
+      }
+      std::shared_ptr<DataType> elemtype;
+      RETURN_NOT_OK(get_dtype(type->getSubtype(0), &elemtype));
+      *out = list(elemtype);
+      break;
+    }
+    case liborc::MAP: {
+      if (type->getSubtypeCount() != 2) {
+        return Status::Invalid("Invalid Orc Map type");
+      }
+      std::shared_ptr<DataType> keytype;
+      std::shared_ptr<DataType> valtype;
+      RETURN_NOT_OK(get_dtype(type->getSubtype(0), &keytype));
+      RETURN_NOT_OK(get_dtype(type->getSubtype(1), &valtype));
+      *out = list(struct_({field("key", keytype), field("value", valtype)}));
+      break;
+    }
+    case liborc::STRUCT: {
+      int size = type->getSubtypeCount();
+      std::vector<std::shared_ptr<Field>> fields;
+      for (int child = 0; child < size; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(get_dtype(type->getSubtype(child), &elemtype));
+        std::string name = type->getFieldName(child);
+        fields.push_back(field(name, elemtype));
+      }
+      *out = struct_(fields);
+      break;
+    }
+    case liborc::UNION: {
+      int size = type->getSubtypeCount();
+      std::vector<std::shared_ptr<Field>> fields;
+      std::vector<uint8_t> type_codes;
+      for (int child = 0; child < size; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(get_dtype(type->getSubtype(child), &elemtype));
+        fields.push_back(field("_union_" + std::to_string(child), elemtype));
+        type_codes.push_back((uint8_t)child);
+      }
+      *out = union_(fields, type_codes);
+      break;
+    }
+    default: {
+      std::stringstream ss;
+      ss << "Unknown Orc type kind: " << kind;
+      return Status::Invalid(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+// The number of rows to read in a ColumnVectorBatch
+constexpr int64_t kReadRowsBatch = 1000;
+
+class ORCFileReader::Impl {
+ public:
+  explicit Impl(MemoryPool* pool, std::unique_ptr<liborc::Reader> reader)
+      : pool_(pool), reader_(std::move(reader)) {}
+
+  static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+                     MemoryPool* pool, std::unique_ptr<Impl>* impl) {
+    std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
+    liborc::ReaderOptions options;
+    std::unique_ptr<liborc::Reader> liborc_reader;
+    try {
+      liborc_reader = createReader(std::move(io_wrapper), options);
+    } catch (const liborc::ParseError& e) {
+      return Status::IOError(e.what());
+    }
+    impl->reset(new Impl(pool, std::move(liborc_reader)));
+    RETURN_NOT_OK((*impl)->Init());
+
+    return Status::OK();
+  }
+
+  Status Init() {
+    int64_t nstripes = reader_->getNumberOfStripes();
+    stripes_.resize(nstripes);
+    std::unique_ptr<liborc::StripeInformation> stripe;
+    for (int i = 0; i < nstripes; ++i) {
+      stripe = reader_->getStripe(i);
+      stripes_[i] = StripeInformation(
+          {stripe->getOffset(), stripe->getLength(), stripe->getNumberOfRows()});
+    }
+    return Status::OK();
+  }
+
+  int64_t NumberOfStripes() { return stripes_.size(); }
+
+  int64_t NumberOfRows() { return reader_->getNumberOfRows(); }
+
+  Status ReadSchema(std::shared_ptr<Schema>* out) {
+    const liborc::Type& type = reader_->getType();
+    return type_to_schema(type, out);
+  }
+
+  Status type_to_schema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
+    if (type.getKind() != liborc::STRUCT) {
+      return Status::NotImplemented(
+          "Only ORC files with a top-level struct "
+          "can be handled");
+    }
+    int size = type.getSubtypeCount();
+    std::vector<std::shared_ptr<Field>> fields;
+    for (int child = 0; child < size; ++child) {
+      std::shared_ptr<DataType> elemtype;
+      RETURN_NOT_OK(get_dtype(type.getSubtype(child), &elemtype));
+      std::string name = type.getFieldName(child);
+      fields.push_back(field(name, elemtype));
+    }
+    std::list<std::string> keys = reader_->getMetadataKeys();
+    std::shared_ptr<KeyValueMetadata> metadata;
+    if (!keys.empty()) {
+      metadata = std::make_shared<KeyValueMetadata>();
+      for (auto it = keys.begin(); it != keys.end(); ++it) {
+        metadata->Append(*it, reader_->getMetadataValue(*it));
+      }
+    }
+
+    *out = std::make_shared<Schema>(fields, metadata);
+    return Status::OK();
+  }
+
+  Status Read(std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions opts;
+    return read_batch(opts, NumberOfRows(), out);
+  }
+
+  Status Read(const std::list<uint64_t>& include_indices,
+              std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions opts;
+    opts.includeTypes(include_indices);
+    return read_batch(opts, NumberOfRows(), out);
+  }
+
+  Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions opts;
+    RETURN_NOT_OK(select_stripe(&opts, stripe));
+    return read_batch(opts, stripes_[stripe].num_rows, out);
+  }
+
+  Status ReadStripe(int64_t stripe, const std::list<uint64_t>& include_indices,
+                    std::shared_ptr<RecordBatch>* out) {
+    liborc::RowReaderOptions opts;
+    RETURN_NOT_OK(select_stripe(&opts, stripe));
+    opts.includeTypes(include_indices);
+    return read_batch(opts, stripes_[stripe].num_rows, out);
+  }
+
+  Status select_stripe(liborc::RowReaderOptions* opts, int64_t stripe) {
+    if (stripe < 0 || stripe >= NumberOfStripes()) {
+      std::stringstream ss;
+      ss << "Out of bounds stripe: " << stripe;
+      return Status::Invalid(ss.str());
+    }
+    opts->range(stripes_[stripe].offset, stripes_[stripe].length);
+    return Status::OK();
+  }
+
+  Status read_batch(const liborc::RowReaderOptions& opts, int64_t nrows,
+                    std::shared_ptr<RecordBatch>* out) {
+    std::unique_ptr<liborc::RowReader> rowreader;
+    std::unique_ptr<liborc::ColumnVectorBatch> batch;
+    try {
+      rowreader = reader_->createRowReader(opts);
+      batch = rowreader->createRowBatch(std::min(nrows, kReadRowsBatch));
+    } catch (const liborc::ParseError& e) {
+      return Status::Invalid(e.what());
+    }
+    const liborc::Type& type = rowreader->getSelectedType();
+    std::shared_ptr<Schema> schema;
+    RETURN_NOT_OK(type_to_schema(type, &schema));
+
+    std::unique_ptr<RecordBatchBuilder> builder;
+    RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));
+
+    // The top-level type must be a struct to read into an arrow table
+    auto struct_batch = static_cast<liborc::StructVectorBatch*>(batch.get());
+
+    int n_fields = builder->num_fields();
+    while (rowreader->next(*batch)) {
+      for (int i = 0; i < n_fields; i++) {
+        RETURN_NOT_OK(append_batch(type.getSubtype(i), builder->GetField(i),
+                                   struct_batch->fields[i], 0, batch->numElements));
+      }
+    }
+    RETURN_NOT_OK(builder->Flush(out));
+    return Status::OK();
+  }
+
+  Status append_batch(const liborc::Type* type, ArrayBuilder* builder,
Review comment:
   The ORC API also returns pointers for `ColumnVectorBatch` objects, so I've left those as pointers as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for reading ORC files
> ---------------------------------
>
>                 Key: ARROW-1920
>                 URL: https://issues.apache.org/jira/browse/ARROW-1920
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++, Python
>            Reporter: Jim Crist
>              Labels: pull-request-available
>
> It would be nice to be able to read ORC files in pyarrow, similar to the existing Parquet support.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
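Editor's note: for context, here is a minimal C++ sketch of how the reader in the diff above might be driven end to end. It assumes the public `arrow::adapters::orc::ORCFileReader` declared in `adapter.h` mirrors the `Impl` methods shown (`Open`, `ReadSchema`, `Read`); the exact public signatures, the file name `example.orc`, and the error handling are assumptions for illustration, not part of the patch.

```cpp
#include <iostream>
#include <memory>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/io/file.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/type.h"

int main() {
  // Open the ORC file through Arrow's file abstraction (path is illustrative).
  std::shared_ptr<arrow::io::ReadableFile> file;
  arrow::Status st = arrow::io::ReadableFile::Open("example.orc", &file);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }

  // Construct the reader; assumed to mirror ORCFileReader::Impl::Open above.
  std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
  st = arrow::adapters::orc::ORCFileReader::Open(file, arrow::default_memory_pool(),
                                                 &reader);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }

  // Inspect the Arrow schema mapped from the ORC types (see get_dtype in the diff).
  std::shared_ptr<arrow::Schema> schema;
  st = reader->ReadSchema(&schema);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  std::cout << schema->ToString() << std::endl;

  // Read the whole file into a RecordBatch, as Impl::Read does.
  std::shared_ptr<arrow::RecordBatch> batch;
  st = reader->Read(&batch);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  std::cout << "Read " << batch->num_rows() << " rows" << std::endl;
  return 0;
}
```

The Python support requested in this issue would then layer on top of this C++ reader, analogous to the existing Parquet bindings in pyarrow.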