[ 
https://issues.apache.org/jira/browse/ARROW-16705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545007#comment-17545007
 ] 

Paul Taylor edited comment on ARROW-16705 at 6/1/22 6:07 PM:
-------------------------------------------------------------

[~vic-bonilla] The RecordBatchReader is used to parse IPC-format bytes coming 
off the wire into RecordBatches. You don't need a RecordBatchReader here, 
because the Builder already produces in-memory Arrow data: Vectors that can go 
directly inside a RecordBatch.

Instead, you can wrap the chunks of the StructVector produced by the Builder in 
RecordBatches and write them straight to the IPC format with the 
RecordBatchStreamWriter, like this:
{code:java}
const {Readable, pipeline} = require('node:stream');
const {RecordBatch, Schema} = require('apache-arrow')

/**
 * Converts an async iterable of plain JS messages into Arrow RecordBatches.
 *
 * The builder transform yields Vectors; every chunk in a vector's `data`
 * is wrapped directly in a RecordBatch, so no IPC RecordBatchReader is needed.
 *
 * @param {AsyncIterable<object>} source - the incoming messages.
 * @param {object} [options=builderOptions] - options for
 *   builderThroughAsyncIterable. Pass them explicitly when `builderOptions`
 *   is local to another function (e.g. the issue's `streamToArrow`): the
 *   default is only resolved when this argument is omitted, and then
 *   `builderOptions` must be in scope here.
 * @yields {RecordBatch} one batch per chunk of each built vector.
 */
const messagesToBatches = async function* (source, options = builderOptions) {
  // Derived once, from the first vector's child fields, and reused for
  // every batch.
  let schema;
  const transform = builderThroughAsyncIterable(options);
  for await (const vector of transform(source)) {
    schema ??= new Schema(vector.type.children);
    for (const chunk of vector.data) {
      yield new RecordBatch(schema, chunk);
    }
  }
};

// The callback form of stream.pipeline() requires a final callback; without
// one, pipeline() throws before any data flows. The callback also reports
// errors from any stage (batch generator, IPC writer, or file stream).
// NOTE(review): messagesAsyncIterable, RecordBatchStreamWriter and fsWriter
// are assumed to come from the issue's streamToArrow() code.
pipeline(
  Readable.from(messagesToBatches(messagesAsyncIterable)),
  RecordBatchStreamWriter.throughNode(),
  fsWriter,
  (err) => {
    if (err) {
      console.error('Arrow IPC pipeline failed:', err);
    } else {
      console.log('Arrow IPC pipeline finished');
    }
  }
); {code}
 

 


was (Author: paul.e.taylor):
[~vic-bonilla] The RecordBatchReader is used to parse IPC-format bytes coming 
off the wire into RecordBatches. You don't need a RecordBatchReader here, 
because the Builder already produces in-memory Arrow data: Vectors that can go 
directly inside a RecordBatch.

Instead, you can wrap the chunks of the StructVector produced by the Builder in 
RecordBatches and write them straight to the IPC format with the 
RecordBatchStreamWriter, like this:
{code:java}
const {Readable, pipeline} = require('node:stream');
const {RecordBatch, Schema} = require('apache-arrow')

/**
 * Converts an async iterable of plain JS messages into Arrow RecordBatches.
 *
 * The builder transform yields Vectors; every chunk in a vector's `data`
 * is wrapped directly in a RecordBatch, so no IPC RecordBatchReader is needed.
 *
 * @param {AsyncIterable<object>} source - the incoming messages.
 * @param {object} [options=builderOptions] - options for
 *   builderThroughAsyncIterable. Pass them explicitly when `builderOptions`
 *   is local to another function (e.g. the issue's `streamToArrow`): the
 *   default is only resolved when this argument is omitted, and then
 *   `builderOptions` must be in scope here.
 * @yields {RecordBatch} one batch per chunk of each built vector.
 */
const messagesToBatches = async function* (source, options = builderOptions) {
  const transform = builderThroughAsyncIterable(options);
  // Derived once, from the first vector's child fields, and reused for
  // every batch.
  let schema;
  for await (const vector of transform(source)) {
    schema ??= new Schema(vector.type.children);
    for (const chunk of vector.data) {
      yield new RecordBatch(schema, chunk);
    }
  }
};

// The callback form of stream.pipeline() requires a final callback; without
// one, pipeline() throws before any data flows. The callback also reports
// errors from any stage (batch generator, IPC writer, or file stream).
// NOTE(review): messagesAsyncIterable, RecordBatchStreamWriter and fsWriter
// are assumed to come from the issue's streamToArrow() code.
pipeline(
  Readable.from(messagesToBatches(messagesAsyncIterable)),
  RecordBatchStreamWriter.throughNode(),
  fsWriter,
  (err) => {
    if (err) {
      console.error('Arrow IPC pipeline failed:', err);
    } else {
      console.log('Arrow IPC pipeline finished');
    }
  }
); {code}
 

 

> [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: ARROW-16705
>                 URL: https://issues.apache.org/jira/browse/ARROW-16705
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: JavaScript
>    Affects Versions: 8.0.0
>         Environment: Nodejs v16.13.0
>            Reporter: Victor Bonilla
>            Priority: Major
>              Labels: async, ipc, javascript, stream
>
> While trying to write a real-time stream from an async iterable of objects to 
> an IPC stream-format file, I'm getting a TypeError.
> The idea is to write every message to the Arrow file as soon as it arrives, 
> without waiting to build the complete table first. To take advantage of 
> stream event handling, I'm using the functional approach of the 
> [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module 
> (Node.js v16.13.0).
> The async iterable yields messages that are plain JS objects with fields of 
> different data types, for example:
> {code:javascript}
> {
>     id: '6345',
>     product: 'foo',
>     price: 62.78,
>     created: '2022-05-01T16:01:00.105Z',
> }{code}
> Code to replicate the error:
> {code:javascript}
> const {
>     Struct, Field, Utf8, Float32, TimestampMillisecond,
>     RecordBatchReader, RecordBatchStreamWriter,
>     builderThroughAsyncIterable,
> } = require('apache-arrow')
> const fs = require("fs");
> const path = require("path");
> const {pipeline} = require('node:stream');
> // Endless sample source: yields one message object, then sleeps 1000 ms
> // before producing the next; the while (true) loop never terminates.
> const asyncIterable = {
>     [Symbol.asyncIterator]: async function* () {
>         while (true) {
>             // id: a random digit '0'..'9' as a string; price: a number in
>             // [0, 10); created: the current Date (the struct type in
>             // streamToArrow maps it to a TimestampMillisecond field).
>             const obj = {
>                 id: Math.floor(Math.random() * 10).toString(),
>                 product: 'foo',
>                 price: Math.random() + Math.floor(Math.random() * 10),
>                 created: new Date(),
>             }
>             yield obj;
>             // pause 1000 ms between messages to simulate data arriving over time
>             await new Promise((r) => setTimeout(r, 1000));
>         }
>     }
> }
> // Issue reproduction: builds an Arrow builder transform for the message
> // objects and tries to pipe the result into ./ipc_stream.arrow in IPC
> // stream format. The RecordBatchReader stage is where it fails.
> // NOTE: the line right after `.toNodeStream(),` is the email-wrapped tail
> // of that line's trailing comment, not code.
> async function streamToArrow(messagesAsyncIterable) {
>     // Row type: one Struct field per message property (the `false`
>     // argument presumably marks each field non-nullable).
>     const message_type = new Struct([
>         new Field('id', new Utf8, false),
>         new Field('product', new Utf8, false),
>         new Field('price', new Float32, false),
>         new Field('created', new TimestampMillisecond, false),
>     ]);
>     // NOTE(review): presumably nullValues lists the inputs to store as
>     // null, and highWaterMark 30 with the 'count' strategy emits a vector
>     // every 30 rows; confirm in the apache-arrow builder docs.
>     const builderOptions = {
>         type: message_type,
>         nullValues: [null, 'n/a', undefined],
>         highWaterMark: 30,
>         queueingStrategy: 'count',
>     };
>     const transform = builderThroughAsyncIterable(builderOptions);  
>     let file_path = './ipc_stream.arrow';
>     const fsWriter = fs.createWriteStream(file_path);
>     // NOTE(review): RecordBatchReader parses IPC-format bytes, while
>     // transform() yields Arrow vectors, so this stage is not needed here.
>     // from() presumably also returns a Promise for an async source, which
>     // would explain the missing toNodeStream; confirm in the apache-arrow docs.
>     pipeline(
>         RecordBatchReader
>             .from(transform(messagesAsyncIterable))
>             .toNodeStream(),  // Throws TypeError: 
> RecordBatchReader.from(...).toNodeStream is not a function         
>         RecordBatchStreamWriter.throughNode(),
>         fsWriter,
>         (err, value) => {
>             if (err) {
>                 console.error(err);
>             } else {
>                 console.log(value, 'value returned');
>             }
>         }
>     ).on('close', () => {
>         console.log('Closed pipeline')
>     });
> }
> // Run the reproduction; asyncIterable never ends, so the pipeline stays open.
> streamToArrow(asyncIterable){code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to