JayjeetAtGithub commented on a change in pull request #10431: URL: https://github.com/apache/arrow/pull/10431#discussion_r687885903
########## File path: cpp/src/arrow/dataset/file_skyhook.h ########## @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This API is EXPERIMENTAL. +#define _FILE_OFFSET_BITS 64 + +#pragma once + +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include <functional> +#include <memory> +#include <sstream> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/discovery.h" +#include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/rados.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" +#include "arrow/filesystem/api.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/iterator.h" +#include "arrow/util/macros.h" +#include "parquet/arrow/writer.h" +#include "parquet/exception.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_ERR_MSG "failed to scan file fragment" + +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request" + +#define SCAN_RES_SER_ERR_CODE 27 +#define SCAN_RES_SER_ERR_MSG "failed to serialize result table" + +namespace arrow { +namespace dataset { + +enum SkyhookFileType { PARQUET, IPC }; + +/// \addtogroup dataset-file-formats +/// +/// @{ + +namespace connection { +/// \brief An interface for general connections. +class ARROW_DS_EXPORT Connection { + public: + virtual Status Connect() = 0; + + Connection() = default; + virtual ~Connection() = default; +}; + +/// \class RadosConnection +/// \brief An interface to connect to a Rados cluster and hold the connection +/// information for usage in later stages. +class ARROW_DS_EXPORT RadosConnection : public Connection { + public: + struct RadosConnectionCtx { + std::string ceph_config_path; + std::string data_pool; + std::string user_name; + std::string cluster_name; + std::string cls_name; + + RadosConnectionCtx(const std::string& ceph_config_path, const std::string& data_pool, + const std::string& user_name, const std::string& cluster_name, + const std::string& cls_name) + : ceph_config_path(ceph_config_path), + data_pool(data_pool), + user_name(user_name), + cluster_name(cluster_name), + cls_name(cls_name) {} + }; + explicit RadosConnection(const RadosConnectionCtx& ctx) + : Connection(), + ctx(ctx), + rados(new RadosWrapper()), + ioCtx(new IoCtxWrapper()), + connected(false) {} + + ~RadosConnection(); + + /// \brief Connect to the Rados cluster. + /// \return Status. + Status Connect(); + + /// \brief Shutdown the connection to the Rados cluster. + /// \return Status. + Status Shutdown(); + + RadosConnectionCtx ctx; + RadosInterface* rados; + IoCtxInterface* ioCtx; + bool connected; +}; + +} // namespace connection + +/// \class SkyhookDirectObjectAccess +/// \brief Interface for translating the name of a file in CephFS to its +/// corresponding object ID in RADOS assuming 1:1 mapping between a file +/// and its underlying object. +class ARROW_DS_EXPORT SkyhookDirectObjectAccess { + public: + explicit SkyhookDirectObjectAccess( + const std::shared_ptr<connection::RadosConnection>& connection) + : connection_(std::move(connection)) {} + + /// \brief Executes the POSIX stat call on a file. + /// \param[in] path Path of the file. + /// \param[out] st Refernce to the struct object to store the result. + /// \return Status. + Status Stat(const std::string& path, struct stat& st) { + struct stat file_st; + if (stat(path.c_str(), &file_st) < 0) + return Status::Invalid("stat returned non-zero exit code."); + st = file_st; + return Status::OK(); + } + + // Helper function to convert Inode to ObjectID because Rados calls work with + // ObjectIDs. + std::string ConvertFileInodeToObjectID(uint64_t inode) { + std::stringstream ss; + ss << std::hex << inode; + std::string oid(ss.str() + ".00000000"); + return oid; + } + + /// \brief Executes query on the librados node. It uses the librados::exec API to + /// perform queries on the storage node and stores the result in the output bufferlist. + /// \param[in] inode inode of the file. + /// \param[in] fn The function to be executed by the librados::exec call. + /// \param[in] in The input bufferlist. + /// \param[out] out The output bufferlist. + /// \return Status. + Status Exec(uint64_t inode, const std::string& fn, ceph::bufferlist& in, + ceph::bufferlist& out) { + std::string oid = ConvertFileInodeToObjectID(inode); + int e = connection_->ioCtx->exec(oid.c_str(), connection_->ctx.cls_name.c_str(), + fn.c_str(), in, out); + if (e == SCAN_ERR_CODE) return Status::Invalid(SCAN_ERR_MSG); + if (e == SCAN_REQ_DESER_ERR_CODE) return Status::Invalid(SCAN_REQ_DESER_ERR_MSG); + if (e == SCAN_RES_SER_ERR_CODE) return Status::Invalid(SCAN_RES_SER_ERR_MSG); + return Status::OK(); + } + + protected: + std::shared_ptr<connection::RadosConnection> connection_; +}; + +/// \class SkyhookFileFormat +/// \brief A ParquetFileFormat implementation that offloads the fragment +/// scan operations to the Ceph OSDs +class ARROW_DS_EXPORT SkyhookFileFormat : public ParquetFileFormat { Review comment: Thanks a lot for the analysis @westonpace. As per the latest PR, we don't need to modify `FileFormat` at all. We just modified `ScanOptions` with a property `skip_compute` to skip filtering on client. I would try to create a `SkyhookFileFragment` abstraction with a `handles_filtering` and `handles_projection` property. That would ideally keep the Arrow code base untouched. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org