paleolimbot commented on code in PR #12323: URL: https://github.com/apache/arrow/pull/12323#discussion_r848522035
########## r/src/io.cpp: ########## @@ -207,7 +209,209 @@ void io___BufferOutputStream__Write( StopIfNotOk(stream->Write(RAW(bytes), bytes.size())); } -// TransformInputStream::TransformFunc wrapper +// ------ RConnectionInputStream / RConnectionOutputStream + +class RConnectionFileInterface : public virtual arrow::io::FileInterface { + public: + explicit RConnectionFileInterface(cpp11::sexp connection_sexp) + : connection_sexp_(connection_sexp), closed_(false) { + check_closed(); + } + + arrow::Status Close() { + if (closed_) { + return arrow::Status::OK(); + } + + auto result = SafeCallIntoR<bool>([&]() { + cpp11::package("base")["close"](connection_sexp_); + return true; + }); + + RETURN_NOT_OK(result); + closed_ = true; + return arrow::Status::OK(); + } + + arrow::Result<int64_t> Tell() const { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_); + return cpp11::as_cpp<int64_t>(result); + } + + bool closed() const { return closed_; } + + protected: + cpp11::sexp connection_sexp_; + + // Define the logic here because multiple inheritance makes it difficult + // for this base class, the InputStream and the RandomAccessFile + // interfaces to co-exist. 
+ arrow::Result<int64_t> ReadBase(int64_t nbytes, void* out) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + return SafeCallIntoR<int64_t>([&] { + cpp11::function read_bin = cpp11::package("base")["readBin"]; + cpp11::writable::raws ptype((R_xlen_t)0); + cpp11::integers n = cpp11::as_sexp<int>(nbytes); + + cpp11::sexp result = read_bin(connection_sexp_, ptype, n); + + int64_t result_size = cpp11::safe[Rf_xlength](result); + memcpy(out, cpp11::safe[RAW](result), result_size); + return result_size; + }); + } + + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadBase(int64_t nbytes) { + arrow::BufferBuilder builder; + RETURN_NOT_OK(builder.Reserve(nbytes)); + + arrow::Result<int64_t> result; + RETURN_NOT_OK(result = ReadBase(nbytes, builder.mutable_data())); + + builder.UnsafeAdvance(result.ValueOrDie()); + return builder.Finish(); + } + + arrow::Status WriteBase(const void* data, int64_t nbytes) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + auto result = SafeCallIntoR<bool>([&]() { Review Comment: I couldn't figure out how to get a template specialization to return a different type, but I implemented `Status SafeCallIntoRVoid() {}`, which reads much better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org