westonpace commented on a change in pull request #10431: URL: https://github.com/apache/arrow/pull/10431#discussion_r680278216
########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. 
+ arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } Review comment: return `Status::NotImplemented`? ########## File path: ci/scripts/cpp_build.sh ########## @@ -85,6 +85,8 @@ cmake -G "${CMAKE_GENERATOR:-Ninja}" \ -DARROW_PYTHON=${ARROW_PYTHON:-OFF} \ -DARROW_RUNTIME_SIMD_LEVEL=${ARROW_RUNTIME_SIMD_LEVEL:-MAX} \ -DARROW_S3=${ARROW_S3:-OFF} \ + -DARROW_RADOS=${ARROW_RADOS:-OFF} \ + -DARROW_CLS=${ARROW_CLS:-OFF} \ Review comment: Ok, coming back to this now that there has been a bit of discussion. Yes, I think you can name it `ARROW_SKYHOOK`. In the future it might be extracted into its own build file. ########## File path: .travis.yml ########## @@ -79,6 +79,44 @@ jobs: ARCH: arm64v8 ARROW_CI_MODULES: "GO" DOCKER_IMAGE_ID: debian-go + Review comment: Ideally we would just be able to add ARROW_RADOS=ON to the existing Travis build. It appears you have identified two issues with the current build (S3 and no-cache leading to failing / timed out Travis builds). I don't think these are unique to this PR. It might be a good idea (and very helpful use of your good work) to raise these issues through a separate JIRA and get them resolved on the main repo and then there is no need for a separate build job here. 
########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. 
+ arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. 
+ arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. + arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. 
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. 
+static arrow::Status ScanParquetObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, Review comment: No mutable references. ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. 
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. 
+ arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, Review comment: Style guide prohibits mutable references. Use `std::shared_ptr<arrow::Table>* result_table` instead. ########## File path: .github/workflows/docs.yml ########## @@ -0,0 +1,57 @@ +# # Licensed to the Apache Software Foundation (ASF) under one Review comment: Just a reminder to revert this at some point ########## File path: ci/scripts/integration_ceph.sh ########## @@ -0,0 +1,129 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e +set -x +set -u + +ARROW_BUILD_DIR=${1}/cpp +DIR=/tmp/ + +# reset +pkill ceph || true +rm -rf ${DIR}/* +LOG_DIR=${DIR}/log +MON_DATA=${DIR}/mon +MDS_DATA=${DIR}/mds +MOUNTPT=${MDS_DATA}/mnt +OSD_DATA=${DIR}/osd +mkdir ${LOG_DIR} ${MON_DATA} ${OSD_DATA} ${MDS_DATA} ${MOUNTPT} +MDS_NAME="Z" +MON_NAME="a" +MGR_NAME="x" +MIRROR_ID="m" + +# cluster wide parameters +cat >> ${DIR}/ceph.conf <<EOF +[global] +fsid = $(uuidgen) +osd crush chooseleaf type = 0 +run dir = ${DIR}/run +auth cluster required = none +auth service required = none +auth client required = none +osd pool default size = 1 +mon host = ${HOSTNAME} +[mds.${MDS_NAME}] +host = ${HOSTNAME} +[mon.${MON_NAME}] +log file = ${LOG_DIR}/mon.log +chdir = "" +mon cluster log file = ${LOG_DIR}/mon-cluster.log +mon data = ${MON_DATA} +mon data avail crit = 0 +mon addr = ${HOSTNAME} +mon allow pool delete = true +[osd.0] +log file = ${LOG_DIR}/osd.log +chdir = "" +osd data = ${OSD_DATA} +osd journal = ${OSD_DATA}.journal +osd journal size = 100 +osd objectstore = memstore +osd class load list = * +osd class default list = * +EOF + +export CEPH_CONF=${DIR}/ceph.conf +cp $CEPH_CONF /etc/ceph/ceph.conf + +# start an osd +ceph-mon --id ${MON_NAME} --mkfs --keyring /dev/null +touch ${MON_DATA}/keyring +ceph-mon --id ${MON_NAME} + +# start an osd +OSD_ID=$(ceph osd create) +ceph osd crush add osd.${OSD_ID} 1 root=default +ceph-osd --id ${OSD_ID} --mkjournal --mkfs +ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID} || ceph-osd --id ${OSD_ID} + +# start an mds for cephfs +ceph auth get-or-create 
mds.${MDS_NAME} mon 'profile mds' mgr 'profile mds' mds 'allow *' osd 'allow *' > ${MDS_DATA}/keyring +ceph osd pool create cephfs_data 8 +ceph osd pool create cephfs_metadata 8 +ceph fs new cephfs cephfs_metadata cephfs_data +ceph fs ls +ceph-mds -i ${MDS_NAME} +ceph status +while [[ ! $(ceph mds stat | grep "up:active") ]]; do sleep 1; done + +# start a manager +ceph-mgr --id ${MGR_NAME} + +# test the setup +ceph --version +ceph status + +apt update +apt install -y ceph-fuse attr + +pushd ${ARROW_BUILD_DIR} + # create the rados-classes, if not there already + mkdir -p /usr/lib/x86_64-linux-gnu/rados-classes/ + cp debug/libcls_arrow* /usr/lib/x86_64-linux-gnu/rados-classes/ + + # mount a ceph filesystem to /mnt/cephfs in the user-space using ceph-fuse + mkdir -p /mnt/cephfs + ceph-fuse /mnt/cephfs + sleep 5 + + # download an example dataset and copy into the mounted dir + rm -rf nyc* + wget https://raw.githubusercontent.com/JayjeetAtGithub/zips/main/nyc.zip Review comment: We'll probably want to find a better place for this data to live. I've started a topic on Zulip (https://ursalabs.zulipchat.com/#narrow/stream/180245-dev/topic/Data.20storage.20for.20CI/near/247775557) to ask where the best place to store such test data might be. ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. 
+ arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. 
+ arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. + arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. 
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. 
+static arrow::Status ScanParquetObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file); + + auto format = std::make_shared<arrow::dataset::ParquetFileFormat>(); + + auto fragment_scan_options = + std::make_shared<arrow::dataset::ParquetFragmentScanOptions>(); + + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); Review comment: Why set `UseThreads` to `false`? ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. Review comment: ```suggestion /// Read a specified number of bytes from a specified position. 
``` ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. 
+class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. 
+ arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. + arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. 
+static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanParquetObject(cls_method_context_t hctx, Review comment: Seems like there is some overlap between `ScanParquetObject` and `ScanIpcObject` you could lift into a common helper method. 
########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { Review comment: You should ensure you `Close` the file in `~RandomAccessObject` for safety and maintainability. 
########## File path: ci/scripts/integration_ceph.sh ########## @@ -0,0 +1,129 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e +set -x +set -u + +ARROW_BUILD_DIR=${1}/cpp +DIR=/tmp/ Review comment: Given the below `rm -rf ${DIR}/*` command can we maybe use `/tmp/integration-ceph` or something? ########## File path: format/ScanRequest.fbs ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review comment: So here is my current understanding. Let me know if this seems off. There are two pieces to this. There is a ceph object class (called Skyhook?) which processes scan tasks and lives in a "contrib" directory. There is a fragment / file format for Arrow that understands how to send scan requests to a ceph storage server in the skyhook format. These two components aren't tightly coupled. The only source of agreement is the Arrow columnar format and this flatbuffers file. So for example (these are thought exercises, not things that will necessarily ever happen): * Ceph could be running an older version of Skyhook built with Arrow version X and the dataset client could be running a newer version of Arrow version X+N. 
* Skyhook could switch to some other library entirely in the future and as long as it continued to respect the flatbuffers format it would continue to work. * A different non-arrow library (or an Arrow implementation in a different language) could decide to start sending requests to Skyhook and as long as they agreed upon the flatbuffers and arrow columnar format everything would continue to work. Given the above I think the proper place for this flatbuffers file to live is in the same directory as the ceph object class. This flatbuffers file is the API for skyhook. Then, for building everything, the make files for that directory could produce two artifacts: A ceph object class and a small C++ "client library" which is just the output of the flatbuffers compiler. Or you could skip the "client library" step and add an extra build step for the datasets module which runs the flatbuffers compiler. ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. 
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. 
+ arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return 
arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanParquetObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file); + + auto format = std::make_shared<arrow::dataset::ParquetFileFormat>(); + + auto fragment_scan_options = + std::make_shared<arrow::dataset::ParquetFragmentScanOptions>(); + + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief The scanning operation to register on Ceph nodes. 
The request is +/// deserialized, the object is scanned, and the resulting table is serialized +/// and sent to the client. +/// \param[in] hctx RADOS object context. +/// \param[in] in Input bufferlist. +/// \param[out] out Output bufferlist. +/// \return Status code. +static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in, + ceph::bufferlist* out) { + // Components required to construct a File fragment. + arrow::Status s; + arrow::compute::Expression filter; + arrow::compute::Expression partition_expression; + std::shared_ptr<arrow::Schema> projection_schema; + std::shared_ptr<arrow::Schema> dataset_schema; + int64_t file_size; + int file_format = 0; // 0 = Parquet, 1 = Ipc + + // Deserialize the scan request + if (!(s = arrow::dataset::DeserializeScanRequest(&filter, &partition_expression, Review comment: I'm not sure you should need anything called a projection schema. I think you just need to load the materialized fields (from scan options). ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. 
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. 
+ arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return 
arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanParquetObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file); + + auto format = std::make_shared<arrow::dataset::ParquetFileFormat>(); + + auto fragment_scan_options = + std::make_shared<arrow::dataset::ParquetFragmentScanOptions>(); + + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief The scanning operation to register on Ceph nodes. 
The request is +/// deserialized, the object is scanned, and the resulting table is serialized +/// and sent to the client. +/// \param[in] hctx RADOS object context. +/// \param[in] in Input bufferlist. +/// \param[out] out Output bufferlist. +/// \return Status code. +static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in, + ceph::bufferlist* out) { + // Components required to construct a File fragment. + arrow::Status s; + arrow::compute::Expression filter; + arrow::compute::Expression partition_expression; + std::shared_ptr<arrow::Schema> projection_schema; + std::shared_ptr<arrow::Schema> dataset_schema; + int64_t file_size; + int file_format = 0; // 0 = Parquet, 1 = Ipc + + // Deserialize the scan request + if (!(s = arrow::dataset::DeserializeScanRequest(&filter, &partition_expression, + &projection_schema, &dataset_schema, + file_size, file_format, *in)) + .ok()) { + CLS_LOG(0, "error: %s", s.message().c_str()); Review comment: Maybe extract this into a helper method `LogArrowError` or something? ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. 
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); Review comment: I think the approach used elsewhere is to subclass buffer and delete the memory in the destructor. For example, see `arrow::io::MemoryMappedFile::MemoryMap::Region` in `src/arrow/io/file.cc`. It seems odd to me that the data would be deleted when the object is closed. ########## File path: cpp/src/arrow/adapters/arrow-rados-cls/cls_arrow.cc ########## @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#define _FILE_OFFSET_BITS 64 +#include <rados/objclass.h> +#include <memory> + +#include "arrow/api.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/file_ipc.h" +#include "arrow/dataset/file_parquet.h" +#include "arrow/dataset/file_skyhook.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/util/compression.h" +#include "parquet/api/reader.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" + +#define SCAN_ERR_CODE 25 +#define SCAN_REQ_DESER_ERR_CODE 26 +#define SCAN_RES_SER_ERR_CODE 27 + +CLS_VER(1, 0) +CLS_NAME(arrow) + +cls_handle_t h_class; +cls_method_handle_t h_scan_op; + +/// \class RandomAccessObject +/// \brief An interface to provide a file-like view over RADOS objects. +class RandomAccessObject : public arrow::io::RandomAccessFile { + public: + explicit RandomAccessObject(cls_method_context_t hctx, int64_t file_size) { + hctx_ = hctx; + content_length_ = file_size; + chunks_ = std::vector<ceph::bufferlist*>(); + } + + /// Check if the file stream is closed. + arrow::Status CheckClosed() const { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return arrow::Status::OK(); + } + + /// Check if the position of the object is valid. + arrow::Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return arrow::Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return arrow::Status::IOError("Cannot ", action, " past end of file"); + } + return arrow::Status::OK(); + } + + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { return 0; } + + /// Read at a specified number of bytes from a specified position. 
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + if (nbytes > 0) { + ceph::bufferlist* bl = new ceph::bufferlist(); + cls_cxx_read(hctx_, position, nbytes, bl); + chunks_.push_back(bl); + return std::make_shared<arrow::Buffer>((uint8_t*)bl->c_str(), bl->length()); + } + return std::make_shared<arrow::Buffer>(""); + } + + /// Read a specified number of bytes from the current position. + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + /// Read a specified number of bytes from the current position into an output stream. + arrow::Result<int64_t> Read(int64_t nbytes, void* out) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + /// Return the size of the file. + arrow::Result<int64_t> GetSize() { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + /// Sets the file-pointer offset, measured from the beginning of the + /// file, at which the next read or write occurs. + arrow::Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return arrow::Status::OK(); + } + + /// Returns the file-pointer offset. + arrow::Result<int64_t> Tell() const { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + /// Closes the file stream and deletes the chunks and releases the memory + /// used by the chunks. 
+ arrow::Status Close() { + closed_ = true; + for (auto chunk : chunks_) { + delete chunk; + } + return arrow::Status::OK(); + } + + bool closed() const { return closed_; } + + protected: + cls_method_context_t hctx_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; + std::vector<ceph::bufferlist*> chunks_; +}; + +/// \brief Scan RADOS objects containing Arrow IPC data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanIpcObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file, arrow::Compression::LZ4_FRAME); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return 
arrow::Status::OK(); +} + +/// \brief Scan RADOS objects containing Parquet binary data. +/// \param[in] hctx RADOS object context. +/// \param[in] filter The filter expression to apply. +/// \param[in] partition_expression The partition expression to use. +/// \param[in] projection_schema The projection schema. +/// \param[in] dataset_schema The dataset schema. +/// \param[out] result_table Table to store the resultant data. +/// \param[in] object_size The size of the object. +/// \return Status. +static arrow::Status ScanParquetObject(cls_method_context_t hctx, + arrow::compute::Expression filter, + arrow::compute::Expression partition_expression, + std::shared_ptr<arrow::Schema> projection_schema, + std::shared_ptr<arrow::Schema> dataset_schema, + std::shared_ptr<arrow::Table>& result_table, + int64_t object_size) { + auto file = std::make_shared<RandomAccessObject>(hctx, object_size); + arrow::dataset::FileSource source(file); + + auto format = std::make_shared<arrow::dataset::ParquetFileFormat>(); + + auto fragment_scan_options = + std::make_shared<arrow::dataset::ParquetFragmentScanOptions>(); + + ARROW_ASSIGN_OR_RAISE(auto fragment, + format->MakeFragment(source, partition_expression)); + auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto builder = + std::make_shared<arrow::dataset::ScannerBuilder>(dataset_schema, fragment, options); + + ARROW_RETURN_NOT_OK(builder->Filter(filter)); + ARROW_RETURN_NOT_OK(builder->Project(projection_schema->field_names())); + ARROW_RETURN_NOT_OK(builder->UseThreads(false)); + ARROW_RETURN_NOT_OK(builder->FragmentScanOptions(fragment_scan_options)); + + ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish()); + ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable()); + + result_table = table; + + ARROW_RETURN_NOT_OK(file->Close()); + return arrow::Status::OK(); +} + +/// \brief The scanning operation to register on Ceph nodes. 
The request is +/// deserialized, the object is scanned, and the resulting table is serialized +/// and sent to the client. +/// \param[in] hctx RADOS object context. +/// \param[in] in Input bufferlist. +/// \param[out] out Output bufferlist. +/// \return Status code. +static int scan_op(cls_method_context_t hctx, ceph::bufferlist* in, + ceph::bufferlist* out) { + // Components required to construct a File fragment. + arrow::Status s; + arrow::compute::Expression filter; + arrow::compute::Expression partition_expression; + std::shared_ptr<arrow::Schema> projection_schema; + std::shared_ptr<arrow::Schema> dataset_schema; + int64_t file_size; + int file_format = 0; // 0 = Parquet, 1 = Ipc + + // Deserialize the scan request + if (!(s = arrow::dataset::DeserializeScanRequest(&filter, &partition_expression, + &projection_schema, &dataset_schema, + file_size, file_format, *in)) + .ok()) { + CLS_LOG(0, "error: %s", s.message().c_str()); + return SCAN_REQ_DESER_ERR_CODE; + } + + // Scan the object + std::shared_ptr<arrow::Table> table; + if (file_format == 0) { Review comment: You might want to consider an enum here instead of magic numbers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
