paleolimbot commented on pull request #12323:
URL: https://github.com/apache/arrow/pull/12323#issuecomment-1040523056


   OK, I think this is ready for review.
   
   There is a threading issue that causes `read_feather()` and `read_csv_arrow()` to fail. I *think* I implemented this so that R is never called from another thread (we get an `IOError` instead). At one point I had both of those readers working on MacOS and Windows, but they crashed on Linux. In all cases R was being called from another thread, so a crash was possible on any platform. Interestingly, `read_parquet()` seems to be fine, but that may only be because the file is very small. I am well out of my depth with concurrency, and perhaps I am missing how we have dealt with this in other parts of the R package.
   
   In general, all writers work, and any reader works as long as it does not call the stream's `Read()` method from another thread. One of the original tickets for this (ARROW-4512) requested support for streaming tables over a `socketConnection()`, which should work with this PR in both directions. The other ticket requested support for Parquet reading and writing, which also seems to work.
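   
   One possible workaround for the failing readers, sketched here but not part of this PR and not tested against this branch: read the whole connection into a raw vector on the main R thread, then hand that buffer to the reader so that no background thread ever touches the connection. `read_all_bytes()` is a hypothetical helper, and passing a raw vector to `read_feather()` assumes the installed arrow version accepts raw input. The trade-off is that the whole file must fit in memory, so this does not help with true streaming (e.g., over a `socketConnection()`).
   
   ``` r
   # Hypothetical helper (not an arrow function): read every byte from an R
   # connection on the calling (main R) thread, in fixed-size chunks.
   read_all_bytes <- function(con, chunk_size = 65536L) {
     # Open (and later close) the connection only if the caller has not
     if (!isOpen(con)) {
       open(con, "rb")
       on.exit(close(con))
     }
     chunks <- list()
     repeat {
       chunk <- readBin(con, what = "raw", n = chunk_size)
       if (length(chunk) == 0L) break  # end of input
       chunks[[length(chunks) + 1L]] <- chunk
     }
     # Prepend raw(0) so an empty connection yields raw(0) rather than NULL
     do.call(c, c(list(raw(0)), chunks))
   }
   
   # Assumes read_feather() accepts a raw vector in the installed arrow version:
   # read_feather(read_all_bytes(file(tf_feather)))
   ```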
   
   Reprex:
   
   ``` r
   # remotes::install_github("paleolimbot/arrow/r@r-connections")
   library(arrow, warn.conflicts = FALSE)
   tbl <- tibble::tibble(x = 1:5)
   
   # all the writers I know about just work
   tf_parquet <- tempfile()
   write_parquet(tbl, file(tf_parquet))
   
   tf_ipc <- tempfile()
   write_ipc_stream(tbl, file(tf_ipc))
   
   tf_feather <- tempfile()
   write_feather(tbl, file(tf_feather))
   
   tf_csv <- tempfile()
   write_csv_arrow(tbl, file(tf_csv))
   
   # some readers work...
   read_parquet(file(tf_parquet))
   #> # A tibble: 5 × 1
   #>       x
   #>   <int>
   #> 1     1
   #> 2     2
   #> 3     3
   #> 4     4
   #> 5     5
   read_ipc_stream(file(tf_ipc))
   #> # A tibble: 5 × 1
   #>       x
   #>   <int>
   #> 1     1
   #> 2     2
   #> 3     3
   #> 4     4
   #> 5     5
   
   # ...except the ones that read from other threads
   read_feather(file(tf_feather))
   #> Error: IOError: Attempt to call into R from a non-R thread
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/io/interfaces.cc:157  Seek(position)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/reader.cc:1233  ReadFooter()
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/reader.cc:1720  result->Open(file, footer_offset, options)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/feather.cc:713  RecordBatchFileReader::Open(source_, options_)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/feather.cc:793  result->Open(source, options)
   read_csv_arrow(file(tf_csv))
   #> Error in `handle_csv_read_error()` at r/R/csv.R:198:6:
   #> ! IOError: Attempt to call into R from a non-R thread
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/io/interfaces.cc:86  stream_->Read(block_size_)
   
   # ...even with use_threads = FALSE
   options(arrow.use_threads = FALSE)
   read_feather(file(tf_feather))
   #> Error: IOError: Attempt to call into R from a non-R thread
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/io/interfaces.cc:157  Seek(position)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/reader.cc:1233  ReadFooter()
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/reader.cc:1720  result->Open(file, footer_offset, options)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/feather.cc:713  RecordBatchFileReader::Open(source_, options_)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/ipc/feather.cc:793  result->Open(source, options)
   read_csv_arrow(file(tf_csv))
   #> Error in `handle_csv_read_error()` at r/R/csv.R:198:6:
   #> ! IOError: Attempt to call into R from a non-R thread
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/io/interfaces.cc:86  stream_->Read(block_size_)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/util/iterator.h:270  it_.Next()
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/csv/reader.cc:996  buffer_iterator_.Next()
   ```
   
   <sup>Created on 2022-02-15 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

