[ https://issues.apache.org/jira/browse/ARROW-16705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Bonilla closed ARROW-16705.
----------------------------------

> [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function
> ----------------------------------------------------------------------------------
>
>                 Key: ARROW-16705
>                 URL: https://issues.apache.org/jira/browse/ARROW-16705
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: JavaScript
>    Affects Versions: 8.0.0
>         Environment: Nodejs v16.13.0
>            Reporter: Victor Bonilla
>            Priority: Major
>              Labels: async, ipc, javascript, stream
>
> While trying to write a real-time stream from an async iterable of objects to an IPC streaming-format file, I'm getting a TypeError.
> The idea is to stream every message to the Arrow file as soon as it arrives, without waiting to build the complete table first. To take advantage of the stream event handling, I'm using the functional approach of the [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module (Node.js v16.13.0).
> The async iterable yields messages that are JS objects with fields of different data types, for example:
> {code:javascript}
> {
>   id: '6345',
>   product: 'foo',
>   price: 62.78,
>   created: '2022-05-01T16:01:00.105Z',
> }{code}
> Code to reproduce the error:
> {code:javascript}
> const {
>   Struct, Field, Utf8, Float32, TimestampMillisecond,
>   RecordBatchReader, RecordBatchStreamWriter,
>   builderThroughAsyncIterable,
> } = require('apache-arrow')
> const fs = require("fs");
> const path = require("path");
> const {pipeline} = require('node:stream');
>
> const asyncIterable = {
>   [Symbol.asyncIterator]: async function* () {
>     while (true) {
>       const obj = {
>         id: Math.floor(Math.random() * 10).toString(),
>         product: 'foo',
>         price: Math.random() + Math.floor(Math.random() * 10),
>         created: new Date(),
>       }
>       yield obj;
>       // insert some asynchrony
>       await new Promise((r) => setTimeout(r, 1000));
>     }
>   }
> }
>
> async function streamToArrow(messagesAsyncIterable) {
>   const message_type = new Struct([
>     new Field('id', new Utf8, false),
>     new Field('product', new Utf8, false),
>     new Field('price', new Float32, false),
>     new Field('created', new TimestampMillisecond, false),
>   ]);
>   const builderOptions = {
>     type: message_type,
>     nullValues: [null, 'n/a', undefined],
>     highWaterMark: 30,
>     queueingStrategy: 'count',
>   };
>   const transform = builderThroughAsyncIterable(builderOptions);
>   let file_path = './ipc_stream.arrow';
>   const fsWriter = fs.createWriteStream(file_path);
>   pipeline(
>     RecordBatchReader
>       .from(transform(messagesAsyncIterable))
>       .toNodeStream(), // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function
>     RecordBatchStreamWriter.throughNode(),
>     fsWriter,
>     (err, value) => {
>       if (err) {
>         console.error(err);
>       } else {
>         console.log(value, 'value returned');
>       }
>     }
>   ).on('close', () => {
>     console.log('Closed pipeline')
>   });
> }
>
> streamToArrow(asyncIterable){code}
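A likely explanation for the TypeError, offered as an editorial sketch rather than the resolution recorded on the issue: in Arrow JS 8.0.0, `RecordBatchReader.from()` is typed to return a `Promise` of a reader when its argument is an async source such as an `AsyncIterable` or a Node.js readable stream, presumably because it must first read the leading bytes to tell the IPC file format from the stream format. Calling `.toNodeStream()` directly on that `Promise` finds no such method. The snippet below reproduces only that shape of the problem, without depending on `apache-arrow`; `FakeReader`, `fakeFrom`, and `messages` are hypothetical stand-ins, not Arrow APIs.

```javascript
// Hypothetical stand-ins that mimic only the relevant behavior of
// RecordBatchReader.from(); they are NOT the apache-arrow API.
class FakeReader {
  toNodeStream() {
    return 'node stream';
  }
}

// Returns a reader directly for synchronous input, but a Promise of a reader
// for async input, because the source must be read before the reader exists.
function fakeFrom(source) {
  if (source != null && typeof source[Symbol.asyncIterator] === 'function') {
    return (async () => new FakeReader())();
  }
  return new FakeReader();
}

// Hypothetical async source shaped like the messages in the report.
async function* messages() {
  yield { id: '6345', product: 'foo', price: 62.78 };
}

// Mirrors the failing call: the result is a Promise, which has no toNodeStream().
const pending = fakeFrom(messages());
console.log(pending instanceof Promise); // true
console.log(typeof pending.toNodeStream); // undefined

// Awaiting the Promise first yields an object that does have the method.
async function main() {
  const reader = await fakeFrom(messages());
  console.log(reader.toNodeStream()); // node stream
}

main();
```

Note also that `builderThroughAsyncIterable()` is documented as producing a sequence of Arrow `Vector`s rather than IPC-encoded bytes, so awaiting the reader may not by itself be sufficient for this pipeline.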