westonpace commented on a change in pull request #9615:
URL: https://github.com/apache/arrow/pull/9615#discussion_r625721013



##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;

Review comment:
       You could use `FnOnce` in place of `std::function`.  It's in
`arrow/util/functional.h`.  You should mostly be able to drop it in, except
that you need to move the task when you run it (you might have to use a move
iterator for `delayed_serial_tasks`) or when you pass it down into
`parallel_tasks`.  The main advantage is that anything captured by the
function is released immediately after the function runs.  There have been
some efforts in Arrow to use `FnOnce` consistently where possible
(ARROW-10966, ARROW-11191).
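
A minimal sketch of what that might look like, assuming the members shown in
the diff (the helper name below is only for illustration):

```cpp
// Sketch only: RTasks::Task as arrow::internal::FnOnce instead of std::function.
#include <utility>
#include <vector>

#include "arrow/status.h"
#include "arrow/util/functional.h"

struct RTasksSketch {
  using Task = arrow::internal::FnOnce<arrow::Status()>;

  arrow::Status RunDelayedSerialTasks() {
    arrow::Status status = arrow::Status::OK();
    for (auto& task : delayed_serial_tasks) {
      // FnOnce's call operator is rvalue-qualified and call-once: moving the
      // task here releases whatever it captured as soon as the call returns.
      status &= std::move(task)();
      if (!status.ok()) break;
    }
    return status;
  }

  std::vector<Task> delayed_serial_tasks;
};
```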

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;
+
+  RTasks()
+      : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded(
+            arrow::internal::GetCpuThreadPool())) {}
+
+  Status Finish() {

Review comment:
       To avoid nested deadlock, this method (`Finish()`) must never be called
from a thread pool thread, because `Finish()` blocks on `parallel_tasks`.  The
current usage should be fine, but a comment somewhere wouldn't hurt.
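
For example, a short note along these lines above the method (wording is only
a suggestion):

```cpp
// NOTE (suggested wording): Finish() blocks on parallel_tasks, so it must only
// be called from the main R thread; calling it from a CPU thread pool thread
// could deadlock the pool waiting on itself.
Status Finish() {
  // ... as in the diff ...
}
```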

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -46,6 +50,41 @@ using internal::MakeConverter;
 
 namespace r {
 
+class RTasks {
+ public:
+  using Task = std::function<Status()>;
+
+  RTasks()
+      : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded(
+            arrow::internal::GetCpuThreadPool())) {}
+
+  Status Finish() {
+    Status status = Status::OK();
+
+    // run the delayed tasks now
+    for (auto& task : delayed_serial_tasks) {
+      status &= task();
+      if (!status.ok()) break;
+    }
+
+    // then wait for the parallel tasks to finish
+    status &= parallel_tasks->Finish();

Review comment:
       It would be nice if there were a good way to trigger `parallel_tasks`
to fail early when `status` is not OK here (i.e. when we broke out of the
serial loop above).
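
One possible shape for that, purely as a sketch: share an atomic stop flag
with every parallel task and flip it when the serial loop bails out, so
whatever is still queued on the pool becomes a cheap no-op.  The
`stop_source_` member, the `AddParallelTask` helper and the `Cancelled` status
below are hypothetical; `parallel_tasks` is the member from the diff.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

#include "arrow/status.h"
#include "arrow/util/task_group.h"

struct RTasksSketch {
  // Wrap each parallel task so it turns into a no-op once a failure was seen.
  void AddParallelTask(std::function<arrow::Status()> task) {
    auto stop = stop_source_;
    parallel_tasks->Append([stop, task = std::move(task)]() -> arrow::Status {
      if (stop->load()) return arrow::Status::Cancelled("an earlier task failed");
      return task();
    });
  }

  arrow::Status Finish() {
    arrow::Status status = arrow::Status::OK();
    // ... run delayed_serial_tasks as in the diff, breaking on error ...
    if (!status.ok()) {
      stop_source_->store(true);  // tell the queued parallel tasks to bail out
    }
    status &= parallel_tasks->Finish();
    return status;
  }

  std::shared_ptr<std::atomic<bool>> stop_source_ =
      std::make_shared<std::atomic<bool>>(false);
  std::shared_ptr<arrow::internal::TaskGroup> parallel_tasks;  // as in the diff
};
```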

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -168,46 +207,85 @@ bool is_NA<int64_t>(int64_t value) {
 }
 
 template <typename T>
-struct RVectorVisitor {
+class RVectorIterator {
+ public:
+  using value_type = T;
+  RVectorIterator(SEXP x, int64_t start)
+      : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {}
+
+  RVectorIterator& operator++() {
+    ++ptr_x_;
+    return *this;
+  }
+
+  const T operator*() const { return *ptr_x_; }
+
+ private:
+  const T* ptr_x_;
+};
+
+template <typename T>
+class RVectorIterator_ALTREP {
+ public:
+  using value_type = T;
   using data_type =
       typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type;
   using r_vector_type = cpp11::r_vector<data_type>;
+  using r_vector_iterator = typename r_vector_type::const_iterator;
 
-  template <typename AppendNull, typename AppendValue>
-  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
-                      AppendValue&& append_value) {
-    r_vector_type values(x);
-    auto it = values.begin();
-
-    for (R_xlen_t i = 0; i < size; i++, ++it) {
-      auto value = GetValue(*it);
-
-      if (is_NA<T>(value)) {
-        RETURN_NOT_OK(append_null());
-      } else {
-        RETURN_NOT_OK(append_value(value));
-      }
-    }
+  RVectorIterator_ALTREP(SEXP x, int64_t start)
+      : vector_(x), it_(vector_.begin() + start) {}
 
-    return Status::OK();
+  RVectorIterator_ALTREP& operator++() {
+    ++it_;
+    return *this;
   }
 
+  const T operator*() const { return GetValue(*it_); }
+
   static T GetValue(data_type x) { return x; }
+
+ private:
+  r_vector_type vector_;
+  r_vector_iterator it_;
 };
 
 template <>
-int64_t RVectorVisitor<int64_t>::GetValue(double x) {
+int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) {
   int64_t value;
   memcpy(&value, &x, sizeof(int64_t));
   return value;
 }
 
+template <typename Iterator, typename AppendNull, typename AppendValue>
+Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null,
+                   AppendValue&& append_value) {
+  for (R_xlen_t i = 0; i < n; i++, ++it) {

Review comment:
       Minor: you're pretty close to being able to use a range-based for loop
here.  I'm not sure how difficult it would be to create an `end()` iterator
and an iterator equality operator.
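
For what it's worth, a hedged sketch of the pieces that would be needed (the
range wrapper and the equality operator are hypothetical additions; `is_NA`
and `RETURN_NOT_OK` are the ones already used in this file):

```cpp
// Both iterator classes would need an equality test; for RVectorIterator it is
// just pointer equality, e.g.:
//   bool operator!=(const RVectorIterator& other) const {
//     return ptr_x_ != other.ptr_x_;
//   }

// A tiny wrapper so a begin/end pair can feed a range-based for loop; the end
// iterator would be constructed from (x, start + n).
template <typename Iterator>
struct RVectorRange {
  Iterator begin_;
  Iterator end_;
  Iterator begin() const { return begin_; }
  Iterator end() const { return end_; }
};

template <typename Iterator, typename AppendNull, typename AppendValue>
Status VisitVector(RVectorRange<Iterator> range, AppendNull&& append_null,
                   AppendValue&& append_value) {
  for (auto value : range) {
    if (is_NA<typename Iterator::value_type>(value)) {
      RETURN_NOT_OK(append_null());
    } else {
      RETURN_NOT_OK(append_value(value));
    }
  }
  return Status::OK();
}
```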

##########
File path: r/src/r_to_arrow.cpp
##########
@@ -168,46 +207,85 @@ bool is_NA<int64_t>(int64_t value) {
 }
 
 template <typename T>
-struct RVectorVisitor {
+class RVectorIterator {
+ public:
+  using value_type = T;
+  RVectorIterator(SEXP x, int64_t start)
+      : ptr_x_(reinterpret_cast<const T*>(DATAPTR_RO(x)) + start) {}
+
+  RVectorIterator& operator++() {
+    ++ptr_x_;
+    return *this;
+  }
+
+  const T operator*() const { return *ptr_x_; }
+
+ private:
+  const T* ptr_x_;
+};
+
+template <typename T>
+class RVectorIterator_ALTREP {
+ public:
+  using value_type = T;
   using data_type =
       typename std::conditional<std::is_same<T, int64_t>::value, double, T>::type;
   using r_vector_type = cpp11::r_vector<data_type>;
+  using r_vector_iterator = typename r_vector_type::const_iterator;
 
-  template <typename AppendNull, typename AppendValue>
-  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
-                      AppendValue&& append_value) {
-    r_vector_type values(x);
-    auto it = values.begin();
-
-    for (R_xlen_t i = 0; i < size; i++, ++it) {
-      auto value = GetValue(*it);
-
-      if (is_NA<T>(value)) {
-        RETURN_NOT_OK(append_null());
-      } else {
-        RETURN_NOT_OK(append_value(value));
-      }
-    }
+  RVectorIterator_ALTREP(SEXP x, int64_t start)
+      : vector_(x), it_(vector_.begin() + start) {}
 
-    return Status::OK();
+  RVectorIterator_ALTREP& operator++() {
+    ++it_;
+    return *this;
   }
 
+  const T operator*() const { return GetValue(*it_); }
+
   static T GetValue(data_type x) { return x; }
+
+ private:
+  r_vector_type vector_;
+  r_vector_iterator it_;
 };
 
 template <>
-int64_t RVectorVisitor<int64_t>::GetValue(double x) {
+int64_t RVectorIterator_ALTREP<int64_t>::GetValue(double x) {
   int64_t value;
   memcpy(&value, &x, sizeof(int64_t));
   return value;
 }
 
+template <typename Iterator, typename AppendNull, typename AppendValue>
+Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null,
+                   AppendValue&& append_value) {
+  for (R_xlen_t i = 0; i < n; i++, ++it) {
+    auto value = *it;
+
+    if (is_NA<typename Iterator::value_type>(value)) {
+      RETURN_NOT_OK(append_null());
+    } else {
+      RETURN_NOT_OK(append_value(value));
+    }
+  }
+
+  return Status::OK();
+}
+
 class RConverter : public Converter<SEXP, RConversionOptions> {
  public:
   virtual Status Append(SEXP) { return Status::NotImplemented("Append"); }
 
   virtual Status Extend(SEXP values, int64_t size) {
-    return Status::NotImplemented("ExtendMasked");
+    return Status::NotImplemented("Extend");
+  }
+
+  // by default, just delay the ->Extend(), i.e. not run in parallel
+  // implementations might redefine so that ->Extend() is run in parallel
+  virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) {
+    auto task = [this, values, size]() { return this->Extend(values, size); };

Review comment:
       The `this` capture is possibly worrisome.  It seems you should be OK
since you block `Table__from_dots` until `Finish` is done, but what if an
error happens on line 1332 and `StopIfNotOk` bails out?  Is it possible it
returns (and allows `this` to go out of scope) before all the parallel tasks
have been cleared or flushed through?
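
If that turns out to be a real risk, one hedged mitigation (not necessarily
what this PR should do) would be to have `RTasks` drain the pool in its
destructor, so the converters can never be destroyed while a queued task still
holds `this`.  The `finished_` flag and the destructor below are hypothetical
additions:

```cpp
#include <memory>

#include "arrow/status.h"
#include "arrow/util/macros.h"
#include "arrow/util/task_group.h"

class RTasksSketch {
 public:
  ~RTasksSketch() {
    // If an error path (e.g. StopIfNotOk) unwinds before Finish() ran, block
    // here until the pool has drained so that any captured `this` stays valid
    // for as long as a task can still touch it.
    if (!finished_) {
      ARROW_UNUSED(parallel_tasks->Finish());
    }
  }

  arrow::Status Finish() {
    finished_ = true;
    // ... run the delayed serial tasks, then wait on parallel_tasks, as in the diff ...
    return parallel_tasks->Finish();
  }

 private:
  bool finished_ = false;
  std::shared_ptr<arrow::internal::TaskGroup> parallel_tasks;  // set up as in the diff
};
```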




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

