This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ec690  ARROW-3490: [R] streaming of arrow objects to streams
d3ec690 is described below

commit d3ec69069649013229366ebe01e22f389597dc19
Author: Romain Francois <[email protected]>
AuthorDate: Thu Oct 18 10:06:08 2018 -0400

    ARROW-3490: [R] streaming of arrow objects to streams
    
    This makes `write_record_batch` and `write_table` generic with dispatch on 
the stream type.
    
    ```r
    write_record_batch <- function(x, stream, ...){
      UseMethod("write_record_batch", stream)
    }
    write_table <- function(x, stream, ...) {
      UseMethod("write_table", stream)
    }
    ```
    
    The `stream` argument can be various things for different use cases:
    
    - an `arrow::pic::RecordBatchWriter` created either with 
`record_batch_stream_writer()` or `record_batch_file_writer()`. This is the 
lowest level and that calls its `$WriteBatch()` or `$WriteTable()` method 
depending on what is being streamed
    
    - an `arrow::io::OutputStream` : this first creates an 
`arrow::ipc::RecordBatchStreamWriter` and streams into it. In particular this 
*does not* add the bytes of arrow files.
    
    - an `fs_path` from 📦 `fs` : this opens a 
`arrow::ipc::RecordBatchFileWriter` and streams to it, so that the file gets 
the ARROW1 bytes
    
    - A `character`, we just assert it is of length one and then call the 
`fs_path` method
    
    - A `raw()` which is just used for its type, in that case we stream into a 
byte buffer and returns it as a raw vector
    
    Some examples:
    
    ``` r
    library(arrow)
    tbl <- tibble::tibble(
      int = 1:10, dbl = as.numeric(1:10),
      lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
      chr = letters[1:10]
    )
    batch <- record_batch(tbl)
    
    tf <- tempfile()
    
    # stream the batch to the file
    write_record_batch(batch, tf)
    
    # same
    write_record_batch(batch, fs::path_abs(tf))
    
    # to an InputStream
    file_stream <- file_output_stream(tf)
    write_record_batch(batch, file_stream)
    file_stream$Close()
    
    # to a RecordBatchFileWriter
    file_stream <- file_output_stream(tf)
    file_writer <- record_batch_file_writer(file_stream,  batch$schema())
    write_record_batch(batch, file_writer)
    file_writer$Close()
    file_stream$Close()
    
    # get the bytes directly
    write_record_batch(batch, raw())
    #>   [1] 04 01 00 00 10 00 00 00 00 00 0a 00 0c 00 06 00 05 00 08 00 0a 00 
00
    #>  [24] 00 00 01 03 00 0c 00 00 00 08 00 08 00 00 00 04 00 08 00 00 00 04 
00
    #>  [47] 00 00 04 00 00 00 9c 00 00 00 58 00 00 00 2c 00 00 00 04 00 00 00 
84
    #>  [70] ff ff ff 00 00 01 05 14 00 00 00 0c 00 00 00 04 00 00 00 00 00 00 
00
    #>  [93] dc ff ff ff 03 00 00 00 63 68 72 00 a8 ff ff ff 00 00 01 06 18 00 
00
    #> [116] 00 10 00 00 00 04 00 00 00 00 00 00 00 04 00 04 00 04 00 00 00 03 
00
    #> [139] 00 00 6c 67 6c 00 d0 ff ff ff 00 00 01 03 20 00 00 00 14 00 00 00 
04
    #> [162] 00 00 00 00 00 00 00 00 00 06 00 08 00 06 00 06 00 00 00 00 00 02 
00
    #> [185] 03 00 00 00 64 62 6c 00 10 00 14 00 08 00 06 00 07 00 0c 00 00 00 
10
    #> [208] 00 10 00 00 00 00 00 01 02 24 00 00 00 14 00 00 00 04 00 00 00 00 
00
    #> [231] 00 00 08 00 0c 00 08 00 07 00 08 00 00 00 00 00 00 01 20 00 00 00 
03
    #> [254] 00 00 00 69 6e 74 00 00 00 00 00 2c 01 00 00 14 00 00 00 00 00 00 
00
    #> [277] 0c 00 16 00 06 00 05 00 08 00 0c 00 0c 00 00 00 00 03 03 00 18 00 
00
    #> [300] 00 c8 00 00 00 00 00 00 00 00 00 0a 00 18 00 0c 00 04 00 08 00 0a 
00
    #> [323] 00 00 ac 00 00 00 10 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 
09
    #> [346] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 
00
    #> [369] 00 00 00 00 28 00 00 00 00 00 00 00 28 00 00 00 00 00 00 00 00 00 
00
    #> [392] 00 00 00 00 00 28 00 00 00 00 00 00 00 50 00 00 00 00 00 00 00 78 
00
    #> [415] 00 00 00 00 00 00 08 00 00 00 00 00 00 00 80 00 00 00 00 00 00 00 
08
    #> [438] 00 00 00 00 00 00 00 88 00 00 00 00 00 00 00 00 00 00 00 00 00 00 
00
    #> [461] 88 00 00 00 00 00 00 00 30 00 00 00 00 00 00 00 b8 00 00 00 00 00 
00
    #> [484] 00 10 00 00 00 00 00 00 00 00 00 00 00 04 00 00 00 0a 00 00 00 00 
00
    #> [507] 00 00 00 00 00 00 00 00 00 00 0a 00 00 00 00 00 00 00 00 00 00 00 
00
    #> [530] 00 00 00 0a 00 00 00 00 00 00 00 03 00 00 00 00 00 00 00 0a 00 00 
00
    #> [553] 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 00 00 02 00 
00
    #> [576] 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00 08 
00
    #> [599] 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 00 00 f0 3f 00 00 00 00 
00
    #> [622] 00 00 40 00 00 00 00 00 00 08 40 00 00 00 00 00 00 10 40 00 00 00 
00
    #> [645] 00 00 14 40 00 00 00 00 00 00 18 40 00 00 00 00 00 00 1c 40 00 00 
00
    #> [668] 00 00 00 20 40 00 00 00 00 00 00 22 40 00 00 00 00 00 00 24 40 6b 
03
    #> [691] 00 00 00 00 00 00 22 01 00 00 00 00 00 00 00 00 00 00 01 00 00 00 
02
    #> [714] 00 00 00 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 
00
    #> [737] 08 00 00 00 09 00 00 00 0a 00 00 00 00 00 00 00 61 62 63 64 65 66 
67
    #> [760] 68 69 6a 00 00 00 00 00 00 00 00 00 00
    ```
    
    Created on 2018-10-12 by the [reprex package](https://reprex.tidyverse.org) 
(v0.2.1.9000)
    
    Author: Romain Francois <[email protected]>
    
    Closes #2749 from romainfrancois/ARROW-3490/stream-2 and squashes the 
following commits:
    
    ce4ec06a4 <Romain Francois> type promotion for types that do not exist in R
    338f75f0b <Romain Francois> More flexible read_table
    5cb8dbd65 <Romain Francois> more flexible read_record_batch with various 
dispatch
    072b7f0f1 <Romain Francois> + BufferOutputStream
    f301f1e52 <Romain Francois> :rewind: to write_record_batch, write_table and 
write_arrow
    a17b37582 <Romain Francois> Trying less double dispatch
    9b9a6b85b <Romain Francois> roxygen
    2ae2ab386 <Romain Francois> - to_file and to_stream - write_arrow + 
stream.data.frame
    8d0e581ba <Romain Francois> stream.arrow::Table methods
    e1f62cc07 <Romain Francois> R6 arrrow::io::FixedSizeBufferWriter
    80ea2b75f <Romain Francois> R6 arrow::io::MockOutputSream
    a93933a0c <Romain Francois> +close_on_exit, local_tempfile
    ac20df3b6 <Romain Francois> + stream
---
 r/DESCRIPTION                       |   9 +-
 r/NAMESPACE                         |  39 +++-
 r/R/R6.R                            |   4 +
 r/R/RcppExports.R                   | 132 +++++++++---
 r/R/RecordBatch.R                   |  46 +----
 r/R/RecordBatchReader.R             | 188 +++++++++++++++++
 r/R/RecordBatchWriter.R             | 191 ++++++++++++++++++
 r/R/Table.R                         |  53 +----
 r/R/buffer.R                        |   2 +
 r/R/enums.R                         |   2 -
 r/R/io.R                            |  65 ++++++
 r/R/on_exit.R                       |  28 +++
 r/data-raw/test.R                   |   6 +-
 r/man/io.Rd                         |  16 ++
 r/man/read_record_batch.Rd          |   6 +-
 r/man/read_table.Rd                 |   2 +-
 r/man/record_batch_file_reader.Rd   |  14 ++
 r/man/record_batch_file_writer.Rd   |  19 ++
 r/man/record_batch_stream_reader.Rd |  14 ++
 r/man/record_batch_stream_writer.Rd |  16 ++
 r/man/write_arrow.Rd                |  14 +-
 r/man/write_record_batch.Rd         |  18 ++
 r/man/write_table.Rd                |  18 ++
 r/src/DataType.cpp                  |   3 +
 r/src/RcppExports.cpp               | 389 +++++++++++++++++++++++++++---------
 r/src/RecordBatch.cpp               |  60 ------
 r/src/RecordBatchReader.cpp         | 104 ++++++++++
 r/src/RecordBatchWriter.cpp         |  58 ++++++
 r/src/Table.cpp                     |  77 -------
 r/src/array.cpp                     |  61 +++++-
 r/src/arrow_types.h                 |   9 +-
 r/src/io.cpp                        |  77 +++++++
 r/tests/testthat/test-RecordBatch.R |  38 ++--
 r/tests/testthat/test-Table.R       |  20 +-
 r/tests/testthat/test-read-write.R  |   8 +-
 35 files changed, 1392 insertions(+), 414 deletions(-)

diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index ceab62a..66fc42a 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -24,10 +24,12 @@ Imports:
     vctrs (>= 0.0.0.9000),
     fs,
     tibble,
-    crayon
+    crayon,
+    withr
 Remotes:
     r-lib/vctrs, 
-    RcppCore/Rcpp
+    RcppCore/Rcpp, 
+    romainfrancois/withr@bug-79/defer
 Roxygen: list(markdown = TRUE)
 RoxygenNote: 6.1.0.9000
 Suggests:
@@ -43,6 +45,8 @@ Collate:
     'List.R'
     'RcppExports.R'
     'RecordBatch.R'
+    'RecordBatchReader.R'
+    'RecordBatchWriter.R'
     'Schema.R'
     'Struct.R'
     'Table.R'
@@ -51,5 +55,6 @@ Collate:
     'dictionary.R'
     'io.R'
     'memory_pool.R'
+    'on_exit.R'
     'reexports-tibble.R'
     'zzz.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 649efe7..b9967a3 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -13,19 +13,43 @@ S3method(buffer,numeric)
 S3method(buffer,raw)
 S3method(buffer_reader,"arrow::Buffer")
 S3method(buffer_reader,default)
+S3method(fixed_size_buffer_writer,"arrow::Buffer")
+S3method(fixed_size_buffer_writer,default)
 S3method(length,"arrow::Array")
 S3method(names,"arrow::RecordBatch")
 S3method(print,"arrow-enum")
 S3method(read_record_batch,"arrow::io::BufferReader")
 S3method(read_record_batch,"arrow::io::RandomAccessFile")
+S3method(read_record_batch,"arrow::ipc::RecordBatchFileReader")
+S3method(read_record_batch,"arrow::ipc::RecordBatchStreamReader")
 S3method(read_record_batch,character)
 S3method(read_record_batch,fs_path)
 S3method(read_record_batch,raw)
 S3method(read_table,"arrow::io::BufferReader")
 S3method(read_table,"arrow::io::RandomAccessFile")
+S3method(read_table,"arrow::ipc::RecordBatchFileReader")
+S3method(read_table,"arrow::ipc::RecordBatchStreamReader")
 S3method(read_table,character)
 S3method(read_table,fs_path)
 S3method(read_table,raw)
+S3method(record_batch_file_reader,"arrow::io::RandomAccessFile")
+S3method(record_batch_file_reader,character)
+S3method(record_batch_file_reader,fs_path)
+S3method(record_batch_stream_reader,"arrow::io::InputStream")
+S3method(record_batch_stream_reader,raw)
+S3method(write_arrow,"arrow::RecordBatch")
+S3method(write_arrow,"arrow::Table")
+S3method(write_arrow,data.frame)
+S3method(write_record_batch,"arrow::io::OutputStream")
+S3method(write_record_batch,"arrow::ipc::RecordBatchWriter")
+S3method(write_record_batch,character)
+S3method(write_record_batch,fs_path)
+S3method(write_record_batch,raw)
+S3method(write_table,"arrow::io::OutputStream")
+S3method(write_table,"arrow::ipc::RecordBatchWriter")
+S3method(write_table,character)
+S3method(write_table,fs_path)
+S3method(write_table,raw)
 export(DateUnit)
 export(FileMode)
 export(StatusCode)
@@ -35,6 +59,7 @@ export(array)
 export(as_tibble)
 export(boolean)
 export(buffer)
+export(buffer_output_stream)
 export(buffer_reader)
 export(chunked_array)
 export(date32)
@@ -42,6 +67,8 @@ export(date64)
 export(decimal)
 export(dictionary)
 export(file_open)
+export(file_output_stream)
+export(fixed_size_buffer_writer)
 export(float16)
 export(float32)
 export(float64)
@@ -52,11 +79,16 @@ export(int8)
 export(list_of)
 export(mmap_create)
 export(mmap_open)
+export(mock_output_stream)
 export(null)
 export(read_arrow)
 export(read_record_batch)
 export(read_table)
 export(record_batch)
+export(record_batch_file_reader)
+export(record_batch_file_writer)
+export(record_batch_stream_reader)
+export(record_batch_stream_writer)
 export(schema)
 export(struct)
 export(table)
@@ -69,17 +101,16 @@ export(uint64)
 export(uint8)
 export(utf8)
 export(write_arrow)
+export(write_record_batch)
+export(write_table)
 importFrom(R6,R6Class)
 importFrom(Rcpp,sourceCpp)
 importFrom(assertthat,assert_that)
 importFrom(glue,glue)
 importFrom(purrr,map)
 importFrom(purrr,map2)
-importFrom(purrr,map_chr)
 importFrom(purrr,map_int)
 importFrom(rlang,dots_n)
-importFrom(rlang,quo_name)
-importFrom(rlang,seq2)
-importFrom(rlang,set_names)
 importFrom(tibble,as_tibble)
+importFrom(withr,defer_parent)
 useDynLib(arrow, .registration = TRUE)
diff --git a/r/R/R6.R b/r/R/R6.R
index 734ddc0..752ad59 100644
--- a/r/R/R6.R
+++ b/r/R/R6.R
@@ -40,6 +40,10 @@
     },
     pointer_address = function(){
       Object__pointer_address(self$pointer())
+    },
+
+    is_null = function(){
+      Object__is_null(self)
     }
   )
 )
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 8ca9fd6..915cd4d 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -333,6 +333,10 @@ Object__pointer_address <- function(obj) {
     .Call(`_arrow_Object__pointer_address`, obj)
 }
 
+Object__is_null <- function(obj) {
+    .Call(`_arrow_Object__is_null`, obj)
+}
+
 DictionaryType__initialize <- function(type, array, ordered) {
     .Call(`_arrow_DictionaryType__initialize`, type, array, ordered)
 }
@@ -377,6 +381,10 @@ io___InputStream__Close <- function(x) {
     invisible(.Call(`_arrow_io___InputStream__Close`, x))
 }
 
+io___OutputStream__Close <- function(x) {
+    invisible(.Call(`_arrow_io___OutputStream__Close`, x))
+}
+
 io___RandomAccessFile__GetSize <- function(x) {
     .Call(`_arrow_io___RandomAccessFile__GetSize`, x)
 }
@@ -413,6 +421,42 @@ io___BufferReader__initialize <- function(buffer) {
     .Call(`_arrow_io___BufferReader__initialize`, buffer)
 }
 
+io___FileOutputStream__Open <- function(path) {
+    .Call(`_arrow_io___FileOutputStream__Open`, path)
+}
+
+io___BufferOutputStream__Create <- function(initial_capacity) {
+    .Call(`_arrow_io___BufferOutputStream__Create`, initial_capacity)
+}
+
+io___BufferOutputStream__capacity <- function(stream) {
+    .Call(`_arrow_io___BufferOutputStream__capacity`, stream)
+}
+
+io___BufferOutputStream__Finish <- function(stream) {
+    .Call(`_arrow_io___BufferOutputStream__Finish`, stream)
+}
+
+io___BufferOutputStream__Tell <- function(stream) {
+    .Call(`_arrow_io___BufferOutputStream__Tell`, stream)
+}
+
+io___BufferOutputStream__Write <- function(stream, bytes) {
+    invisible(.Call(`_arrow_io___BufferOutputStream__Write`, stream, bytes))
+}
+
+io___MockOutputStream__initialize <- function() {
+    .Call(`_arrow_io___MockOutputStream__initialize`)
+}
+
+io___MockOutputStream__GetExtentBytesWritten <- function(stream) {
+    .Call(`_arrow_io___MockOutputStream__GetExtentBytesWritten`, stream)
+}
+
+io___FixedSizeBufferWriter__initialize <- function(buffer) {
+    .Call(`_arrow_io___FixedSizeBufferWriter__initialize`, buffer)
+}
+
 MemoryPool__default <- function() {
     .Call(`_arrow_MemoryPool__default`)
 }
@@ -445,22 +489,6 @@ RecordBatch__to_dataframe <- function(batch) {
     .Call(`_arrow_RecordBatch__to_dataframe`, batch)
 }
 
-read_record_batch_RandomAccessFile <- function(stream) {
-    .Call(`_arrow_read_record_batch_RandomAccessFile`, stream)
-}
-
-read_record_batch_BufferReader <- function(stream) {
-    .Call(`_arrow_read_record_batch_BufferReader`, stream)
-}
-
-RecordBatch__to_file <- function(batch, path) {
-    .Call(`_arrow_RecordBatch__to_file`, batch, path)
-}
-
-RecordBatch__to_stream <- function(batch) {
-    .Call(`_arrow_RecordBatch__to_stream`, batch)
-}
-
 RecordBatch__from_dataframe <- function(tbl) {
     .Call(`_arrow_RecordBatch__from_dataframe`, tbl)
 }
@@ -489,36 +517,76 @@ RecordBatch__Slice2 <- function(self, offset, length) {
     .Call(`_arrow_RecordBatch__Slice2`, self, offset, length)
 }
 
-Table__from_dataframe <- function(tbl) {
-    .Call(`_arrow_Table__from_dataframe`, tbl)
+RecordBatchReader__schema <- function(reader) {
+    .Call(`_arrow_RecordBatchReader__schema`, reader)
 }
 
-Table__num_columns <- function(x) {
-    .Call(`_arrow_Table__num_columns`, x)
+RecordBatchReader__ReadNext <- function(reader) {
+    .Call(`_arrow_RecordBatchReader__ReadNext`, reader)
 }
 
-Table__num_rows <- function(x) {
-    .Call(`_arrow_Table__num_rows`, x)
+ipc___RecordBatchStreamReader__Open <- function(stream) {
+    .Call(`_arrow_ipc___RecordBatchStreamReader__Open`, stream)
 }
 
-Table__schema <- function(x) {
-    .Call(`_arrow_Table__schema`, x)
+ipc___RecordBatchFileReader__schema <- function(reader) {
+    .Call(`_arrow_ipc___RecordBatchFileReader__schema`, reader)
+}
+
+ipc___RecordBatchFileReader__num_record_batches <- function(reader) {
+    .Call(`_arrow_ipc___RecordBatchFileReader__num_record_batches`, reader)
+}
+
+ipc___RecordBatchFileReader__ReadRecordBatch <- function(reader, i) {
+    .Call(`_arrow_ipc___RecordBatchFileReader__ReadRecordBatch`, reader, i)
+}
+
+ipc___RecordBatchFileReader__Open <- function(file) {
+    .Call(`_arrow_ipc___RecordBatchFileReader__Open`, file)
+}
+
+Table__from_RecordBatchFileReader <- function(reader) {
+    .Call(`_arrow_Table__from_RecordBatchFileReader`, reader)
+}
+
+Table__from_RecordBatchStreamReader <- function(reader) {
+    .Call(`_arrow_Table__from_RecordBatchStreamReader`, reader)
+}
+
+ipc___RecordBatchFileWriter__Open <- function(stream, schema) {
+    .Call(`_arrow_ipc___RecordBatchFileWriter__Open`, stream, schema)
 }
 
-Table__to_file <- function(table, path) {
-    .Call(`_arrow_Table__to_file`, table, path)
+ipc___RecordBatchStreamWriter__Open <- function(stream, schema) {
+    .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema)
 }
 
-Table__to_stream <- function(table) {
-    .Call(`_arrow_Table__to_stream`, table)
+ipc___RecordBatchWriter__WriteRecordBatch <- function(batch_writer, batch, 
allow_64bit) {
+    invisible(.Call(`_arrow_ipc___RecordBatchWriter__WriteRecordBatch`, 
batch_writer, batch, allow_64bit))
 }
 
-read_table_RandomAccessFile <- function(stream) {
-    .Call(`_arrow_read_table_RandomAccessFile`, stream)
+ipc___RecordBatchWriter__WriteTable <- function(batch_writer, table) {
+    invisible(.Call(`_arrow_ipc___RecordBatchWriter__WriteTable`, 
batch_writer, table))
 }
 
-read_table_BufferReader <- function(stream) {
-    .Call(`_arrow_read_table_BufferReader`, stream)
+ipc___RecordBatchWriter__Close <- function(batch_writer) {
+    invisible(.Call(`_arrow_ipc___RecordBatchWriter__Close`, batch_writer))
+}
+
+Table__from_dataframe <- function(tbl) {
+    .Call(`_arrow_Table__from_dataframe`, tbl)
+}
+
+Table__num_columns <- function(x) {
+    .Call(`_arrow_Table__num_columns`, x)
+}
+
+Table__num_rows <- function(x) {
+    .Call(`_arrow_Table__num_rows`, x)
+}
+
+Table__schema <- function(x) {
+    .Call(`_arrow_Table__schema`, x)
 }
 
 Table__to_dataframe <- function(table) {
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 4e2e581..e0866f6 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -22,8 +22,6 @@
     num_columns = function() RecordBatch__num_columns(self),
     num_rows = function() RecordBatch__num_rows(self),
     schema = function() `arrow::Schema`$new(RecordBatch__schema(self)),
-    to_file = function(path) invisible(RecordBatch__to_file(self, 
fs::path_abs(path))),
-    to_stream = function() RecordBatch__to_stream(self),
     column = function(i) `arrow::Array`$new(RecordBatch__column(self, i)),
     column_name = function(i) RecordBatch__column_name(self, i),
     names = function() RecordBatch__names(self),
@@ -40,7 +38,9 @@
       } else {
         `arrow::RecordBatch`$new(RecordBatch__Slice2(self, offset, length))
       }
-    }
+    },
+
+    serialize = function(output_stream, ...) write_record_batch(self, 
output_stream, ...)
   )
 )
 
@@ -67,43 +67,3 @@
 record_batch <- function(.data){
   `arrow::RecordBatch`$new(RecordBatch__from_dataframe(.data))
 }
-
-#' Read a single record batch from a stream
-#'
-#' @param stream input stream
-#'
-#' @details `stream` can be a `arrow::io::RandomAccessFile` stream as created 
by [file_open()] or [mmap_open()] or a path.
-#'
-#' @export
-read_record_batch <- function(stream){
-  UseMethod("read_record_batch")
-}
-
-#' @export
-read_record_batch.character <- function(stream){
-  assert_that(length(stream) == 1L)
-  read_record_batch(fs::path_abs(stream))
-}
-
-#' @export
-read_record_batch.fs_path <- function(stream){
-  stream <- file_open(stream); on.exit(stream$Close())
-  read_record_batch(stream)
-}
-
-#' @export
-`read_record_batch.arrow::io::RandomAccessFile` <- function(stream){
-  `arrow::RecordBatch`$new(read_record_batch_RandomAccessFile(stream))
-}
-
-#' @export
-`read_record_batch.arrow::io::BufferReader` <- function(stream){
-  `arrow::RecordBatch`$new(read_record_batch_BufferReader(stream))
-}
-
-#' @export
-read_record_batch.raw <- function(stream){
-  stream <- buffer_reader(stream); on.exit(stream$Close())
-  read_record_batch(stream)
-}
-
diff --git a/r/R/RecordBatchReader.R b/r/R/RecordBatchReader.R
new file mode 100644
index 0000000..b7c8b00
--- /dev/null
+++ b/r/R/RecordBatchReader.R
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include R6.R
+
+`arrow::RecordBatchReader` <- R6Class("arrow::RecordBatchReader", inherit = 
`arrow::Object`,
+  public = list(
+    schema = function() `arrow::Schema`$new(RecordBatchReader__schema(self)),
+    ReadNext = function() {
+      `arrow::RecordBatch`$new(RecordBatchReader__ReadNext(self))
+    }
+  )
+)
+
+`arrow::ipc::RecordBatchStreamReader` <- 
R6Class("arrow::ipc::RecordBatchStreamReader", inherit = 
`arrow::RecordBatchReader`)
+
+`arrow::ipc::RecordBatchFileReader` <- 
R6Class("arrow::ipc::RecordBatchFileReader", inherit = `arrow::Object`,
+  public = list(
+    schema = function() 
`arrow::Schema`$new(ipc___RecordBatchFileReader__schema(self)),
+    num_record_batches = function() 
ipc___RecordBatchFileReader__num_record_batches(self),
+    ReadRecordBatch = function(i) 
`arrow::RecordBatch`$new(ipc___RecordBatchFileReader__ReadRecordBatch(self, i))
+  )
+)
+
+
+#' Create a `arrow::ipc::RecordBatchStreamReader` from an input stream
+#'
+#' @param stream input stream
+#' @export
+record_batch_stream_reader <- function(stream){
+  UseMethod("record_batch_stream_reader")
+}
+
+#' @export
+`record_batch_stream_reader.arrow::io::InputStream` <- function(stream) {
+  
`arrow::ipc::RecordBatchStreamReader`$new(ipc___RecordBatchStreamReader__Open(stream))
+}
+
+#' @export
+`record_batch_stream_reader.raw` <- function(stream) {
+  record_batch_stream_reader(buffer_reader(stream))
+}
+
+
+#' Create an `arrow::ipc::RecordBatchFileReader` from a file
+#'
+#' @param file The file to read from
+#'
+#' @export
+record_batch_file_reader <- function(file) {
+  UseMethod("record_batch_file_reader")
+}
+
+#' @export
+`record_batch_file_reader.arrow::io::RandomAccessFile` <- function(file) {
+  
`arrow::ipc::RecordBatchFileReader`$new(ipc___RecordBatchFileReader__Open(file))
+}
+
+#' @export
+`record_batch_file_reader.character` <- function(file) {
+  assert_that(length(file) == 1L)
+  record_batch_file_reader(fs::path_abs(file))
+}
+
+#' @export
+`record_batch_file_reader.fs_path` <- function(file) {
+  record_batch_file_reader(file_open(file))
+}
+
+#-------- read_record_batch
+
+#' Read a single record batch from a stream
+#'
+#' @param stream input stream
+#' @param ... additional parameters
+#'
+#' @details `stream` can be a `arrow::io::RandomAccessFile` stream as created 
by [file_open()] or [mmap_open()] or a path.
+#'
+#' @export
+read_record_batch <- function(stream, ...){
+  UseMethod("read_record_batch")
+}
+
+#' @export
+read_record_batch.character <- function(stream, ...){
+  assert_that(length(stream) == 1L)
+  read_record_batch(fs::path_abs(stream))
+}
+
+#' @export
+read_record_batch.fs_path <- function(stream, ...){
+  stream <- close_on_exit(file_open(stream))
+  read_record_batch(stream)
+}
+
+#' @export
+`read_record_batch.arrow::io::RandomAccessFile` <- function(stream, ...){
+  reader <- record_batch_file_reader(stream)
+  reader$ReadRecordBatch(0)
+}
+
+#' @export
+`read_record_batch.arrow::io::BufferReader` <- function(stream, ...){
+  reader <- record_batch_stream_reader(stream)
+  reader$ReadNext()
+}
+
+#' @export
+read_record_batch.raw <- function(stream, ...){
+  stream <- close_on_exit(buffer_reader(stream))
+  read_record_batch(stream)
+}
+
+#' @export
+`read_record_batch.arrow::ipc::RecordBatchStreamReader` <- function(stream, 
...) {
+  stream$ReadNext()
+}
+
+#' @export
+`read_record_batch.arrow::ipc::RecordBatchFileReader` <- function(stream, i = 
0, ...) {
+  stream$ReadRecordBatch(i)
+}
+
+#--------- read_table
+
+#' Read an arrow::Table from a stream
+#'
+#' @param stream stream. Either a stream created by [file_open()] or 
[mmap_open()] or a file path.
+#'
+#' @export
+read_table <- function(stream){
+  UseMethod("read_table")
+}
+
+#' @export
+read_table.character <- function(stream){
+  assert_that(length(stream) == 1L)
+  read_table(fs::path_abs(stream))
+}
+
+#' @export
+read_table.fs_path <- function(stream) {
+  stream <- close_on_exit(file_open(stream))
+  read_table(stream)
+}
+
+#' @export
+`read_table.arrow::io::RandomAccessFile` <- function(stream) {
+  reader <- record_batch_file_reader(stream)
+  read_table(reader)
+}
+
+#' @export
+`read_table.arrow::ipc::RecordBatchFileReader` <- function(stream) {
+  `arrow::Table`$new(Table__from_RecordBatchFileReader(stream))
+}
+
+#' @export
+`read_table.arrow::ipc::RecordBatchStreamReader` <- function(stream) {
+  `arrow::Table`$new(Table__from_RecordBatchStreamReader(stream))
+}
+
+#' @export
+`read_table.arrow::io::BufferReader` <- function(stream) {
+  reader <- record_batch_stream_reader(stream)
+  read_table(reader)
+}
+
+#' @export
+`read_table.raw` <- function(stream) {
+  stream <- close_on_exit(buffer_reader(stream))
+  read_table(stream)
+}
+
diff --git a/r/R/RecordBatchWriter.R b/r/R/RecordBatchWriter.R
new file mode 100644
index 0000000..f1ab29d
--- /dev/null
+++ b/r/R/RecordBatchWriter.R
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include R6.R
+
+`arrow::ipc::RecordBatchWriter` <- R6Class("arrow::ipc::RecordBatchWriter", 
inherit = `arrow::Object`,
+  public = list(
+    WriteRecordBatch = function(batch, allow_64bit) 
ipc___RecordBatchWriter__WriteRecordBatch(self, batch, allow_64bit),
+    WriteTable = function(table) ipc___RecordBatchWriter__WriteTable(self, 
table),
+    Close = function() ipc___RecordBatchWriter__Close(self)
+  )
+)
+
+`arrow::ipc::RecordBatchStreamWriter` <- 
R6Class("arrow::ipc::RecordBatchStreamWriter", inherit = 
`arrow::ipc::RecordBatchWriter`)
+`arrow::ipc::RecordBatchFileWriter` <- 
R6Class("arrow::ipc::RecordBatchFileWriter", inherit = 
`arrow::ipc::RecordBatchStreamWriter`)
+
+#' Create a record batch file writer from a stream
+#'
+#' @param stream a stream
+#' @param schema the schema of the batches
+#'
+#' @return an `arrow::ipc::RecordBatchWriter` object
+#'
+#' @export
+record_batch_file_writer <- function(stream, schema) {
+  assert_that(
+    inherits(stream, "arrow::io::OutputStream"),
+    inherits(schema, "arrow::Schema")
+  )
+  
`arrow::ipc::RecordBatchFileWriter`$new(ipc___RecordBatchFileWriter__Open(stream,
 schema))
+}
+
+#' Create a record batch stream writer
+#'
+#' @param stream a stream
+#' @param schema a schema
+#'
+#' @export
+record_batch_stream_writer <- function(stream, schema) {
+  assert_that(
+    inherits(stream, "arrow::io::OutputStream"),
+    inherits(schema, "arrow::Schema")
+  )
+  
`arrow::ipc::RecordBatchStreamWriter`$new(ipc___RecordBatchStreamWriter__Open(stream,
 schema))
+}
+
+#-------- write_record_batch
+
+#' write a record batch
+#'
+#' @param x a `arrow::RecordBatch`
+#' @param stream where to stream the record batch
+#' @param ... extra parameters
+#'
+#' @export
+write_record_batch <- function(x, stream, ...){
+  UseMethod("write_record_batch", stream)
+}
+
+#' @export
+`write_record_batch.arrow::io::OutputStream` <- function(x, stream, ...) {
+  stream_writer <- close_on_exit(record_batch_stream_writer(stream, 
x$schema()))
+  write_record_batch(x, stream_writer)
+}
+
+#' @export
+`write_record_batch.arrow::ipc::RecordBatchWriter` <- function(x, stream, 
allow_64bit = TRUE, ...){
+  stream$WriteRecordBatch(x, allow_64bit)
+}
+
+#' @export
+`write_record_batch.character` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  write_record_batch(x, fs::path_abs(stream), ...)
+}
+
+#' @export
+`write_record_batch.fs_path` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  file_stream <- close_on_exit(file_output_stream(stream))
+  file_writer <- close_on_exit(record_batch_file_writer(file_stream, 
x$schema()))
+  write_record_batch(x, file_writer, ...)
+}
+
+#' @export
+`write_record_batch.raw` <- function(x, stream, ...) {
+  # how many bytes do we need
+  mock <- mock_output_stream()
+  write_record_batch(x, mock)
+  n <- mock$GetExtentBytesWritten()
+
+  bytes <- raw(n)
+  buffer <- buffer(bytes)
+  buffer_writer <- fixed_size_buffer_writer(buffer)
+  write_record_batch(x, buffer_writer)
+
+  bytes
+}
+
+#-------- stream Table
+
+#' write an arrow::Table
+#'
+#' @param x an `arrow::Table`
+#' @param stream where to stream the record batch
+#' @param ... extra parameters
+#'
+#' @export
+write_table <- function(x, stream, ...) {
+  UseMethod("write_table", stream)
+}
+
+#' @export
+`write_table.arrow::io::OutputStream` <- function(x, stream, ...) {
+  stream_writer <- close_on_exit(record_batch_stream_writer(stream, 
x$schema()))
+  write_table(x, stream_writer)
+}
+
+#' @export
+`write_table.arrow::ipc::RecordBatchWriter` <- function(x, stream, ...){
+  stream$WriteTable(x)
+}
+
+#' @export
+`write_table.character` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  write_table(x, fs::path_abs(stream), ...)
+}
+
+#' @export
+`write_table.fs_path` <- function(x, stream, ...) {
+  assert_that(length(stream) == 1L)
+  file_stream <- close_on_exit(file_output_stream(stream))
+  file_writer <- close_on_exit(record_batch_file_writer(file_stream, 
x$schema()))
+  write_table(x, file_writer, ...)
+}
+
+#' @export
+`write_table.raw` <- function(x, stream, ...) {
+  # how many bytes do we need
+  mock <- mock_output_stream()
+  write_table(x, mock)
+  n <- mock$GetExtentBytesWritten()
+
+  bytes <- raw(n)
+  buffer <- buffer(bytes)
+  buffer_writer <- fixed_size_buffer_writer(buffer)
+  write_table(x, buffer_writer)
+
+  bytes
+}
+
+#' Write an object to a stream
+#'
+#' @param x An object to stream
+#' @param stream A stream
+#' @param ... additional parameters
+#'
+#' @export
+write_arrow <- function(x, stream, ...){
+  UseMethod("write_arrow")
+}
+
+#' @export
+`write_arrow.arrow::RecordBatch` <- function(x, stream, ...) {
+  write_record_batch(x, stream, ...)
+}
+
+#' @export
+`write_arrow.arrow::Table` <- function(x, stream, ...) {
+  write_table(x, stream, ...)
+}
+
+#' @export
+`write_arrow.data.frame` <- function(x, stream, ...) {
+  write_record_batch(record_batch(x), stream, ...)
+}
diff --git a/r/R/Table.R b/r/R/Table.R
index 8acf30d..62011fc 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -22,9 +22,9 @@
     num_columns = function() Table__num_columns(self),
     num_rows = function() Table__num_rows(self),
     schema = function() `arrow::Schema`$new(Table__schema(self)),
-    to_file = function(path) invisible(Table__to_file(self, 
fs::path_abs(path))),
-    to_stream = function() Table__to_stream(self),
-    column = function(i) `arrow::Column`$new(Table__column(self, i))
+    column = function(i) `arrow::Column`$new(Table__column(self, i)),
+
+    serialize = function(output_stream, ...) write_table(self, output_stream, 
...)
   )
 )
 
@@ -37,53 +37,6 @@ table <- function(.data){
   `arrow::Table`$new(Table__from_dataframe(.data))
 }
 
-#' Write a tibble in a binary arrow file
-#'
-#' @param data a [tibble::tibble]
-#' @param path file path
-#'
-#' @export
-write_arrow <- function(data, path){
-  table(data)$to_file(path)
-}
-
-#' Read an arrow::Table from a stream
-#'
-#' @param stream stream. Either a stream created by [file_open()] or 
[mmap_open()] or a file path.
-#'
-#' @export
-read_table <- function(stream){
-  UseMethod("read_table")
-}
-
-#' @export
-read_table.character <- function(stream){
-  assert_that(length(stream) == 1L)
-  read_table(fs::path_abs(stream))
-}
-
-#' @export
-read_table.fs_path <- function(stream) {
-  stream <- file_open(stream); on.exit(stream$Close())
-  read_table(stream)
-}
-
-#' @export
-`read_table.arrow::io::RandomAccessFile` <- function(stream) {
-  `arrow::Table`$new(read_table_RandomAccessFile(stream))
-}
-
-#' @export
-`read_table.arrow::io::BufferReader` <- function(stream) {
-  `arrow::Table`$new(read_table_BufferReader(stream))
-}
-
-#' @export
-`read_table.raw` <- function(stream) {
-  stream <- buffer_reader(stream); on.exit(stream$Close())
-  read_table(stream)
-}
-
 #' @export
 `as_tibble.arrow::Table` <- function(x, ...){
   Table__to_dataframe(x)
diff --git a/r/R/buffer.R b/r/R/buffer.R
index 6389fcb..5f65721 100644
--- a/r/R/buffer.R
+++ b/r/R/buffer.R
@@ -27,6 +27,8 @@
   )
 )
 
+`arrow::MutableBuffer` <- R6Class("arrow::Buffer", inherit = `arrow::Buffer`)
+
 #' Create a buffer from an R object
 #'
 #' @param x R object
diff --git a/r/R/enums.R b/r/R/enums.R
index 6b4ff24..657dc1a 100644
--- a/r/R/enums.R
+++ b/r/R/enums.R
@@ -20,8 +20,6 @@
   NextMethod()
 }
 
-#' @importFrom rlang seq2 quo_name set_names
-#' @importFrom purrr map_chr
 enum <- function(class, ..., .list = list(...)){
   structure(
     .list,
diff --git a/r/R/io.R b/r/R/io.R
index 76fa1dc..1516bd3 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -31,6 +31,33 @@
   )
 )
 
+`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = 
`arrow::Object`)
+
+`arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit = 
`arrow::io::Writable`,
+  public = list(
+    Close = function() io___OutputStream__Close(self)
+  )
+)
+
+`arrow::io::FileOutputStream` <- R6Class("arrow::io::FileOutputStream", 
inherit = `arrow::io::OutputStream`)
+
+`arrow::io::MockOutputStream` <- R6Class("arrow::io::MockOutputStream", 
inherit = `arrow::io::OutputStream`,
+  public = list(
+    GetExtentBytesWritten = function() 
io___MockOutputStream__GetExtentBytesWritten(self)
+  )
+)
+
+`arrow::io::BufferOutputStream` <- R6Class("arrow::io::BufferOutputStream", 
inherit = `arrow::io::OutputStream`,
+  public = list(
+    capacity = function() io___BufferOutputStream__capacity(self),
+    Finish = function() 
`arrow::Buffer`$new(io___BufferOutputStream__Finish(self)),
+    Write = function(bytes) io___BufferOutputStream__Write(self, bytes),
+    Tell = function() io___BufferOutputStream__Tell(self)
+  )
+)
+
+`arrow::io::FixedSizeBufferWriter` <- 
R6Class("arrow::io::FixedSizeBufferWriter", inherit = `arrow::io::OutputStream`)
+
 `arrow::io::RandomAccessFile` <- R6Class("arrow::io::RandomAccessFile", 
inherit = `arrow::io::InputStream`,
   public = list(
     GetSize = function() io___RandomAccessFile__GetSize(self),
@@ -49,11 +76,14 @@
 `arrow::io::ReadableFile` <- R6Class("arrow::io::ReadableFile", inherit = 
`arrow::io::RandomAccessFile`)
 `arrow::io::BufferReader` <- R6Class("arrow::io::BufferReader", inherit = 
`arrow::io::RandomAccessFile`)
 
+
 #' Create a new read/write memory mapped file of a given size
 #'
 #' @param path file path
 #' @param size size in bytes
 #' @param mode file mode (read/write/readwrite)
+#' @param buffer an `arrow::Buffer`, typically created by [buffer()]
+#' @param initial_capacity initial capacity for the buffer output stream
 #'
 #' @rdname io
 #' @export
@@ -74,6 +104,41 @@ file_open <- `arrow::io::ReadableFile`$open <- 
function(path) {
   `arrow::io::ReadableFile`$new(io___ReadableFile__Open(fs::path_abs(path)))
 }
 
+#' @rdname io
+#' @export
+file_output_stream <- function(path) {
+  `arrow::io::FileOutputStream`$new(io___FileOutputStream__Open(path))
+}
+
+#' @rdname io
+#' @export
+mock_output_stream <- function() {
+  `arrow::io::MockOutputStream`$new(io___MockOutputStream__initialize())
+}
+
+#' @rdname io
+#' @export
+buffer_output_stream <- function(initial_capacity = 0L) {
+  
`arrow::io::BufferOutputStream`$new(io___BufferOutputStream__Create(initial_capacity))
+}
+
+#' @rdname io
+#' @export
+fixed_size_buffer_writer <- function(buffer){
+  UseMethod("fixed_size_buffer_writer")
+}
+
+#' @export
+fixed_size_buffer_writer.default <- function(buffer){
+  fixed_size_buffer_writer(buffer(buffer))
+}
+
+#' @export
+`fixed_size_buffer_writer.arrow::Buffer` <- function(buffer){
+  assert_that(buffer$is_mutable())
+  
`arrow::io::FixedSizeBufferWriter`$new(io___FixedSizeBufferWriter__initialize(buffer))
+}
+
 #' Create a `arrow::BufferReader`
 #'
 #' @param x R object to treat as a buffer or a buffer created by [buffer()]
diff --git a/r/R/on_exit.R b/r/R/on_exit.R
new file mode 100644
index 0000000..9387169
--- /dev/null
+++ b/r/R/on_exit.R
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @importFrom withr defer_parent
+close_on_exit <- function(x, ...){
+  defer_parent(x$Close(), ...)
+  x
+}
+
+local_tempfile <- function(...){
+  tf <- tempfile()
+  defer_parent(unlink(tf), ...)
+  tf
+}
diff --git a/r/data-raw/test.R b/r/data-raw/test.R
index 60f2607..516af58 100644
--- a/r/data-raw/test.R
+++ b/r/data-raw/test.R
@@ -48,7 +48,7 @@ arr <- array(1:3, 5:80)
 arr
 arr$as_vector()
 
-#------- read_arrow / write_arrow
+#------- read_arrow / stream
 tbl <- tibble(x=1:10, y=rnorm(10))
 write_arrow(tbl, "/tmp/test.arrow")
 readr::write_rds(tbl, "/tmp/test.rds")
@@ -60,7 +60,7 @@ fs::file_info(c("/tmp/test.arrow", "/tmp/test.rds"))
 (batch <- record_batch(tbl))
 batch$num_columns()
 batch$num_rows()
-batch$to_file("/tmp/test")
+write_arrow(batch, "/tmp/test")
 readBin("/tmp/test", what = raw(), n = 1000)
 batch$schema()
 all.equal(tbl, data)
@@ -79,7 +79,7 @@ tab$schema()
 tab$num_columns()
 tab$num_rows()
 
-# read_arrow, write_arrow
+# read_arrow, stream
 tbl <- tibble(x = rnorm(20), y = seq_len(20))
 write_arrow(tbl, tf)
 
diff --git a/r/man/io.Rd b/r/man/io.Rd
index 8c572d8..74817bf 100644
--- a/r/man/io.Rd
+++ b/r/man/io.Rd
@@ -4,6 +4,10 @@
 \alias{mmap_create}
 \alias{mmap_open}
 \alias{file_open}
+\alias{file_output_stream}
+\alias{mock_output_stream}
+\alias{buffer_output_stream}
+\alias{fixed_size_buffer_writer}
 \title{Create a new read/write memory mapped file of a given size}
 \usage{
 mmap_create(path, size)
@@ -11,6 +15,14 @@ mmap_create(path, size)
 mmap_open(path, mode = c("read", "write", "readwrite"))
 
 file_open(path)
+
+file_output_stream(path)
+
+mock_output_stream()
+
+buffer_output_stream(initial_capacity = 0L)
+
+fixed_size_buffer_writer(buffer)
 }
 \arguments{
 \item{path}{file path}
@@ -18,6 +30,10 @@ file_open(path)
 \item{size}{size in bytes}
 
 \item{mode}{file mode (read/write/readwrite)}
+
+\item{initial_capacity}{initial capacity for the buffer output stream}
+
+\item{buffer}{an \code{arrow::Buffer}, typically created by 
\code{\link[=buffer]{buffer()}}}
 }
 \description{
 Create a new read/write memory mapped file of a given size
diff --git a/r/man/read_record_batch.Rd b/r/man/read_record_batch.Rd
index 86e7673..4ca048f 100644
--- a/r/man/read_record_batch.Rd
+++ b/r/man/read_record_batch.Rd
@@ -1,13 +1,15 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/RecordBatch.R
+% Please edit documentation in R/RecordBatchReader.R
 \name{read_record_batch}
 \alias{read_record_batch}
 \title{Read a single record batch from a stream}
 \usage{
-read_record_batch(stream)
+read_record_batch(stream, ...)
 }
 \arguments{
 \item{stream}{input stream}
+
+\item{...}{additional parameters}
 }
 \description{
 Read a single record batch from a stream
diff --git a/r/man/read_table.Rd b/r/man/read_table.Rd
index 7b34654..f851057 100644
--- a/r/man/read_table.Rd
+++ b/r/man/read_table.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/Table.R
+% Please edit documentation in R/RecordBatchReader.R
 \name{read_table}
 \alias{read_table}
 \title{Read an arrow::Table from a stream}
diff --git a/r/man/record_batch_file_reader.Rd 
b/r/man/record_batch_file_reader.Rd
new file mode 100644
index 0000000..b7e211d
--- /dev/null
+++ b/r/man/record_batch_file_reader.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchReader.R
+\name{record_batch_file_reader}
+\alias{record_batch_file_reader}
+\title{Create an \code{arrow::ipc::RecordBatchFileReader} from a file}
+\usage{
+record_batch_file_reader(file)
+}
+\arguments{
+\item{file}{The file to read from}
+}
+\description{
+Create an \code{arrow::ipc::RecordBatchFileReader} from a file
+}
diff --git a/r/man/record_batch_file_writer.Rd 
b/r/man/record_batch_file_writer.Rd
new file mode 100644
index 0000000..b7dcb0c
--- /dev/null
+++ b/r/man/record_batch_file_writer.Rd
@@ -0,0 +1,19 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{record_batch_file_writer}
+\alias{record_batch_file_writer}
+\title{Create a record batch file writer from a stream}
+\usage{
+record_batch_file_writer(stream, schema)
+}
+\arguments{
+\item{stream}{a stream}
+
+\item{schema}{the schema of the batches}
+}
+\value{
+an \code{arrow::ipc::RecordBatchWriter} object
+}
+\description{
+Create a record batch file writer from a stream
+}
diff --git a/r/man/record_batch_stream_reader.Rd 
b/r/man/record_batch_stream_reader.Rd
new file mode 100644
index 0000000..018045f
--- /dev/null
+++ b/r/man/record_batch_stream_reader.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchReader.R
+\name{record_batch_stream_reader}
+\alias{record_batch_stream_reader}
+\title{Create a \code{arrow::ipc::RecordBatchStreamReader} from an input 
stream}
+\usage{
+record_batch_stream_reader(stream)
+}
+\arguments{
+\item{stream}{input stream}
+}
+\description{
+Create a \code{arrow::ipc::RecordBatchStreamReader} from an input stream
+}
diff --git a/r/man/record_batch_stream_writer.Rd 
b/r/man/record_batch_stream_writer.Rd
new file mode 100644
index 0000000..d720d50
--- /dev/null
+++ b/r/man/record_batch_stream_writer.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{record_batch_stream_writer}
+\alias{record_batch_stream_writer}
+\title{Create a record batch stream writer}
+\usage{
+record_batch_stream_writer(stream, schema)
+}
+\arguments{
+\item{stream}{a stream}
+
+\item{schema}{a schema}
+}
+\description{
+Create a record batch stream writer
+}
diff --git a/r/man/write_arrow.Rd b/r/man/write_arrow.Rd
index ff48d1e..42b39f1 100644
--- a/r/man/write_arrow.Rd
+++ b/r/man/write_arrow.Rd
@@ -1,16 +1,18 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/Table.R
+% Please edit documentation in R/RecordBatchWriter.R
 \name{write_arrow}
 \alias{write_arrow}
-\title{Write a tibble in a binary arrow file}
+\title{Write an object to a stream}
 \usage{
-write_arrow(data, path)
+write_arrow(x, stream, ...)
 }
 \arguments{
-\item{data}{a \link[tibble:tibble]{tibble::tibble}}
+\item{x}{An object to stream}
 
-\item{path}{file path}
+\item{stream}{A stream}
+
+\item{...}{additional parameters}
 }
 \description{
-Write a tibble in a binary arrow file
+Write an object to a stream
 }
diff --git a/r/man/write_record_batch.Rd b/r/man/write_record_batch.Rd
new file mode 100644
index 0000000..afc3363
--- /dev/null
+++ b/r/man/write_record_batch.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{write_record_batch}
+\alias{write_record_batch}
+\title{write a record batch}
+\usage{
+write_record_batch(x, stream, ...)
+}
+\arguments{
+\item{x}{a \code{arrow::RecordBatch}}
+
+\item{stream}{where to stream the record batch}
+
+\item{...}{extra parameters}
+}
+\description{
+write a record batch
+}
diff --git a/r/man/write_table.Rd b/r/man/write_table.Rd
new file mode 100644
index 0000000..a247870
--- /dev/null
+++ b/r/man/write_table.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/RecordBatchWriter.R
+\name{write_table}
+\alias{write_table}
+\title{write an arrow::Table}
+\usage{
+write_table(x, stream, ...)
+}
+\arguments{
+\item{x}{an \code{arrow::Table}}
+
+\item{stream}{where to stream the record batch}
+
+\item{...}{extra parameters}
+}
+\description{
+write an arrow::Table
+}
diff --git a/r/src/DataType.cpp b/r/src/DataType.cpp
index ab9351b..595dee3 100644
--- a/r/src/DataType.cpp
+++ b/r/src/DataType.cpp
@@ -215,6 +215,9 @@ std::string Object__pointer_address(SEXP obj) {
 }
 
 // [[Rcpp::export]]
+bool Object__is_null(const std::shared_ptr<void>& obj) { return obj.get() == 
nullptr; }
+
+// [[Rcpp::export]]
 std::shared_ptr<arrow::DataType> DictionaryType__initialize(
     const std::shared_ptr<arrow::DataType>& type,
     const std::shared_ptr<arrow::Array>& array, bool ordered) {
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index e98a432..8b4c89a 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -920,6 +920,17 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// Object__is_null
+bool Object__is_null(const std::shared_ptr<void>& obj);
+RcppExport SEXP _arrow_Object__is_null(SEXP objSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<void>& >::type 
obj(objSEXP);
+    rcpp_result_gen = Rcpp::wrap(Object__is_null(obj));
+    return rcpp_result_gen;
+END_RCPP
+}
 // DictionaryType__initialize
 std::shared_ptr<arrow::DataType> DictionaryType__initialize(const 
std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Array>& 
array, bool ordered);
 RcppExport SEXP _arrow_DictionaryType__initialize(SEXP typeSEXP, SEXP 
arraySEXP, SEXP orderedSEXP) {
@@ -1045,6 +1056,16 @@ BEGIN_RCPP
     return R_NilValue;
 END_RCPP
 }
+// io___OutputStream__Close
+void io___OutputStream__Close(const std::shared_ptr<arrow::io::OutputStream>& 
x);
+RcppExport SEXP _arrow_io___OutputStream__Close(SEXP xSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::OutputStream>& >::type x(xSEXP);
+    io___OutputStream__Close(x);
+    return R_NilValue;
+END_RCPP
+}
 // io___RandomAccessFile__GetSize
 int64_t io___RandomAccessFile__GetSize(const 
std::shared_ptr<arrow::io::RandomAccessFile>& x);
 RcppExport SEXP _arrow_io___RandomAccessFile__GetSize(SEXP xSEXP) {
@@ -1146,6 +1167,104 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// io___FileOutputStream__Open
+std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(const 
std::string& path);
+RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::string& >::type path(pathSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___FileOutputStream__Open(path));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Create
+std::shared_ptr<arrow::io::BufferOutputStream> 
io___BufferOutputStream__Create(int64_t initial_capacity);
+RcppExport SEXP _arrow_io___BufferOutputStream__Create(SEXP 
initial_capacitySEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< int64_t >::type 
initial_capacity(initial_capacitySEXP);
+    rcpp_result_gen = 
Rcpp::wrap(io___BufferOutputStream__Create(initial_capacity));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__capacity
+int64_t io___BufferOutputStream__capacity(const 
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__capacity(SEXP streamSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__capacity(stream));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Finish
+std::shared_ptr<arrow::Buffer> io___BufferOutputStream__Finish(const 
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__Finish(SEXP streamSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__Finish(stream));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Tell
+int64_t io___BufferOutputStream__Tell(const 
std::shared_ptr<arrow::io::BufferOutputStream>& stream);
+RcppExport SEXP _arrow_io___BufferOutputStream__Tell(SEXP streamSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___BufferOutputStream__Tell(stream));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___BufferOutputStream__Write
+void io___BufferOutputStream__Write(const 
std::shared_ptr<arrow::io::BufferOutputStream>& stream, RawVector_ bytes);
+RcppExport SEXP _arrow_io___BufferOutputStream__Write(SEXP streamSEXP, SEXP 
bytesSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferOutputStream>& >::type stream(streamSEXP);
+    Rcpp::traits::input_parameter< RawVector_ >::type bytes(bytesSEXP);
+    io___BufferOutputStream__Write(stream, bytes);
+    return R_NilValue;
+END_RCPP
+}
+// io___MockOutputStream__initialize
+std::shared_ptr<arrow::io::MockOutputStream> 
io___MockOutputStream__initialize();
+RcppExport SEXP _arrow_io___MockOutputStream__initialize() {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    rcpp_result_gen = Rcpp::wrap(io___MockOutputStream__initialize());
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___MockOutputStream__GetExtentBytesWritten
+int64_t io___MockOutputStream__GetExtentBytesWritten(const 
std::shared_ptr<arrow::io::MockOutputStream>& stream);
+RcppExport SEXP _arrow_io___MockOutputStream__GetExtentBytesWritten(SEXP 
streamSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::MockOutputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = 
Rcpp::wrap(io___MockOutputStream__GetExtentBytesWritten(stream));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___FixedSizeBufferWriter__initialize
+std::shared_ptr<arrow::io::FixedSizeBufferWriter> 
io___FixedSizeBufferWriter__initialize(const std::shared_ptr<arrow::Buffer>& 
buffer);
+RcppExport SEXP _arrow_io___FixedSizeBufferWriter__initialize(SEXP bufferSEXP) 
{
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Buffer>& 
>::type buffer(bufferSEXP);
+    rcpp_result_gen = 
Rcpp::wrap(io___FixedSizeBufferWriter__initialize(buffer));
+    return rcpp_result_gen;
+END_RCPP
+}
 // MemoryPool__default
 std::shared_ptr<arrow::MemoryPool> MemoryPool__default();
 RcppExport SEXP _arrow_MemoryPool__default() {
@@ -1234,51 +1353,6 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
-// read_record_batch_RandomAccessFile
-std::shared_ptr<arrow::RecordBatch> read_record_batch_RandomAccessFile(const 
std::shared_ptr<arrow::io::RandomAccessFile>& stream);
-RcppExport SEXP _arrow_read_record_batch_RandomAccessFile(SEXP streamSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::RandomAccessFile>& >::type stream(streamSEXP);
-    rcpp_result_gen = Rcpp::wrap(read_record_batch_RandomAccessFile(stream));
-    return rcpp_result_gen;
-END_RCPP
-}
-// read_record_batch_BufferReader
-std::shared_ptr<arrow::RecordBatch> read_record_batch_BufferReader(const 
std::shared_ptr<arrow::io::BufferReader>& stream);
-RcppExport SEXP _arrow_read_record_batch_BufferReader(SEXP streamSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferReader>& >::type stream(streamSEXP);
-    rcpp_result_gen = Rcpp::wrap(read_record_batch_BufferReader(stream));
-    return rcpp_result_gen;
-END_RCPP
-}
-// RecordBatch__to_file
-int RecordBatch__to_file(const std::shared_ptr<arrow::RecordBatch>& batch, 
std::string path);
-RcppExport SEXP _arrow_RecordBatch__to_file(SEXP batchSEXP, SEXP pathSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& 
>::type batch(batchSEXP);
-    Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
-    rcpp_result_gen = Rcpp::wrap(RecordBatch__to_file(batch, path));
-    return rcpp_result_gen;
-END_RCPP
-}
-// RecordBatch__to_stream
-RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>& 
batch);
-RcppExport SEXP _arrow_RecordBatch__to_stream(SEXP batchSEXP) {
-BEGIN_RCPP
-    Rcpp::RObject rcpp_result_gen;
-    Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& 
>::type batch(batchSEXP);
-    rcpp_result_gen = Rcpp::wrap(RecordBatch__to_stream(batch));
-    return rcpp_result_gen;
-END_RCPP
-}
 // RecordBatch__from_dataframe
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl);
 RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) {
@@ -1362,92 +1436,204 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__from_dataframe
-std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
-RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) {
+// RecordBatchReader__schema
+std::shared_ptr<arrow::Schema> RecordBatchReader__schema(const 
std::shared_ptr<arrow::RecordBatchReader>& reader);
+RcppExport SEXP _arrow_RecordBatchReader__schema(SEXP readerSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl));
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::RecordBatchReader>& >::type reader(readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(RecordBatchReader__schema(reader));
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__num_columns
-int Table__num_columns(const std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) {
+// RecordBatchReader__ReadNext
+std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(const 
std::shared_ptr<arrow::RecordBatchReader>& reader);
+RcppExport SEXP _arrow_RecordBatchReader__ReadNext(SEXP readerSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__num_columns(x));
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::RecordBatchReader>& >::type reader(readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(RecordBatchReader__ReadNext(reader));
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__num_rows
-int Table__num_rows(const std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__num_rows(SEXP xSEXP) {
+// ipc___RecordBatchStreamReader__Open
+std::shared_ptr<arrow::RecordBatchReader> 
ipc___RecordBatchStreamReader__Open(const 
std::shared_ptr<arrow::io::InputStream>& stream);
+RcppExport SEXP _arrow_ipc___RecordBatchStreamReader__Open(SEXP streamSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__num_rows(x));
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::InputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchStreamReader__Open(stream));
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__schema
-std::shared_ptr<arrow::Schema> Table__schema(const 
std::shared_ptr<arrow::Table>& x);
-RcppExport SEXP _arrow_Table__schema(SEXP xSEXP) {
+// ipc___RecordBatchFileReader__schema
+std::shared_ptr<arrow::Schema> ipc___RecordBatchFileReader__schema(const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__schema(SEXP readerSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__schema(x));
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileReader__schema(reader));
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__to_file
-int Table__to_file(const std::shared_ptr<arrow::Table>& table, std::string 
path);
-RcppExport SEXP _arrow_Table__to_file(SEXP tableSEXP, SEXP pathSEXP) {
+// ipc___RecordBatchFileReader__num_record_batches
+int ipc___RecordBatchFileReader__num_record_batches(const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__num_record_batches(SEXP 
readerSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type table(tableSEXP);
-    Rcpp::traits::input_parameter< std::string >::type path(pathSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__to_file(table, path));
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+    rcpp_result_gen = 
Rcpp::wrap(ipc___RecordBatchFileReader__num_record_batches(reader));
+    return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileReader__ReadRecordBatch
+std::shared_ptr<arrow::RecordBatch> 
ipc___RecordBatchFileReader__ReadRecordBatch(const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader, int i);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__ReadRecordBatch(SEXP 
readerSEXP, SEXP iSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+    Rcpp::traits::input_parameter< int >::type i(iSEXP);
+    rcpp_result_gen = 
Rcpp::wrap(ipc___RecordBatchFileReader__ReadRecordBatch(reader, i));
+    return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileReader__Open
+std::shared_ptr<arrow::ipc::RecordBatchFileReader> 
ipc___RecordBatchFileReader__Open(const 
std::shared_ptr<arrow::io::RandomAccessFile>& file);
+RcppExport SEXP _arrow_ipc___RecordBatchFileReader__Open(SEXP fileSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::RandomAccessFile>& >::type file(fileSEXP);
+    rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileReader__Open(file));
+    return rcpp_result_gen;
+END_RCPP
+}
+// Table__from_RecordBatchFileReader
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+RcppExport SEXP _arrow_Table__from_RecordBatchFileReader(SEXP readerSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchFileReader>& >::type reader(readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__from_RecordBatchFileReader(reader));
+    return rcpp_result_gen;
+END_RCPP
+}
+// Table__from_RecordBatchStreamReader
+std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(const 
std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader);
+RcppExport SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP readerSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& >::type 
reader(readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__from_RecordBatchStreamReader(reader));
+    return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchFileWriter__Open
+std::shared_ptr<arrow::ipc::RecordBatchWriter> 
ipc___RecordBatchFileWriter__Open(const 
std::shared_ptr<arrow::io::OutputStream>& stream, const 
std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP streamSEXP, SEXP 
schemaSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Schema>& 
>::type schema(schemaSEXP);
+    rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchFileWriter__Open(stream, 
schema));
     return rcpp_result_gen;
 END_RCPP
 }
-// Table__to_stream
-RawVector Table__to_stream(const std::shared_ptr<arrow::Table>& table);
-RcppExport SEXP _arrow_Table__to_stream(SEXP tableSEXP) {
+// ipc___RecordBatchStreamWriter__Open
+std::shared_ptr<arrow::ipc::RecordBatchWriter> 
ipc___RecordBatchStreamWriter__Open(const 
std::shared_ptr<arrow::io::OutputStream>& stream, const 
std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP streamSEXP, 
SEXP schemaSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Schema>& 
>::type schema(schemaSEXP);
+    rcpp_result_gen = Rcpp::wrap(ipc___RecordBatchStreamWriter__Open(stream, 
schema));
+    return rcpp_result_gen;
+END_RCPP
+}
+// ipc___RecordBatchWriter__WriteRecordBatch
+void ipc___RecordBatchWriter__WriteRecordBatch(const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer, const 
std::shared_ptr<arrow::RecordBatch>& batch, bool allow_64bit);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__WriteRecordBatch(SEXP 
batch_writerSEXP, SEXP batchSEXP, SEXP allow_64bitSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type 
batch_writer(batch_writerSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::RecordBatch>& 
>::type batch(batchSEXP);
+    Rcpp::traits::input_parameter< bool >::type allow_64bit(allow_64bitSEXP);
+    ipc___RecordBatchWriter__WriteRecordBatch(batch_writer, batch, 
allow_64bit);
+    return R_NilValue;
+END_RCPP
+}
+// ipc___RecordBatchWriter__WriteTable
+void ipc___RecordBatchWriter__WriteTable(const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer, const 
std::shared_ptr<arrow::Table>& table);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__WriteTable(SEXP 
batch_writerSEXP, SEXP tableSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type 
batch_writer(batch_writerSEXP);
     Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type table(tableSEXP);
-    rcpp_result_gen = Rcpp::wrap(Table__to_stream(table));
+    ipc___RecordBatchWriter__WriteTable(batch_writer, table);
+    return R_NilValue;
+END_RCPP
+}
+// ipc___RecordBatchWriter__Close
+void ipc___RecordBatchWriter__Close(const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer);
+RcppExport SEXP _arrow_ipc___RecordBatchWriter__Close(SEXP batch_writerSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::ipc::RecordBatchWriter>& >::type 
batch_writer(batch_writerSEXP);
+    ipc___RecordBatchWriter__Close(batch_writer);
+    return R_NilValue;
+END_RCPP
+}
+// Table__from_dataframe
+std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
+RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl));
+    return rcpp_result_gen;
+END_RCPP
+}
+// Table__num_columns
+int Table__num_columns(const std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__num_columns(x));
     return rcpp_result_gen;
 END_RCPP
 }
-// read_table_RandomAccessFile
-std::shared_ptr<arrow::Table> read_table_RandomAccessFile(const 
std::shared_ptr<arrow::io::RandomAccessFile>& stream);
-RcppExport SEXP _arrow_read_table_RandomAccessFile(SEXP streamSEXP) {
+// Table__num_rows
+int Table__num_rows(const std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__num_rows(SEXP xSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::RandomAccessFile>& >::type stream(streamSEXP);
-    rcpp_result_gen = Rcpp::wrap(read_table_RandomAccessFile(stream));
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__num_rows(x));
     return rcpp_result_gen;
 END_RCPP
 }
-// read_table_BufferReader
-std::shared_ptr<arrow::Table> read_table_BufferReader(const 
std::shared_ptr<arrow::io::BufferReader>& stream);
-RcppExport SEXP _arrow_read_table_BufferReader(SEXP streamSEXP) {
+// Table__schema
+std::shared_ptr<arrow::Schema> Table__schema(const 
std::shared_ptr<arrow::Table>& x);
+RcppExport SEXP _arrow_Table__schema(SEXP xSEXP) {
 BEGIN_RCPP
     Rcpp::RObject rcpp_result_gen;
     Rcpp::RNGScope rcpp_rngScope_gen;
-    Rcpp::traits::input_parameter< const 
std::shared_ptr<arrow::io::BufferReader>& >::type stream(streamSEXP);
-    rcpp_result_gen = Rcpp::wrap(read_table_BufferReader(stream));
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Table>& 
>::type x(xSEXP);
+    rcpp_result_gen = Rcpp::wrap(Table__schema(x));
     return rcpp_result_gen;
 END_RCPP
 }
@@ -1559,6 +1745,7 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_TimestampType__timezone", (DL_FUNC) 
&_arrow_TimestampType__timezone, 1},
     {"_arrow_TimestampType__unit", (DL_FUNC) &_arrow_TimestampType__unit, 1},
     {"_arrow_Object__pointer_address", (DL_FUNC) 
&_arrow_Object__pointer_address, 1},
+    {"_arrow_Object__is_null", (DL_FUNC) &_arrow_Object__is_null, 1},
     {"_arrow_DictionaryType__initialize", (DL_FUNC) 
&_arrow_DictionaryType__initialize, 3},
     {"_arrow_DictionaryType__index_type", (DL_FUNC) 
&_arrow_DictionaryType__index_type, 1},
     {"_arrow_DictionaryType__name", (DL_FUNC) &_arrow_DictionaryType__name, 1},
@@ -1570,6 +1757,7 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_Field__nullable", (DL_FUNC) &_arrow_Field__nullable, 1},
     {"_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2},
     {"_arrow_io___InputStream__Close", (DL_FUNC) 
&_arrow_io___InputStream__Close, 1},
+    {"_arrow_io___OutputStream__Close", (DL_FUNC) 
&_arrow_io___OutputStream__Close, 1},
     {"_arrow_io___RandomAccessFile__GetSize", (DL_FUNC) 
&_arrow_io___RandomAccessFile__GetSize, 1},
     {"_arrow_io___RandomAccessFile__supports_zero_copy", (DL_FUNC) 
&_arrow_io___RandomAccessFile__supports_zero_copy, 1},
     {"_arrow_io___RandomAccessFile__Seek", (DL_FUNC) 
&_arrow_io___RandomAccessFile__Seek, 2},
@@ -1579,6 +1767,15 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC) 
&_arrow_io___MemoryMappedFile__Resize, 2},
     {"_arrow_io___ReadableFile__Open", (DL_FUNC) 
&_arrow_io___ReadableFile__Open, 1},
     {"_arrow_io___BufferReader__initialize", (DL_FUNC) 
&_arrow_io___BufferReader__initialize, 1},
+    {"_arrow_io___FileOutputStream__Open", (DL_FUNC) 
&_arrow_io___FileOutputStream__Open, 1},
+    {"_arrow_io___BufferOutputStream__Create", (DL_FUNC) 
&_arrow_io___BufferOutputStream__Create, 1},
+    {"_arrow_io___BufferOutputStream__capacity", (DL_FUNC) 
&_arrow_io___BufferOutputStream__capacity, 1},
+    {"_arrow_io___BufferOutputStream__Finish", (DL_FUNC) 
&_arrow_io___BufferOutputStream__Finish, 1},
+    {"_arrow_io___BufferOutputStream__Tell", (DL_FUNC) 
&_arrow_io___BufferOutputStream__Tell, 1},
+    {"_arrow_io___BufferOutputStream__Write", (DL_FUNC) 
&_arrow_io___BufferOutputStream__Write, 2},
+    {"_arrow_io___MockOutputStream__initialize", (DL_FUNC) 
&_arrow_io___MockOutputStream__initialize, 0},
+    {"_arrow_io___MockOutputStream__GetExtentBytesWritten", (DL_FUNC) 
&_arrow_io___MockOutputStream__GetExtentBytesWritten, 1},
+    {"_arrow_io___FixedSizeBufferWriter__initialize", (DL_FUNC) 
&_arrow_io___FixedSizeBufferWriter__initialize, 1},
     {"_arrow_MemoryPool__default", (DL_FUNC) &_arrow_MemoryPool__default, 0},
     {"_arrow_MemoryPool__bytes_allocated", (DL_FUNC) 
&_arrow_MemoryPool__bytes_allocated, 1},
     {"_arrow_MemoryPool__max_memory", (DL_FUNC) 
&_arrow_MemoryPool__max_memory, 1},
@@ -1587,10 +1784,6 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
     {"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2},
     {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__to_dataframe, 1},
-    {"_arrow_read_record_batch_RandomAccessFile", (DL_FUNC) 
&_arrow_read_record_batch_RandomAccessFile, 1},
-    {"_arrow_read_record_batch_BufferReader", (DL_FUNC) 
&_arrow_read_record_batch_BufferReader, 1},
-    {"_arrow_RecordBatch__to_file", (DL_FUNC) &_arrow_RecordBatch__to_file, 2},
-    {"_arrow_RecordBatch__to_stream", (DL_FUNC) 
&_arrow_RecordBatch__to_stream, 1},
     {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__from_dataframe, 1},
     {"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2},
     {"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) 
&_arrow_RecordBatch__RemoveColumn, 2},
@@ -1598,14 +1791,24 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_RecordBatch__names", (DL_FUNC) &_arrow_RecordBatch__names, 1},
     {"_arrow_RecordBatch__Slice1", (DL_FUNC) &_arrow_RecordBatch__Slice1, 2},
     {"_arrow_RecordBatch__Slice2", (DL_FUNC) &_arrow_RecordBatch__Slice2, 3},
+    {"_arrow_RecordBatchReader__schema", (DL_FUNC) 
&_arrow_RecordBatchReader__schema, 1},
+    {"_arrow_RecordBatchReader__ReadNext", (DL_FUNC) 
&_arrow_RecordBatchReader__ReadNext, 1},
+    {"_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) 
&_arrow_ipc___RecordBatchStreamReader__Open, 1},
+    {"_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC) 
&_arrow_ipc___RecordBatchFileReader__schema, 1},
+    {"_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) 
&_arrow_ipc___RecordBatchFileReader__num_record_batches, 1},
+    {"_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC) 
&_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2},
+    {"_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC) 
&_arrow_ipc___RecordBatchFileReader__Open, 1},
+    {"_arrow_Table__from_RecordBatchFileReader", (DL_FUNC) 
&_arrow_Table__from_RecordBatchFileReader, 1},
+    {"_arrow_Table__from_RecordBatchStreamReader", (DL_FUNC) 
&_arrow_Table__from_RecordBatchStreamReader, 1},
+    {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) 
&_arrow_ipc___RecordBatchFileWriter__Open, 2},
+    {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) 
&_arrow_ipc___RecordBatchStreamWriter__Open, 2},
+    {"_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) 
&_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 3},
+    {"_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) 
&_arrow_ipc___RecordBatchWriter__WriteTable, 2},
+    {"_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) 
&_arrow_ipc___RecordBatchWriter__Close, 1},
     {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 
1},
     {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1},
     {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1},
     {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1},
-    {"_arrow_Table__to_file", (DL_FUNC) &_arrow_Table__to_file, 2},
-    {"_arrow_Table__to_stream", (DL_FUNC) &_arrow_Table__to_stream, 1},
-    {"_arrow_read_table_RandomAccessFile", (DL_FUNC) 
&_arrow_read_table_RandomAccessFile, 1},
-    {"_arrow_read_table_BufferReader", (DL_FUNC) 
&_arrow_read_table_BufferReader, 1},
     {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 1},
     {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2},
     {NULL, NULL, 0}
diff --git a/r/src/RecordBatch.cpp b/r/src/RecordBatch.cpp
index cd05bc0..5428f21 100644
--- a/r/src/RecordBatch.cpp
+++ b/r/src/RecordBatch.cpp
@@ -63,66 +63,6 @@ List RecordBatch__to_dataframe(const 
std::shared_ptr<arrow::RecordBatch>& batch)
 }
 
 // [[Rcpp::export]]
-std::shared_ptr<arrow::RecordBatch> read_record_batch_RandomAccessFile(
-    const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
-  std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));
-
-  std::shared_ptr<arrow::RecordBatch> batch;
-  R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(0, &batch));
-
-  return batch;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::RecordBatch> read_record_batch_BufferReader(
-    const std::shared_ptr<arrow::io::BufferReader>& stream) {
-  std::shared_ptr<arrow::ipc::RecordBatchReader> rbf_reader;
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream, 
&rbf_reader));
-
-  std::shared_ptr<arrow::RecordBatch> batch;
-  R_ERROR_NOT_OK(rbf_reader->ReadNext(&batch));
-
-  return batch;
-}
-
-// [[Rcpp::export]]
-int RecordBatch__to_file(const std::shared_ptr<arrow::RecordBatch>& batch,
-                         std::string path) {
-  std::shared_ptr<arrow::io::OutputStream> stream;
-  std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
-
-  R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileWriter::Open(stream.get(), 
batch->schema(),
-                                                         &file_writer));
-  R_ERROR_NOT_OK(file_writer->WriteRecordBatch(*batch, true));
-  R_ERROR_NOT_OK(file_writer->Close());
-
-  int64_t offset;
-  R_ERROR_NOT_OK(stream->Tell(&offset));
-  R_ERROR_NOT_OK(stream->Close());
-  return offset;
-}
-
-int64_t RecordBatch_size(const std::shared_ptr<arrow::RecordBatch>& batch) {
-  io::MockOutputStream mock_sink;
-  R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &mock_sink));
-  return mock_sink.GetExtentBytesWritten();
-}
-
-// [[Rcpp::export]]
-RawVector RecordBatch__to_stream(const std::shared_ptr<arrow::RecordBatch>& 
batch) {
-  auto n = RecordBatch_size(batch);
-
-  RawVector res(n);
-  auto raw_buffer = std::make_shared<arrow::MutableBuffer>(res.begin(), 
res.size());
-  arrow::io::FixedSizeBufferWriter sink(raw_buffer);
-  R_ERROR_NOT_OK(arrow::ipc::WriteRecordBatchStream({batch}, &sink));
-
-  return res;
-}
-
-// [[Rcpp::export]]
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(DataFrame tbl) 
{
   CharacterVector names = tbl.names();
 
diff --git a/r/src/RecordBatchReader.cpp b/r/src/RecordBatchReader.cpp
new file mode 100644
index 0000000..1ddc397
--- /dev/null
+++ b/r/src/RecordBatchReader.cpp
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Schema> RecordBatchReader__schema(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  return reader->schema();
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  R_ERROR_NOT_OK(reader->ReadNext(&batch));
+  return batch;
+}
+
+// -------- RecordBatchStreamReader
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatchReader> ipc___RecordBatchStreamReader__Open(
+    const std::shared_ptr<arrow::io::InputStream>& stream) {
+  std::shared_ptr<arrow::RecordBatchReader> reader;
+  R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream, &reader));
+  return reader;
+}
+
+// -------- RecordBatchFileReader
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Schema> ipc___RecordBatchFileReader__schema(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  return reader->schema();
+}
+
+// [[Rcpp::export]]
+int ipc___RecordBatchFileReader__num_record_batches(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  return reader->num_record_batches();
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatch> 
ipc___RecordBatchFileReader__ReadRecordBatch(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader, int i) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  R_ERROR_NOT_OK(reader->ReadRecordBatch(i, &batch));
+  return batch;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchFileReader> 
ipc___RecordBatchFileReader__Open(
+    const std::shared_ptr<arrow::io::RandomAccessFile>& file) {
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
+  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(file, &reader));
+  return reader;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
+    const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  int num_batches = reader->num_record_batches();
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
+  for (int i = 0; i < num_batches; i++) {
+    R_ERROR_NOT_OK(reader->ReadRecordBatch(i, &batches[i]));
+  }
+
+  std::shared_ptr<arrow::Table> table;
+  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
+
+  return table;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(
+    const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader) {
+  std::shared_ptr<arrow::RecordBatch> batch;
+  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+  while (true) {
+    R_ERROR_NOT_OK(reader->ReadNext(&batch));
+    if (!batch) break;
+    batches.push_back(batch);
+  }
+
+  std::shared_ptr<arrow::Table> table;
+  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
+
+  return table;
+}
diff --git a/r/src/RecordBatchWriter.cpp b/r/src/RecordBatchWriter.cpp
new file mode 100644
index 0000000..7ba9257
--- /dev/null
+++ b/r/src/RecordBatchWriter.cpp
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchWriter> 
ipc___RecordBatchFileWriter__Open(
+    const std::shared_ptr<arrow::io::OutputStream>& stream,
+    const std::shared_ptr<arrow::Schema>& schema) {
+  std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
+  R_ERROR_NOT_OK(
+      arrow::ipc::RecordBatchFileWriter::Open(stream.get(), schema, 
&file_writer));
+  return file_writer;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::ipc::RecordBatchWriter> 
ipc___RecordBatchStreamWriter__Open(
+    const std::shared_ptr<arrow::io::OutputStream>& stream,
+    const std::shared_ptr<arrow::Schema>& schema) {
+  std::shared_ptr<arrow::ipc::RecordBatchWriter> stream_writer;
+  R_ERROR_NOT_OK(
+      arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema, 
&stream_writer));
+  return stream_writer;
+}
+
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__WriteRecordBatch(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
+    const std::shared_ptr<arrow::RecordBatch>& batch, bool allow_64bit) {
+  R_ERROR_NOT_OK(batch_writer->WriteRecordBatch(*batch, allow_64bit));
+}
+
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__WriteTable(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
+    const std::shared_ptr<arrow::Table>& table) {
+  R_ERROR_NOT_OK(batch_writer->WriteTable(*table));
+}
+
+// [[Rcpp::export]]
+void ipc___RecordBatchWriter__Close(
+    const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer) {
+  R_ERROR_NOT_OK(batch_writer->Close());
+}
diff --git a/r/src/Table.cpp b/r/src/Table.cpp
index adc3832..79d7031 100644
--- a/r/src/Table.cpp
+++ b/r/src/Table.cpp
@@ -46,83 +46,6 @@ std::shared_ptr<arrow::Schema> Table__schema(const 
std::shared_ptr<arrow::Table>
 }
 
 // [[Rcpp::export]]
-int Table__to_file(const std::shared_ptr<arrow::Table>& table, std::string 
path) {
-  std::shared_ptr<arrow::io::OutputStream> stream;
-  std::shared_ptr<arrow::ipc::RecordBatchWriter> file_writer;
-
-  R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileWriter::Open(stream.get(), 
table->schema(),
-                                                         &file_writer));
-  R_ERROR_NOT_OK(file_writer->WriteTable(*table));
-  R_ERROR_NOT_OK(file_writer->Close());
-
-  int64_t offset;
-  R_ERROR_NOT_OK(stream->Tell(&offset));
-  R_ERROR_NOT_OK(stream->Close());
-  return offset;
-}
-
-// [[Rcpp::export]]
-RawVector Table__to_stream(const std::shared_ptr<arrow::Table>& table) {
-  arrow::io::MockOutputStream mock_sink;
-  std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
-  R_ERROR_NOT_OK(
-      arrow::ipc::RecordBatchStreamWriter::Open(&mock_sink, table->schema(), 
&writer));
-  R_ERROR_NOT_OK(writer->WriteTable(*table));
-  R_ERROR_NOT_OK(writer->Close());
-  auto n = mock_sink.GetExtentBytesWritten();
-
-  RawVector res(no_init(n));
-  auto raw_buffer = std::make_shared<arrow::MutableBuffer>(res.begin(), 
res.size());
-  arrow::io::FixedSizeBufferWriter sink(raw_buffer);
-
-  R_ERROR_NOT_OK(
-      arrow::ipc::RecordBatchStreamWriter::Open(&sink, table->schema(), 
&writer));
-  R_ERROR_NOT_OK(writer->WriteTable(*table));
-  R_ERROR_NOT_OK(writer->Close());
-
-  return res;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::Table> read_table_RandomAccessFile(
-    const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
-  std::shared_ptr<arrow::ipc::RecordBatchFileReader> rbf_reader;
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(stream, &rbf_reader));
-
-  int num_batches = rbf_reader->num_record_batches();
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
-  for (int i = 0; i < num_batches; i++) {
-    R_ERROR_NOT_OK(rbf_reader->ReadRecordBatch(i, &batches[i]));
-  }
-
-  std::shared_ptr<arrow::Table> table;
-  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
-
-  return table;
-}
-
-// [[Rcpp::export]]
-std::shared_ptr<arrow::Table> read_table_BufferReader(
-    const std::shared_ptr<arrow::io::BufferReader>& stream) {
-  std::shared_ptr<arrow::ipc::RecordBatchReader> rb_reader;
-  R_ERROR_NOT_OK(arrow::ipc::RecordBatchStreamReader::Open(stream, 
&rb_reader));
-  std::shared_ptr<arrow::RecordBatch> batch;
-
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-  while (true) {
-    R_ERROR_NOT_OK(rb_reader->ReadNext(&batch));
-    if (!batch) break;
-    batches.push_back(batch);
-  }
-
-  std::shared_ptr<arrow::Table> table;
-  R_ERROR_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &table));
-
-  return table;
-}
-
-// [[Rcpp::export]]
 List Table__to_dataframe(const std::shared_ptr<arrow::Table>& table) {
   int nc = table->num_columns();
   int nr = table->num_rows();
diff --git a/r/src/array.cpp b/r/src/array.cpp
index 4f9eb6c..71bdb52 100644
--- a/r/src/array.cpp
+++ b/r/src/array.cpp
@@ -282,7 +282,6 @@ inline int64_t time_cast<double>(double value) {
 
 template <int RTYPE>
 std::shared_ptr<Array> Date64Array_From_POSIXct(SEXP x) {
-  using stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
   Rcpp::Vector<RTYPE> vec(x);
   auto p_vec = vec.begin();
   auto n = vec.size();
@@ -420,8 +419,6 @@ inline SEXP StringArray_to_Vector(const 
std::shared_ptr<arrow::Array>& array) {
   }
 
   Rcpp::CharacterVector res(no_init(n));
-  const auto& buffers = array->data()->buffers;
-
   auto p_offset = GetValuesSafely<int32_t>(array->data(), 1, array->offset());
   auto p_data = GetValuesSafely<char>(array->data(), 2, *p_offset);
 
@@ -593,6 +590,31 @@ SEXP Date64Array_to_Vector(const 
std::shared_ptr<arrow::Array> array) {
   return vec;
 }
 
+template <int RTYPE, typename Type>
+SEXP promotion_Array_to_Vector(const std::shared_ptr<Array>& array) {
+  using r_stored_type = typename Rcpp::Vector<RTYPE>::stored_type;
+  using value_type = typename TypeTraits<Type>::ArrayType::value_type;
+
+  auto n = array->length();
+  auto start = reinterpret_cast<const 
value_type*>(array->data()->buffers[1]->data()) +
+               array->offset();
+
+  Rcpp::Vector<RTYPE> vec(no_init(n));
+  if (array->null_count()) {
+    internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), 
array->offset(),
+                                         n);
+    for (size_t i = 0; i < n; i++, bitmap_reader.Next()) {
+      vec[i] = bitmap_reader.IsNotSet() ? Rcpp::Vector<RTYPE>::get_na()
+                                        : static_cast<r_stored_type>(start[i]);
+    }
+  } else {
+    std::transform(start, start + n, vec.begin(),
+                   [](value_type x) { return static_cast<r_stored_type>(x); });
+  }
+
+  return vec;
+}
+
 }  // namespace r
 }  // namespace arrow
 
@@ -601,22 +623,51 @@ SEXP Array__as_vector(const 
std::shared_ptr<arrow::Array>& array) {
   using namespace arrow::r;
 
   switch (array->type_id()) {
-    case Type::BOOL:
-      return BooleanArray_to_Vector(array);
+    // direct support
     case Type::INT8:
       return simple_Array_to_Vector<RAWSXP>(array);
     case Type::INT32:
       return simple_Array_to_Vector<INTSXP>(array);
     case Type::DOUBLE:
       return simple_Array_to_Vector<REALSXP>(array);
+
+    // need to handle 1-bit case
+    case Type::BOOL:
+      return BooleanArray_to_Vector(array);
+
+    // handle memory dense strings
     case Type::STRING:
       return StringArray_to_Vector(array);
     case Type::DICTIONARY:
       return 
DictionaryArray_to_Vector(static_cast<arrow::DictionaryArray*>(array.get()));
+
     case Type::DATE32:
       return Date32Array_to_Vector(array);
     case Type::DATE64:
       return Date64Array_to_Vector(array);
+
+    // promotions to integer vector
+    case Type::UINT8:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, 
arrow::UInt8Type>(array);
+    case Type::INT16:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, 
arrow::Int16Type>(array);
+    case Type::UINT16:
+      return arrow::r::promotion_Array_to_Vector<INTSXP, 
arrow::UInt16Type>(array);
+
+    // promotions to numeric vector
+    case Type::UINT32:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, 
arrow::UInt32Type>(array);
+    case Type::HALF_FLOAT:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, 
arrow::UInt32Type>(array);
+    case Type::FLOAT:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, 
arrow::UInt32Type>(array);
+
+    // lossy promotions to numeric vector
+    case Type::INT64:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, 
arrow::Int64Type>(array);
+    case Type::UINT64:
+      return arrow::r::promotion_Array_to_Vector<REALSXP, 
arrow::UInt64Type>(array);
+
     default:
       break;
   }
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 43ee41d..0d21dad 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -23,6 +23,8 @@
 #include <arrow/api.h>
 #include <arrow/io/file.h>
 #include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/ipc/writer.h>
 #include <arrow/type.h>
 
 #define R_ERROR_NOT_OK(s)                  \
@@ -104,6 +106,7 @@ using IntegerVector_ = Rcpp::Vector<INTSXP, 
Rcpp::NoProtectStorage>;
 using LogicalVector_ = Rcpp::Vector<LGLSXP, Rcpp::NoProtectStorage>;
 using StringVector_ = Rcpp::Vector<STRSXP, Rcpp::NoProtectStorage>;
 using CharacterVector_ = StringVector_;
+using RawVector_ = Rcpp::Vector<RAWSXP, Rcpp::NoProtectStorage>;
 
 template <int RTYPE>
 inline typename Rcpp::Vector<RTYPE>::stored_type default_value() {
@@ -135,11 +138,11 @@ inline const T* GetValuesSafely(const 
std::shared_ptr<ArrayData>& data, int i,
 }
 
 template <int RTYPE, typename Vec = Rcpp::Vector<RTYPE>>
-class RBuffer : public Buffer {
+class RBuffer : public MutableBuffer {
  public:
   RBuffer(Vec vec)
-      : Buffer(reinterpret_cast<const uint8_t*>(vec.begin()),
-               vec.size() * sizeof(typename Vec::stored_type)),
+      : MutableBuffer(reinterpret_cast<uint8_t*>(vec.begin()),
+                      vec.size() * sizeof(typename Vec::stored_type)),
         vec_(vec) {}
 
  private:
diff --git a/r/src/io.cpp b/r/src/io.cpp
index e528720..cda4065 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -36,6 +36,13 @@ void io___InputStream__Close(const 
std::shared_ptr<arrow::io::InputStream>& x) {
   R_ERROR_NOT_OK(x->Close());
 }
 
+// ------ arrow::io::OutputStream
+
+// [[Rcpp::export]]
+void io___OutputStream__Close(const std::shared_ptr<arrow::io::OutputStream>& 
x) {
+  R_ERROR_NOT_OK(x->Close());
+}
+
 // ------ arrow::io::RandomAccessFile
 
 // [[Rcpp::export]]
@@ -107,3 +114,73 @@ std::shared_ptr<arrow::io::BufferReader> 
io___BufferReader__initialize(
     const std::shared_ptr<arrow::Buffer>& buffer) {
   return std::make_shared<arrow::io::BufferReader>(buffer);
 }
+
+// ------ arrow::io::FileOutputStream
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(
+    const std::string& path) {
+  std::shared_ptr<arrow::io::FileOutputStream> stream;
+  R_ERROR_NOT_OK(arrow::io::FileOutputStream::Open(path, &stream));
+  return stream;
+}
+
+// ------ arrow::BufferOutputStream
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::BufferOutputStream> io___BufferOutputStream__Create(
+    int64_t initial_capacity) {
+  std::shared_ptr<arrow::io::BufferOutputStream> stream;
+  R_ERROR_NOT_OK(arrow::io::BufferOutputStream::Create(
+      initial_capacity, arrow::default_memory_pool(), &stream));
+  return stream;
+}
+
+// [[Rcpp::export]]
+int64_t io___BufferOutputStream__capacity(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  return stream->capacity();
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Buffer> io___BufferOutputStream__Finish(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  std::shared_ptr<arrow::Buffer> buffer;
+  R_ERROR_NOT_OK(stream->Finish(&buffer));
+  return buffer;
+}
+
+// [[Rcpp::export]]
+int64_t io___BufferOutputStream__Tell(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream) {
+  int64_t res;
+  R_ERROR_NOT_OK(stream->Tell(&res));
+  return res;
+}
+
+// [[Rcpp::export]]
+void io___BufferOutputStream__Write(
+    const std::shared_ptr<arrow::io::BufferOutputStream>& stream, RawVector_ 
bytes) {
+  R_ERROR_NOT_OK(stream->Write(bytes.begin(), bytes.size()));
+}
+
+// ------ arrow::io::MockOutputStream
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::MockOutputStream> 
io___MockOutputStream__initialize() {
+  return std::make_shared<arrow::io::MockOutputStream>();
+}
+
+// [[Rcpp::export]]
+int64_t io___MockOutputStream__GetExtentBytesWritten(
+    const std::shared_ptr<arrow::io::MockOutputStream>& stream) {
+  return stream->GetExtentBytesWritten();
+}
+
+// ------ arrow::io::FixedSizeBufferWriter
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::FixedSizeBufferWriter> 
io___FixedSizeBufferWriter__initialize(
+    const std::shared_ptr<arrow::Buffer>& buffer) {
+  return std::make_shared<arrow::io::FixedSizeBufferWriter>(buffer);
+}
diff --git a/r/tests/testthat/test-RecordBatch.R 
b/r/tests/testthat/test-RecordBatch.R
index 0d9d0d9..c6a31bb 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -108,53 +108,49 @@ test_that("RecordBatch with 0 rows are supported", {
     )
   )
 
-  tf <- tempfile(); on.exit(unlink(tf))
-  batch$to_file(tf)
-
+  tf <- local_tempfile()
+  write_record_batch(batch, tf)
   res <- read_record_batch(tf)
   expect_equal(res, batch)
 })
 
-test_that("RecordBatch can output stream", {
-  tbl <- tibble::tibble(
-    int = 1:10, dbl = as.numeric(1:10),
-    lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
-    chr = letters[1:10]
-  )
-  record <- record_batch(tbl)
-  stream <- record$to_stream()
-
-  expect_gt(length(stream), 0)
-})
-
-test_that("read_record_batch handles ReadableFile and MemoryMappedFile 
(ARROW-3450)", {
+test_that("read_record_batch handles various streams (ARROW-3450, 
ARROW-3505)", {
   tbl <- tibble::tibble(
     int = 1:10, dbl = as.numeric(1:10),
     lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
     chr = letters[1:10]
   )
   batch <- record_batch(tbl)
-  tf <- tempfile(); on.exit(unlink(tf))
-  batch$to_file(tf)
+  tf <- local_tempfile()
+  write_record_batch(batch, tf)
 
-  bytes <- batch$to_stream()
+  bytes <- write_record_batch(batch, raw())
   buf_reader <- buffer_reader(bytes)
 
   batch1 <- read_record_batch(tf)
   batch2 <- read_record_batch(fs::path_abs(tf))
 
-  readable_file <- file_open(tf); on.exit(readable_file$Close())
+  readable_file <- close_on_exit(file_open(tf))
   batch3 <- read_record_batch(readable_file)
 
-  mmap_file <- mmap_open(tf); on.exit(mmap_file$Close())
+  mmap_file <- close_on_exit(mmap_open(tf))
   batch4 <- read_record_batch(mmap_file)
 
   batch5 <- read_record_batch(bytes)
   batch6 <- read_record_batch(buf_reader)
 
+  stream_reader <- record_batch_stream_reader(bytes)
+  batch7 <- read_record_batch(stream_reader)
+
+  file_reader <- record_batch_file_reader(tf)
+  batch8 <- read_record_batch(file_reader)
+
   expect_equal(batch, batch1)
   expect_equal(batch, batch2)
   expect_equal(batch, batch3)
   expect_equal(batch, batch4)
   expect_equal(batch, batch5)
+  expect_equal(batch, batch6)
+  expect_equal(batch, batch7)
+  expect_equal(batch, batch8)
 })
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index e6b2cad..681b7a9 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -17,35 +17,43 @@
 
 context("arrow::Table")
 
-test_that("read_table handles various input streams (ARROW-3450)", {
+test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", 
{
   tbl <- tibble::tibble(
     int = 1:10, dbl = as.numeric(1:10),
     lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
     chr = letters[1:10]
   )
   tab <- arrow::table(tbl)
-  tf <- tempfile(); on.exit(unlink(tf))
-  tab$to_file(tf)
+  tf <- local_tempfile()
+  write_table(tab, tf)
 
-  bytes <- tab$to_stream()
+  bytes <- write_table(tab, raw())
   buf_reader <- buffer_reader(bytes)
 
   tab1 <- read_table(tf)
   tab2 <- read_table(fs::path_abs(tf))
 
-  readable_file <- file_open(tf); on.exit(readable_file$Close())
+  readable_file <- close_on_exit(file_open(tf))
   tab3 <- read_table(readable_file)
 
-  mmap_file <- mmap_open(tf); on.exit(mmap_file$Close())
+  mmap_file <- close_on_exit(mmap_open(tf))
   tab4 <- read_table(mmap_file)
 
   tab5 <- read_table(bytes)
   tab6 <- read_table(buf_reader)
 
+  stream_reader <- record_batch_stream_reader(bytes)
+  tab7 <- read_table(stream_reader)
+
+  file_reader <- record_batch_file_reader(tf)
+  tab8 <- read_table(file_reader)
+
   expect_equal(tab, tab1)
   expect_equal(tab, tab2)
   expect_equal(tab, tab3)
   expect_equal(tab, tab4)
   expect_equal(tab, tab5)
   expect_equal(tab, tab6)
+  expect_equal(tab, tab7)
+  expect_equal(tab, tab8)
 })
diff --git a/r/tests/testthat/test-read-write.R 
b/r/tests/testthat/test-read-write.R
index c67fe72..bcf3922 100644
--- a/r/tests/testthat/test-read-write.R
+++ b/r/tests/testthat/test-read-write.R
@@ -84,8 +84,8 @@ test_that("arrow::table round trip", {
   for( i in seq_along(chunks_raw)){
     expect_equal(chunked_array_raw$chunk(i-1L), chunks_raw[[i]])
   }
-  tf <- tempfile(); on.exit(unlink(tf))
-  write_arrow(tbl, path = tf)
+  tf <- local_tempfile()
+  write_arrow(tbl, tf)
 
   res <- read_arrow(tf)
   expect_identical(tbl, res)
@@ -114,8 +114,8 @@ test_that("arrow::table round trip handles NA in integer 
and numeric", {
   expect_equal(tab$column(1)$type(), float64())
   expect_equal(tab$column(2)$type(), int8())
 
-  tf <- tempfile(); on.exit(unlink(tf))
-  write_arrow(tbl, path = tf)
+  tf <- local_tempfile()
+  write_arrow(tbl, tf)
 
   res <- read_arrow(tf)
   expect_identical(tbl, res)

Reply via email to