[ 
https://issues.apache.org/jira/browse/ARROW-16705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545765#comment-17545765
 ] 

Victor Bonilla commented on ARROW-16705:
----------------------------------------

Hey [~paul.e.taylor] , I tried skipping RecordBatchReader as you suggest and it 
works fine, thanks for the help.

A quick remark to avoid confusion about the pipeline declaration, it'll raise 
an error due to the missing callback (or promise). I personally solved with:
{code:javascript}
const stream = require('stream'); 

const pipe = stream.promises.pipeline(
  Readable.from(messagesToBatches(messagesAsyncIterable)),
  RecordBatchStreamWriter.throughNode(),
  fsWriter
);
await pipe;
{code}

> [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: ARROW-16705
>                 URL: https://issues.apache.org/jira/browse/ARROW-16705
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: JavaScript
>    Affects Versions: 8.0.0
>         Environment: Nodejs v16.13.0
>            Reporter: Victor Bonilla
>            Priority: Major
>              Labels: async, ipc, javascript, stream
>
> Trying to code a real-time stream from an async iterable of objects to an IPC 
> Streaming format file I'm getting a TypeError.
> The idea is to stream every message to the arrow file as soon as it arrives 
> without waiting to build the complete table to stream it. To take advantage 
> of the stream event handling, I'm using the functional approach of 
> [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module 
> (Nodejs v16.13.0).
> The async iterable contains messages that are JS objects containing different 
> data types, for example:
> {code:javascript}
> {
>     id: '6345',
>     product: 'foo',
>     price: 62.78,
>     created: '2022-05-01T16:01:00.105Z',
> }{code}
> Code to replicate the error:
> {code:javascript}
> const {
>     Struct, Field, Utf8, Float32, TimestampMillisecond,
>     RecordBatchReader, RecordBatchStreamWriter,
>     builderThroughAsyncIterable,
> } = require('apache-arrow')
> const fs = require("fs");
> const path = require("path");
> const {pipeline} = require('node:stream');
> const asyncIterable = {
>     [Symbol.asyncIterator]: async function* () {
>         while (true) {
>             const obj = {
>                 id: Math.floor(Math.random() * 10).toString(),
>                 product: 'foo',
>                 price: Math.random() + Math.floor(Math.random() * 10),
>                 created: new Date(),
>             }
>             yield obj;
>             // insert some asynchrony
>             await new Promise((r) => setTimeout(r, 1000));
>         }
>     }
> }
> async function streamToArrow(messagesAsyncIterable) {
>     const message_type = new Struct([
>         new Field('id', new Utf8, false),
>         new Field('product', new Utf8, false),
>         new Field('price', new Float32, false),
>         new Field('created', new TimestampMillisecond, false),
>     ]);
>     const builderOptions = {
>         type: message_type,
>         nullValues: [null, 'n/a', undefined],
>         highWaterMark: 30,
>         queueingStrategy: 'count',
>     };
>     const transform = builderThroughAsyncIterable(builderOptions);  
>     let file_path = './ipc_stream.arrow';
>     const fsWriter = fs.createWriteStream(file_path);
>     pipeline(
>         RecordBatchReader
>             .from(transform(messagesAsyncIterable))
>             .toNodeStream(),  // Throws TypeError: 
> RecordBatchReader.from(...).toNodeStream is not a function         
>         RecordBatchStreamWriter.throughNode(),
>         fsWriter,
>         (err, value) => {
>             if (err) {
>                 console.error(err);
>             } else {
>                 console.log(value, 'value returned');
>             }
>         }
>     ).on('close', () => {
>         console.log('Closed pipeline')
>     });
> }
> streamToArrow(asyncIterable){code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to