Ah, I see now. I'll look into writing IPC streams directly, but discarding the first 8 bytes works for me in the short term.

Thanks,

Edward

On 3/2/21 11:37 AM, Micah Kornfield wrote:

    Is the ArrowStreamReader intended to operate on a generic InputStream,
    like from a file grabbed from S3, or does it need to have a certain
    structure? Has anyone else attempted to download files from S3 to
    Arrow structures using Java, and if so, what have you gotten to work?

No, it isn't. The documentation could be improved here. The ArrowStreamReader is intended to read IPC message streams (essentially a schema followed by dictionary or record batches). This is similar to how Arrow files are laid out [1], but the file format differs in the first few bytes: it begins with the magic string "ARROW1" padded to 8 bytes, and it also ends with a footer followed by a second "ARROW1".
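A minimal, JDK-only sketch of that layout difference, assuming the whole object has already been read into memory: a file in the IPC file format starts and ends with the "ARROW1" magic, while a bare IPC stream does not start with it. `ArrowFormatSniffer` and `isArrowFileFormat` are hypothetical names, not part of Arrow's Java API.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Hypothetical helper for illustration only; not an Arrow API.
object ArrowFormatSniffer {
  private val Magic: Array[Byte] = "ARROW1".getBytes(StandardCharsets.US_ASCII)

  // Lower bound on a file's size: 8-byte padded leading magic
  // + int32 footer length + trailing magic (real files are larger).
  private val MinFileLength = 8 + 4 + Magic.length

  /** True if `bytes` begins and ends with "ARROW1", as the IPC file format does. */
  def isArrowFileFormat(bytes: Array[Byte]): Boolean =
    bytes.length >= MinFileLength &&
      Arrays.equals(bytes.take(Magic.length), Magic) &&
      Arrays.equals(bytes.takeRight(Magic.length), Magic)
}
```

A check like this only looks at the framing bytes; it does not validate the schema, batches, or footer in between.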

Depending on what you are trying to do, simply reading and writing Arrow IPC streams (rather than the file format) could be sufficient for your use case.
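For reference, a minimal sketch of an IPC stream round trip with ArrowStreamWriter and ArrowStreamReader. It assumes the Arrow Java `arrow-vector` artifact plus a memory implementation such as `arrow-memory-netty` are on the classpath; the object name, the single int column "x", and the summing logic are purely illustrative.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.channels.Channels
import java.util.Arrays

import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

// Hypothetical example object; not part of Arrow.
object IpcStreamRoundTrip {
  /** Writes `values` as one record batch of an IPC stream and returns the bytes. */
  def writeStream(allocator: BufferAllocator, values: Seq[Int]): Array[Byte] = {
    val schema = new Schema(Arrays.asList(Field.nullable("x", new ArrowType.Int(32, true))))
    val root = VectorSchemaRoot.create(schema, allocator)
    try {
      val vec = root.getVector("x").asInstanceOf[IntVector]
      vec.allocateNew(values.length)
      values.zipWithIndex.foreach { case (v, i) => vec.set(i, v) }
      root.setRowCount(values.length) // also sets each vector's value count

      val out = new ByteArrayOutputStream()
      // null DictionaryProvider: no dictionary-encoded columns are used
      val writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))
      writer.start()      // writes the schema message
      writer.writeBatch() // writes the current contents of `root`
      writer.end()        // writes the end-of-stream marker
      writer.close()
      out.toByteArray
    } finally root.close()
  }

  /** Reads every batch of an IPC stream and sums column "x". */
  def sumStream(allocator: BufferAllocator, bytes: Array[Byte]): Long = {
    val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)
    try {
      var total = 0L
      while (reader.loadNextBatch()) {
        val vec = reader.getVectorSchemaRoot.getVector("x").asInstanceOf[IntVector]
        for (i <- 0 until vec.getValueCount) total += vec.get(i)
      }
      total
    } finally reader.close() // also closes the reader's VectorSchemaRoot
  }
}
```

Because a stream needs no footer or seeking, the reading side works the same on any InputStream, including one obtained from S3, as long as the producer wrote a stream rather than a file.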

Another option, if dictionary batches aren't used (and the writer of the file is relatively recent), is to discard the first 8 bytes of the InputStream and then read the rest with the ArrowStreamReader. That should work.
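A minimal sketch of that workaround, assuming the object is in the Arrow file format and meets the conditions above. `ArrowFilePreamble` is a hypothetical helper, not an Arrow API; it uses only the JDK and checks the magic before discarding the 8 bytes.

```scala
import java.io.{DataInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Hypothetical helper for illustration only; not an Arrow API.
object ArrowFilePreamble {
  private val Magic = "ARROW1".getBytes(StandardCharsets.US_ASCII)
  val Length = 8 // 6-byte magic + 2 bytes of padding

  /** Consumes exactly the 8-byte file preamble from `in`, failing if the magic is absent. */
  def skip(in: InputStream): Unit = {
    val preamble = new Array[Byte](Length)
    // readFully keeps reading until all 8 bytes arrive (or throws EOFException);
    // DataInputStream does not buffer, so `in` ends up positioned right after them.
    new DataInputStream(in).readFully(preamble)
    require(Arrays.equals(preamble.take(Magic.length), Magic),
      "stream does not start with the Arrow file magic \"ARROW1\"")
  }
}
```

Once `skip` returns, the same stream can be passed to `new ArrowStreamReader(in, allocator)` and read with `loadNextBatch()`. Using `readFully` instead of a single `InputStream.skip(8)` matters on network streams, since `skip` may skip fewer bytes than requested.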

[1] https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format

On Tue, Mar 2, 2021 at 8:19 AM Edward Morgan wrote:

    Hey everyone,

    I'm trying to implement a method that loads .arrow files stored on S3
    into memory as Arrow structures using Scala (Java). As far as I can
    tell, there isn't native Arrow support for this in Java (though I may
    be wrong).

    The AWS Java SDK lets you retrieve an S3ObjectInputStream for an
    object, which is a subclass of InputStream. I've attempted to plug
    that into an instance of ArrowStreamReader, i.e.

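    ```
    import com.amazonaws.services.s3.model.{S3Object, S3ObjectInputStream}
    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.ipc.ArrowStreamReader

    val obj: S3Object = client.getObject(getRequest)
    val is: S3ObjectInputStream = obj.getObjectContent
    val readerDirect = new ArrowStreamReader(is, new RootAllocator(Long.MaxValue))
    readerDirect.loadNextBatch()
    ```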

    When loadNextBatch() is executed, it fails with the following
    exception:

    ```
    Exception in thread "main" java.io.IOException: Unexpected end of stream trying to read message.
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:700)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:57)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:164)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:170)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:161)
        at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:194)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:106)
        ...
    ```

    It seems to be checking the first few bytes of the stream to
    determine its size, which I assume S3ObjectInputStream doesn't
    provide.

    Weirdly enough, by reading the S3ObjectInputStream into a byte array
    and then wrapping it in a ByteArrayReadableSeekableByteChannel, I can
    use the ArrowFileReader just fine, though I'm not sure whether that's
    the intended behavior:

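    ```
    import com.amazonaws.services.s3.model.{S3Object, S3ObjectInputStream}
    import com.amazonaws.util.IOUtils // assumed; commons-io's IOUtils.toByteArray also works
    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.ipc.ArrowFileReader
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel

    val obj: S3Object = s3Client.getObject(getRequest)
    val is: S3ObjectInputStream = obj.getObjectContent
    // Buffer the whole object so the file reader can seek to the footer
    val bytes: Array[Byte] = IOUtils.toByteArray(is)
    val byteStream = new ByteArrayReadableSeekableByteChannel(bytes)
    val arrowReader = new ArrowFileReader(byteStream, new RootAllocator(Long.MaxValue))
    arrowReader.loadNextBatch()
    ```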

    Is the ArrowStreamReader intended to operate on a generic InputStream,
    like from a file grabbed from S3, or does it need to have a certain
    structure? Has anyone else attempted to download files from S3 to
    Arrow structures using Java, and if so, what have you gotten to work?


    Thanks,

    Edward
