paleolimbot commented on code in PR #12323:
URL: https://github.com/apache/arrow/pull/12323#discussion_r847573544


##########
r/src/csv.cpp:
##########
@@ -162,7 +164,19 @@ std::shared_ptr<arrow::csv::TableReader> 
csv___TableReader__Make(
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> csv___TableReader__Read(
     const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
-  return ValueOrStop(table_reader->Read());
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] { fut.MarkFinished(table_reader->Read()); 
});
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;
+
+  return ValueOrStop(result);
 }

Review Comment:
   In order for the connection thing to work for `read_csv_arrow()`, we need to 
wrap `table_reader->Read()` with `RunWithCapturedR()`, but we need a cleaner 
way to do it than what I have here!



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,63 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp 
columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;
+
+  return ValueOrStop(result);
 }
 
 // [[arrow::export]]
 std::shared_ptr<arrow::ipc::feather::Reader> ipc___feather___Reader__Open(
     const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
-  return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
+  std::thread* thread_ptr;
+  auto result = 
RunWithCapturedR<std::shared_ptr<arrow::ipc::feather::Reader>>([&]() {
+    auto fut = 
arrow::Future<std::shared_ptr<arrow::ipc::feather::Reader>>::Make();
+
+    thread_ptr = new std::thread(
+        [&] { fut.MarkFinished(arrow::ipc::feather::Reader::Open(stream)); });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   In order for the connection thing to work for `read_csv_arrow()`, we need to 
wrap `arrow::ipc::feather::Reader::Open(stream)` with `RunWithCapturedR()`, 
but we need a cleaner way to do it than what I have here!



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,63 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp 
columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   In order for the connection thing to work for `read_csv_arrow()`, we need to 
wrap `reader->Read()` with `RunWithCapturedR()`, but we need a cleaner way to 
do it than what I have here!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to