Hi Wes,

My C++ code is attached. I also tried reading the file from C++ by opening it 
as a MemoryMappedFile, and I get the same error when I make a 
RecordBatchReader on the mmap'ed file, i.e., "not an Arrow file".

There must be some magical sequence of writes needed to make the file kosher.

Thanks for helping.

Ken

P.S. I read your blog post about relocating to Nashville. It was my stomping 
ground back in the 80s. Memories.

________________________________
From: Wes McKinney <[email protected]>
Sent: Sunday, January 24, 2021 11:41 AM
To: [email protected] <[email protected]>
Subject: Re: [python] not an arrow file

Can you show your C++ code?

On Sun, Jan 24, 2021 at 8:10 AM Teh, Kenneth M. <[email protected]> wrote:
Just started with arrow...

I wrote a record batch to a file using ipc::MakeFileWriter to create a writer 
and writer->WriteRecordBatch in a C++ program and tried to read it in python 
with:

[] import pyarrow as pa
[] reader = pa.ipc.open_file("myfile")


It raises ArrowInvalid with the message "not an arrow file".

If I write it out as a Table in feather format, I can read it in python. But I 
want to write large files on the order of 100GB or more and then read them back 
into python as pandas dataframes or something similar.

So, I switched to using an ipc writer.

Can someone point me in the right direction?  Thanks.

Ken
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <fmt/format.h>
#include <arrow/status.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/ipc/writer.h>
#include <arrow/io/file.h>
#include "ran3.h"

arrow::Result<std::shared_ptr<arrow::RecordBatch>> make_batch(
        const std::shared_ptr<arrow::Schema> &schema,
        const int64_t nrows)
{
    using func_return_type = arrow::Result<std::shared_ptr<
        arrow::RecordBatch>>;
    arrow::Status st;

    // All schema fields are conveniently int32_t's.
    std::vector<int32_t> v(nrows, 0);
    const int narrays = schema->num_fields();
    std::vector<std::shared_ptr<arrow::Array>> columns(narrays);
    for (int i = 0; i < narrays; ++i) {
        arrow::Result<std::shared_ptr<arrow::Array>> res;
        arrow::Int32Builder b;

        for (int j = 0; j < nrows; ++j) v[j] = ran3(0);
        b.Reset();
        st = b.AppendValues(v);
        if (not st.ok()) return func_return_type(st);
        st = b.Finish(&columns[i]);
        if (not st.ok()) return func_return_type(st);
    }

    auto rb = arrow::RecordBatch::Make(schema, nrows, columns);
    return func_return_type(rb);
}

int main(int argc, char *argv[])
{
    int nrecs = 1;

    if (argc > 1) nrecs = std::stoi(argv[1]);

    /* Make a schema for our record batches.
     */
    std::shared_ptr<arrow::Field> adc1, adc2;
    std::shared_ptr<arrow::Schema> schema;
    adc1 = arrow::field("adc1", arrow::int32());
    adc2 = arrow::field("adc2", arrow::int32());
    schema = arrow::schema({adc1, adc2});

    /* Make an IPC writer so we can write record batches.
     */
    const std::string fnm = "writing.arrow";
    auto fres = arrow::io::FileOutputStream::Open(fnm);
    if (not fres.ok()) {
        fmt::print("open file failed, <{}>, \"{}\"\n",
                fres.status().code(), fres.status().message());
        return 1;
    }
    auto fout = fres.ValueOrDie();
    
    auto wres = arrow::ipc::MakeFileWriter(fout, schema);
    if (not wres.ok()) {
        fmt::print("make ipc writer failed, <{}>, \"{}\"\n",
                wres.status().code(), wres.status().message());
        return 1;
    }
    auto writer = wres.ValueOrDie();

    const int64_t nrows = 500;
    ran3(12345);
    for (int i = 0; i < nrecs; ++i) {
        arrow::Result<std::shared_ptr<arrow::RecordBatch>> bres;
        std::shared_ptr<arrow::RecordBatch> rb;
        arrow::Status st;

        bres = make_batch(schema, nrows);
        if (not bres.ok()) {
            fmt::print("make_batch failed on i={}, <{}>, \"{}\"\n",
                    i, bres.status().code(), bres.status().message());
            break;
        }
        rb = bres.ValueOrDie();
        st = writer->WriteRecordBatch(*rb);
        if (not st.ok()) {
            fmt::print("write batch {} failed, <{}>, \"{}\"\n",
                    i, st.code(), st.message());
            break;
        }
    }

    static_cast<void>(fout->Close());
    return 0;
}
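
A note on the likely cause, offered as a guess from the listing above rather
than a confirmed diagnosis: the program closes fout but never calls
writer->Close(). In the Arrow IPC file format, the footer and the trailing
"ARROW1" magic bytes are written when the writer is closed, so a file whose
writer was never closed has no footer and a reader may reject it as "not an
Arrow file". The sketch below is a toy model of that layout in plain C++.
ToyFileWriter and LooksLikeArrowFile are hypothetical names, the footer
contents are made up, and none of this is the Arrow API.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Toy model only: mimics the overall shape of an Arrow IPC file
// (leading magic, body, footer, footer size, trailing magic).
// Hypothetical names; this is not the Arrow library.
const std::string kMagic = "ARROW1";

class ToyFileWriter {
 public:
  explicit ToyFileWriter(std::string* sink) : sink_(sink) {
    sink_->append(kMagic);   // leading magic
    sink_->append(2, '\0');  // pad the header to 8 bytes
  }

  // Appends an opaque payload standing in for one record batch.
  void WriteBatch(const std::string& payload) {
    sink_->append(payload);
    ++num_batches_;
  }

  // Finalizes the file: footer, footer size, trailing magic.
  // Without this call the bytes in the sink are not a complete file.
  void Close() {
    if (closed_) return;
    const std::string footer = "batches=" + std::to_string(num_batches_);
    const std::int32_t footer_size = static_cast<std::int32_t>(footer.size());
    sink_->append(footer);
    sink_->append(reinterpret_cast<const char*>(&footer_size),
                  sizeof(footer_size));
    sink_->append(kMagic);
    closed_ = true;
  }

 private:
  std::string* sink_;
  int num_batches_ = 0;
  bool closed_ = false;
};

// Simplified reader-side check: the magic must appear at both ends.
bool LooksLikeArrowFile(const std::string& bytes) {
  const std::size_t n = kMagic.size();
  if (bytes.size() < 2 * n) return false;
  return bytes.compare(0, n, kMagic) == 0 &&
         bytes.compare(bytes.size() - n, n, kMagic) == 0;
}
```

In this model, a buffer written with Close() passes LooksLikeArrowFile and one
written without it does not. In the listing above, the analogous change would
be to call writer->Close() and check the returned Status before calling
fout->Close().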
