romainfrancois commented on a change in pull request #9615: URL: https://github.com/apache/arrow/pull/9615#discussion_r628202812
########## File path: r/src/r_to_arrow.cpp ########## @@ -168,46 +207,85 @@ bool is_NA<int64_t>(int64_t value) { } template <typename T> -struct RVectorVisitor { +class RVectorIterator { + public: + using value_type = T; + RVectorIterator(SEXP x, int64_t start) + : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {} + + RVectorIterator& operator++() { + ++ptr_x_; + return *this; + } + + const T operator*() const { return *ptr_x_; } + + private: + const T* ptr_x_; +}; + +template <typename T> +class RVectorIterator_ALTREP { + public: + using value_type = T; using data_type = typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type; using r_vector_type = cpp11::r_vector<data_type>; + using r_vector_iterator = typename r_vector_type::const_iterator; - template <typename AppendNull, typename AppendValue> - static Status Visit(SEXP x, int64_t size, AppendNull&& append_null, - AppendValue&& append_value) { - r_vector_type values(x); - auto it = values.begin(); - - for (R_xlen_t i = 0; i < size; i++, ++it) { - auto value = GetValue(*it); - - if (is_NA<T>(value)) { - RETURN_NOT_OK(append_null()); - } else { - RETURN_NOT_OK(append_value(value)); - } - } + RVectorIterator_ALTREP(SEXP x, int64_t start) + : vector_(x), it_(vector_.begin() + start) {} - return Status::OK(); + RVectorIterator_ALTREP& operator++() { + ++it_; + return *this; } + const T operator*() const { return GetValue(*it_); } + static T GetValue(data_type x) { return x; } + + private: + r_vector_type vector_; + r_vector_iterator it_; }; template <> -int64_t RVectorVisitor<int64_t>::GetValue(double x) { +int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) { int64_t value; memcpy(&value, &x, sizeof(int64_t)); return value; } +template <typename Iterator, typename AppendNull, typename AppendValue> +Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null, + AppendValue&& append_value) { + for (R_xlen_t i = 0; i < n; i++, ++it) { + auto value = *it; + + if 
(is_NA<typename Iterator::value_type>(value)) { + RETURN_NOT_OK(append_null()); + } else { + RETURN_NOT_OK(append_value(value)); + } + } + + return Status::OK(); +} + class RConverter : public Converter<SEXP, RConversionOptions> { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } virtual Status Extend(SEXP values, int64_t size) { - return Status::NotImplemented("ExtendMasked"); + return Status::NotImplemented("Extend"); + } + + // by default, just delay the ->Extend(), i.e. not run in parallel + // implementations might redefine so that ->Extend() is run in parallel + virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) { + auto task = [this, values, size]() { return this->Extend(values, size); }; Review comment: Yes, definitely. I thought I had dealt with all the stop cases; we can only call `StopIfNotOk()` once all concurrent tasks have finished. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org