This seems to help (still testing it though): take the array length from
RecordBatch.length instead of from the FieldNode.

  Status GetFieldMetadata(int field_index, ArrayData* out) {
    auto nodes = metadata_->nodes();
    // pop off a field
    if (field_index >= static_cast<int>(nodes->size())) {
      return Status::Invalid("Ran out of field metadata, likely malformed");
    }
    const flatbuf::FieldNode* node = nodes->Get(field_index);

    // Changed: use the batch-level length rather than the per-field length.
    // out->length = node->length();
    out->length = metadata_->length();
    out->null_count = node->null_count();
    out->offset = 0;
    return Status::OK();
  }
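
To make the intent of the change easier to check in isolation, here is a
minimal standalone sketch. FieldNodeSketch, RecordBatchSketch and
ArrayDataSketch are hypothetical stand-ins made up for illustration, not
Arrow's flatbuffer or ArrayData classes. The only point shown is that the
reader takes its row count from the batch-level length.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the IPC metadata; not Arrow's real types.
struct FieldNodeSketch {
  int64_t length;      // rows the field buffers were pre-allocated for
  int64_t null_count;
};

struct RecordBatchSketch {
  int64_t length;                      // rows actually populated so far
  std::vector<FieldNodeSketch> nodes;  // one entry per field
};

struct ArrayDataSketch {
  int64_t length = 0;
  int64_t null_count = 0;
  int64_t offset = 0;
};

// Returns false when field_index is out of range (malformed metadata).
bool GetFieldMetadataSketch(const RecordBatchSketch& batch, int field_index,
                            ArrayDataSketch* out) {
  assert(out != nullptr);
  if (field_index < 0 ||
      field_index >= static_cast<int>(batch.nodes.size())) {
    return false;
  }
  const FieldNodeSketch& node = batch.nodes[field_index];
  // Use the batch length, not the node length, so rows beyond the
  // populated prefix are never exposed to the reader.
  out->length = batch.length;
  out->null_count = node.null_count;
  out->offset = 0;
  return true;
}
```

With a batch whose fields were pre-allocated for 8 rows but whose length is
3, the reader sees 3 rows. null_count is copied through unchanged, which I
am assuming is safe only while the unpopulated rows are not counted as nulls
(as with non-nullable fields).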

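For the pre-allocation scheme discussed in the quoted thread below, a rough
sketch of an append-only string column may also help. PreallocatedStringColumn
is a hypothetical name, and pointing every unpopulated offset at the end of
the data buffer is my assumption about how to keep the offsets monotonic. No
cross-process synchronization is attempted here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical append-only string column laid out like an Arrow utf8 array:
// offsets[i] .. offsets[i + 1] delimit row i inside the data buffer.
class PreallocatedStringColumn {
 public:
  PreallocatedStringColumn(int64_t max_rows, int64_t data_capacity)
      : offsets_(max_rows + 1, static_cast<int32_t>(data_capacity)),
        data_(data_capacity, '\0') {
    offsets_[0] = 0;  // row 0 starts at the beginning of the data buffer
  }

  // Appends one value. Returns false if either the row slots or the
  // variable-length data buffer are exhausted.
  bool Append(const std::string& value) {
    const int64_t max_rows = static_cast<int64_t>(offsets_.size()) - 1;
    if (length_ >= max_rows) return false;
    const int64_t begin = offsets_[length_];
    const int64_t end = begin + static_cast<int64_t>(value.size());
    if (end > static_cast<int64_t>(data_.size())) return false;
    if (!value.empty()) {
      std::memcpy(data_.data() + begin, value.data(), value.size());
    }
    offsets_[length_ + 1] = static_cast<int32_t>(end);
    ++length_;  // bump the utilized row count last
    return true;
  }

  // Reads row i from the offsets alone, as a reader unaware of length() would.
  std::string Get(int64_t i) const {
    assert(i >= 0 && i + 1 < static_cast<int64_t>(offsets_.size()));
    const std::size_t n = static_cast<std::size_t>(offsets_[i + 1] - offsets_[i]);
    return std::string(data_.data() + offsets_[i], n);
  }

  int64_t length() const { return length_; }  // utilized rows

 private:
  std::vector<int32_t> offsets_;
  std::vector<char> data_;
  int64_t length_ = 0;
};
```

A reader that honors length() sees only the populated rows. A reader that
ignores it sees the first unpopulated row as one long string covering the
rest of the data buffer, which is the behavior described for "y" in the
quoted message.
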
On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen <j...@jgm.org> wrote:

> So far it seems as if pyarrow is completely ignoring the
> RecordBatch.length field.  More info to follow...
>
> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen <j...@jgm.org> wrote:
>
>> Crikey! I'll do some testing around that and suggest some test cases to
>> ensure it continues to work, assuming that it does.
>>
>> -John
>>
>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney <wesmck...@gmail.com> wrote:
>>
>>> Thanks for the attachment, it's helpful.
>>>
>>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen <j...@jgm.org> wrote:
>>> >
>>> > Attachments referred to in previous two messages:
>>> >
>>> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>>> >
>>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <j...@jgm.org> wrote:
>>> >
>>> > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
>>> > > engagement.  In order to clarify things a bit, I am attaching a
>>> graphic of
>>> > > how our application will take record-wise (row-oriented) data from
>>> an event
>>> > > source and incrementally populate a pre-allocated Arrow-compatible
>>> buffer,
>>> > > including for variable-length fields.  (Obviously at this stage I am
>>> not
>>> > > using the reference implementation Arrow code, although that would
>>> be a
>>> > > goal.... to contribute that back to the project.)
>>> > >
>>> > > For sake of simplicity these are non-nullable fields.  As a result a
>>> > > reader of "y" that has no knowledge of the "utilized" metadata would
>>> get a
>>> > > long string (zeros, spaces, uninitialized, or whatever we decide for
>>> the
>>> > > pre-allocation model) for the record just beyond the last utilized
>>> record.
>>> > >
>>> > > I don't see any "big O"-analysis problems with this approach.  The
>>> > > space/time tradeoff is that we have to guess how much room to
>>> allocate for
>>> > > variable-length fields.  We will probably almost always be wrong.
>>> This
>>> > > ends up in "wasted" space.  However, we can do calculations based on
>>> these
>>> > > partially filled batches that take full advantage of the columnar
>>> layout.
>>> > >  (Here I've shown the case where we had too little variable-length
>>> buffer
>>> > > set aside, resulting in "wasted" rows.  The flip side is that rows
>>> achieve
>>> > > full [1] utilization but there is wasted variable-length buffer if
>>> we guess
>>> > > incorrectly in the other direction.)
>>> > >
>>> > > I proposed a few things that are "nice to have" but really what I'm
>>> eyeing
>>> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see
>>> that some
>>> > > of the rows in a RecordBatch are not to be read, based on the new
>>> > > "utilized" (or whatever name) metadata.  That single tweak to the
>>> > > metadata-- and readers honoring it-- is the core of the proposal.
>>> > >  (Proposal 4.)  This would indicate that the attached example (or
>>> something
>>> > > similar) is the blessed approach for those seeking to accumulate
>>> events and
>>> > > process them while still expecting more data, with the
>>> heavier-weight task
>>> > > of creating a new pre-allocated batch being a rare occurrence.
>>> > >
>>>
>>> So the "length" field in RecordBatch is already the utilized number of
>>> rows. The body buffers can certainly have excess unused space. So your
>>> application can mutate Flatbuffer "length" field in-place as new
>>> records are filled in.
>>>
>>> > > Notice that the mutability is only in the sense of "appending."  The
>>> > > current doctrine of total immutability would be revised to refer to
>>> the
>>> > > immutability of only the already-populated rows.
>>> > >
>>> > > It gives folks an option other than choosing the lesser of two
>>> evils: on
>>> > > the one hand, length 1 RecordBatches that don't result in a stream
>>> that is
>>> > > computationally efficient.  On the other hand, adding artificial
>>> latency by
>>> > > accumulating events before "freezing" a larger batch and only then
>>> making
>>> > > it available to computation.
>>> > >
>>> > > -John
>>> > >
>>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <wesmck...@gmail.com>
>>> wrote:
>>> > >
>>> > >> hi John,
>>> > >>
>>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org>
>>> wrote:
>>> > >> >
>>> > >> > During my time building financial analytics and trading systems
>>> (23
>>> > >> years!), both the "batch processing" and "stream processing"
>>> paradigms have
>>> > >> been extensively used by myself and by colleagues.
>>> > >> >
>>> > >> > Unfortunately, the tools used in these paradigms have not
>>> successfully
>>> > >> overlapped.  For example, an analyst might use a Python notebook
>>> with
>>> > >> pandas to do some batch analysis.  Then, for acceptable latency and
>>> > >> throughput, a C++ programmer must implement the same schemas and
>>> processing
>>> > >> logic in order to analyze real-time data for real-time decision
>>> support.
>>> > >> (Time horizons often being sub-second or even sub-millisecond for an
>>> > >> acceptable reaction to an event.  The most aggressive software-based
>>> > >> systems, leaving custom hardware aside other than things like
>>> kernel-bypass
>>> > >> NICs, target 10s of microseconds for a full round trip from data
>>> ingestion
>>> > >> to decision.)
>>> > >> >
>>> > >> > As a result, TCO is more than doubled.  A doubling can be
>>> accounted for
>>> > >> by two implementations that share little or nothing in the way of
>>> > >> architecture.  Then additional effort is required to ensure that
>>> these
>>> > >> implementations continue to behave the same way and are upgraded in
>>> > >> lock-step.
>>> > >> >
>>> > >> > Arrow purports to be a "bridge" technology that eases one of the
>>> pain
>>> > >> points of working in different ecosystems by providing a common
>>> event
>>> > >> stream data structure.  (Discussion of common processing techniques
>>> is
>>> > >> beyond the scope of this discussion.  Suffice it to say that a
>>> streaming
>>> > >> algo can always be run in batch, but not vice versa.)
>>> > >> >
>>> > >> > Arrow seems to be growing up primarily in the batch processing
>>> world.
>>> > >> One publication notes that "the missing piece is streaming, where
>>> the
>>> > >> velocity of incoming data poses a special challenge. There are some
>>> early
>>> > >> experiments to populate Arrow nodes in microbatches..." [1]  Part
>>> of our
>>> > >> discussion could be a response to this observation.  In what ways
>>> is it
>>> > >> true or false?  What are the plans to remedy this shortcoming, if it
>>> > >> exists?  What steps can be taken now to ease the transition to
>>> low-latency
>>> > >> streaming support in the future?
>>> > >> >
>>> > >>
>>> > >> Arrow columnar format describes a collection of records with values
>>> > >> between records being placed adjacent to each other in memory. If
>>> you
>>> > >> break that assumption, you don't have a columnar format anymore. So
>>> I
>>> > >> don't see where the "shortcoming" is. We don't have any software in the
>>> > >> project for managing the creation of record batches in a streaming
>>> > >> application, but this seems like an interesting development
>>> expansion
>>> > >> area for the project.
>>> > >>
>>> > >> Note that many contributors have already expanded the surface area
>>> of
>>> > >> what's in the Arrow libraries in many directions.
>>> > >>
>>> > >> Streaming data collection is yet another area of expansion, but
>>> > >> _personally_ it is not on the short list of projects that I will
>>> > >> be working on (or asking my direct or indirect colleagues
>>> > >> to work on). Since this is a project made up of volunteers, it's up
>>> to
>>> > >> contributors to drive new directions for the project by writing
>>> design
>>> > >> documents and pull requests.
>>> > >>
>>> > >> > In my own experience, a successful strategy for stream processing
>>> where
>>> > >> context (i.e. recent past events) must be considered by
>>> calculations is to
>>> > >> pre-allocate memory for event collection, to organize this memory
>>> in a
>>> > >> columnar layout, and to run incremental calculations at each event
>>> ingress
>>> > >> into the partially populated memory.  [Fig 1]  When the
>>> pre-allocated
>>> > >> memory has been exhausted, allocate a new batch of column-wise
>>> memory and
>>> > >> continue.  When a batch is no longer pertinent to the calculation
>>> look-back
>>> > >> window, free the memory back to the heap or pool.
>>> > >> >
>>> > >> > Here we run into the first philosophical barrier with Arrow, where
>>> > >> "Arrow data is immutable." [2]  There is currently little or no
>>> > >> consideration for reading a partially constructed RecordBatch, e.g.
>>> one
>>> > >> with only some of the rows containing event data at the present
>>> moment in
>>> > >> time.
>>> > >> >
>>> > >>
>>> > >> It seems like the use case you have heavily revolves around mutating
>>> > >> pre-allocated, memory-mapped datasets that are being consumed by
>>> other
>>> > >> processes on the same host. So you want to incrementally fill some
>>> > >> memory-mapped data that you've already exposed to another process.
>>> > >>
>>> > >> Because of the memory layout for variable-size and nested cells, it
>>> is
>>> > >> impossible in general to mutate Arrow record batches. This is not a
>>> > >> philosophical position: this was a deliberate technical decision to
>>> > >> guarantee data locality for scans and predictable O(1) random access
>>> > >> on variable-length and nested data.
>>> > >>
>>> > >> Technically speaking, you can mutate memory in-place for fixed-size
>>> > >> types in-RAM or on-disk, if you want to. It's an "off-label" use
>>> case
>>> > >> but no one is saying you can't do this.
>>> > >>
>>> > >> > Proposal 1: Shift the Arrow "immutability" doctrine to apply to
>>> > >> populated records of a RecordBatch instead of to all records?
>>> > >> >
>>> > >>
>>> > >> Per above, this is impossible in generality. You can't alter
>>> > >> variable-length or nested records without rewriting the record
>>> batch.
>>> > >>
>>> > >> > As an alternative approach, RecordBatch can be used as a single
>>> Record
>>> > >> (batch length of one).  [Fig 2]  In this approach the benefit of the
>>> > >> columnar layout is lost for look-back window processing.
>>> > >> >
>>> > >> > Another alternative approach is to collect an entire RecordBatch
>>> before
>>> > >> stepping through it with the stream processing calculation. [Fig
>>> 3]  With
>>> > >> this approach some columnar processing benefit can be recovered,
>>> however
>>> > >> artificial latency is introduced.  As tolerance for delays in
>>> decision
>>> > >> support dwindles, this model will be of increasingly limited
>>> value.  It is
>>> > >> already unworkable in many areas of finance.
>>> > >> >
>>> > >> > When considering the Arrow format and variable length values such
>>> as
>>> > >> strings, the pre-allocation approach (and subsequent processing of a
>>> > >> partially populated batch) encounters a hiccup.  How do we know the
>>> amount
>>> > >> of buffer space to pre-allocate?  If we allocate too much buffer for
>>> > >> variable-length data, some of it will be unused.  If we allocate
>>> too little
>>> > >> buffer for variable-length data, some row entities will be unusable.
>>> > >> (Additional "rows" remain but when populating string fields there
>>> is no
>>> > >> longer string storage space to point them to.)
>>> > >> >
>>> > >> > As with many optimization space/time tradeoff problems, the
>>> solution
>>> > >> seems to be to guess.  Pre-allocation sets aside variable length
>>> buffer
>>> > >> storage based on the typical "expected size" of the variable length
>>> data.
>>> > >> This can result in some unused rows, as discussed above.  [Fig 4]
>>> In fact
>>> > >> it will necessarily result in one unused row unless the last of each
>>> > >> variable length field in the last row exactly fits into the
>>> remaining space
>>> > >> in the variable length data buffer.  Consider the case where there
>>> is more
>>> > >> variable length buffer space than data:
>>> > >> >
>>> > >> > Given variable-length field x, last row index of y, variable
>>> length
>>> > >> buffer v, beginning offset into v of o:
>>> > >> >     x[y] begins at o
>>> > >> >     x[y] ends at the offset of the next record, there is no next
>>> > >> record, so x[y] ends after the total remaining area in variable
>>> length
>>> > >> buffer... however, this is too much!
>>> > >> >
>>> > >>
>>> > >> It isn't clear to me what you're proposing. It sounds like you want
>>> a
>>> > >> major redesign of the columnar format to permit in-place mutation of
>>> > >> strings. I doubt that would be possible at this point.
>>> > >>
>>> > >> > Proposal 2: [low priority] Create an "expected length" statistic
>>> in the
>>> > >> Schema for variable length fields?
>>> > >> >
>>> > >> > Proposal 3: [low priority] Create metadata to store the index into
>>> > >> variable-length data that represents the end of the value for the
>>> last
>>> > >> record?  Alternatively: a row is "wasted," however pre-allocation is
>>> > >> inexact to begin with.
>>> > >> >
>>> > >> > Proposal 4: Add metadata to indicate to a RecordBatch reader that
>>> only
>>> > >> some of the rows are to be utilized.  [Fig 5]  This is useful not
>>> only when
>>> > >> processing a batch that is still under construction, but also for
>>> "closed"
>>> > >> batches that were not able to be fully populated due to an imperfect
>>> > >> projection of variable length storage.
>>> > >> >
>>> > >> > On this last proposal, Wes has weighed in:
>>> > >> >
>>> > >> > "I believe your use case can be addressed by pre-allocating record
>>> > >> batches and maintaining application level metadata about what
>>> portion of
>>> > >> the record batches has been 'filled' (so the unfilled records can be
>>> > >> dropped by slicing). I don't think any change to the binary
>>> protocol is
>>> > >> warranted." [3]
>>> > >> >
>>> > >>
>>> > >> My personal opinion is that a solution to the problem you have can
>>> be
>>> > >> composed from the components (combined with some new pieces of code)
>>> > >> that we have developed in the project already.
>>> > >>
>>> > >> So the "application level" could be an add-on C++ component in the
>>> > >> Apache Arrow project. Call it a "memory-mapped streaming data
>>> > >> collector" that pre-allocates on-disk record batches (of only
>>> > >> fixed-size or even possibly dictionary-encoded types) and then fills
>>> > >> them incrementally as bits of data come in, updating some auxiliary
>>> > >> metadata that other processes can use to determine what portion of
>>> the
>>> > >> Arrow IPC messages to "slice off".
>>> > >>
>>> > >> > Concerns with positioning this at the app level:
>>> > >> >
>>> > >> > 1- Do we need to address or begin to address the overall concern
>>> of how
>>> > >> Arrow data structures are to be used in "true" (non-microbatch)
>>> streaming
>>> > >> environments, cf [1] in the last paragraph, as a *first-class* usage
>>> > >> pattern?  If so, is now the time?
>>> > >> >if you break that design invariant you don't have a columnar format
>>> > >> anymore.
>>> > >>
>>> > >> Arrow provides a binary protocol for describing payload data on
>>> the
>>> > >> wire (or on-disk, or in-memory, all the same). I don't see how it is
>>> > >> in conflict with streaming environments, unless the streaming
>>> > >> application has difficulty collecting multiple records into an Arrow
>>> > >> record batch. In that case, it's a system trade-off. Currently
>>> > >> people are using Avro with Kafka and sending one record at a time,
>>> but
>>> > >> then they're also spending a lot of CPU cycles in serialization.
>>> > >>
>>> > >> > 2- If we can even make broad-stroke attempts at data structure
>>> features
>>> > >> that are likely to be useful when streaming becomes a first class
>>> citizen,
>>> > >> it reduces the chances of "breaking" format changes in the future.
>>> I do
>>> > >> not believe the proposals place an undue hardship on batch
>>> processing
>>> > >> paradigms.  We are currently discussing making a breaking change to
>>> the IPC
>>> > >> format [4], so there is a window of opportunity to consider
>>> features useful
>>> > >> for streaming?  (Current clients can feel free to ignore the
>>> proposed
>>> > >> "utilized" metadata of RecordBatch.)
>>> > >> >
>>> > >>
>>> > >> I think the perception that streaming is not a first class citizen
>>> is
>>> > >> an editorialization (e.g. the article you cited was an editorial
>>> > >> written by an industry analyst based on an interview with Jacques
>>> and
>>> > >> me). Columnar data formats in general are designed to work with more
>>> > >> than one value at a time (which we are calling a "batch" but I think
>>> > >> that's conflating terminology with the "batch processing" paradigm
>>> of
>>> > >> Hadoop, etc.).
>>> > >>
>>> > >> > 3- Part of the promise of Arrow is that applications are not a
>>> world
>>> > >> unto themselves, but interoperate with other Arrow-compliant
>>> systems.  In
>>> > >> my case, I would like users to be able to examine RecordBatches in
>>> tools
>>> > >> such as pyarrow without needing to be aware of any streaming
>>> app-specific
>>> > >> metadata.  For example, a researcher may pull in an IPC "File"
>>> containing N
>>> > >> RecordBatch messages corresponding to those in Fig 4.  I would very
>>> much
>>> > >> like for this casual user to not have to apply N slice operations
>>> based on
>>> > >> out-of-band data to get to the data that is relevant.
>>> > >> >
>>> > >>
>>> > >> Per above, should this become a standard enough use case, I think
>>> that
>>> > >> code can be developed in the Apache project to address it.
>>> > >>
>>> > >> > Devil's advocate:
>>> > >> >
>>> > >> > 1- Concurrent access to a mutable (growing) RecordBatch will
>>> require
>>> > >> synchronization of some sort to get consistent metadata reads.
>>> Since the
>>> > >> above proposals do not specify how this synchronization will occur
>>> for
>>> > >> tools such as pyarrow (we can imagine a Python user getting
>>> synchronized
>>> > >> access to File metadata and mapping a read-only area before the
>>> writer is
>>> > >> allowed to continue "appending" to this batch, or batches to this
>>> File),
>>> > >> some "unusual" code will be required anyway, so what is the harm of
>>> > >> consulting side-band data for slicing all the batches as part of
>>> this
>>> > >> "unusual" code?  [Potential response: Yes, but it is still one less
>>> thing
>>> > >> to worry about, and perhaps first-class support for common
>>> synchronization
>>> > >> patterns can be forthcoming?  These patterns may not require
>>> further format
>>> > >> changes?]
>>> > >> >
>>> > >> > My overall concern is that I see a lot of wasted effort dealing
>>> with
>>> > >> the "impedance mismatch" between batch oriented and streaming
>>> systems.  I
>>> > >> believe that "best practices" will begin (and continue!) to prefer
>>> tools
>>> > >> that help bridge the gap.  Certainly this is the case in my own
>>> work.  I
>>> > >> agree with the appraisal at the end of the ZDNet article.  If the
>>> above is
>>> > >> not a helpful solution, what other steps can be made?  Or if Arrow
>>> is
>>> > >> intentionally confined to batch processing for the foreseeable
>>> future (in
>>> > >> terms of first-class support), I'm interested in the rationale.
>>> Perhaps
>>> > >> the feeling is that we avoid scope creep now (which I understand
>>> can be
>>> > >> never-ending) even if it means a certain breaking change in the
>>> future?
>>> > >> >
>>> > >>
>>> > >> There are some semantic issues with what "streaming" and "batch"
>>> mean.
>>> > >> When people see "streaming" nowadays they think "Kafka" (or
>>> > >> Kafka-like). Single events flow in and out of streaming computation
>>> > >> nodes (e.g. like https://apache.github.io/incubator-heron/ or
>>> others).
>>> > >> The "streaming" is more about computational semantics than data
>>> > >> representation.
>>> > >>
>>> > >> The Arrow columnar format fundamentally deals with multiple records
>>> at
>>> > >> a time (you can have a record batch with size 1, but that is not
>>> going
>>> > >> to be efficient). But I do not think Arrow is "intentionally confined"
>>> > >> to batch processing. If it makes sense to use a columnar format to
>>> > >> represent data in a streaming application, then you can certainly
>>> use
>>> > >> it for that. I'm aware of people successfully using Arrow with
>>> Kafka,
>>> > >> for example.
>>> > >>
>>> > >> - Wes
>>> > >>
>>> > >> > Who else encounters the need to mix/match batch and streaming,
>>> and what
>>> > >> are your experiences?
>>> > >> >
>>> > >> > Thanks for the further consideration and discussion!
>>> > >> >
>>> > >> > [1] https://zd.net/2H0LlBY
>>> > >> > [2] https://arrow.apache.org/docs/python/data.html
>>> > >> > [3] https://bit.ly/2J5sENZ
>>> > >> > [4] https://bit.ly/2Yske8L
>>> > >>
>>> > >
>>>
>>
