Github user xndai commented on a diff in the pull request: https://github.com/apache/orc/pull/134#discussion_r138018196 --- Diff: c++/src/OrcHdfsFile.cc --- @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc/OrcFile.hh" + +#include "Adaptor.hh" +#include "Exceptions.hh" + +#include <errno.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include "hdfspp/hdfspp.h" + +namespace orc { + + class HdfsFileInputStream : public InputStream { + private: + std::string filename; + std::unique_ptr<hdfs::FileHandle> file; + std::unique_ptr<hdfs::FileSystem> file_system; + uint64_t totalLength; + + public: + HdfsFileInputStream(std::string _filename) { + filename = _filename ; + + //Building a URI object from the given uri_path + hdfs::URI uri; + try { + uri = hdfs::URI::parse_from_string(filename); + } catch (const hdfs::uri_parse_error&) { + throw ParseError("Malformed URI: " + filename); + } + + //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + //and loads configs core-site.xml and hdfs-site.xml from the conf path + hdfs::ConfigParser parser; + if(!parser.LoadDefaultResources()){ + throw ParseError("Could not load default resources. "); + } + auto stats = parser.ValidateResources(); + //validating core-site.xml + if(!stats[0].second.ok()){ + throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString()); + } + //validating hdfs-site.xml + if(!stats[1].second.ok()){ + throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString()); + } + hdfs::Options options; + if(!parser.get_options(options)){ + throw ParseError("Could not load Options object. "); + } + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping file_system into a unique pointer to guarantee deletion + file_system = std::unique_ptr<hdfs::FileSystem>( + hdfs::FileSystem::New(io_service, "", options)); + if (file_system.get() == nullptr) { + throw ParseError("Can't create FileSystem object. "); + } + hdfs::Status status; + //Checking if the user supplied the host + if(!uri.get_host().empty()){ + //Using port if supplied, otherwise using "" to look up port in configs + std::string port = uri.has_port() ? + std::to_string(uri.get_port()) : ""; + status = file_system->Connect(uri.get_host(), port); + if (!status.ok()) { + throw ParseError("Can't connect to " + uri.get_host() + + ":" + port + ". " + status.ToString()); + } + } else { + status = file_system->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + throw ParseError("Error connecting to " + + options.defaultFS.str() + ". " + status.ToString()); + } else { + throw ParseError( + "Error connecting to the cluster: defaultFS is empty. " + + status.ToString()); + } + } + } + + if (file_system.get() == nullptr) { + throw ParseError("Can't connect the file system. "); + } + + hdfs::FileHandle *file_raw = nullptr; + status = file_system->Open(uri.get_path(), &file_raw); + if (!status.ok()) { + throw ParseError("Can't open " + + uri.get_path() + ". " + status.ToString()); + } + //Wrapping file_raw into a unique pointer to guarantee deletion + file.reset(file_raw); + + hdfs::StatInfo stat_info; + status = file_system->GetFileInfo(uri.get_path(), stat_info); + if (!status.ok()) { + throw ParseError("Can't stat " + + uri.get_path() + ". " + status.ToString()); + } + totalLength = stat_info.length; + } + + uint64_t getLength() const override { + return totalLength; + } + + uint64_t getNaturalReadSize() const override { + return 1048576; //1 MB --- End diff -- nit: Use const variable?
---