Attachments referred to in previous two messages:
https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0

On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <j...@jgm.org> wrote:

> Thanks, Wes, for the thoughtful reply.  I really appreciate the
> engagement.  In order to clarify things a bit, I am attaching a graphic of
> how our application will take record-wise (row-oriented) data from an event
> source and incrementally populate a pre-allocated Arrow-compatible buffer,
> including for variable-length fields.  (Obviously at this stage I am not
> using the reference implementation Arrow code, although that would be a
> goal.... to contribute that back to the project.)
>
> For sake of simplicity these are non-nullable fields.  As a result a
> reader of "y" that has no knowledge of the "utilized" metadata would get a
> long string (zeros, spaces, uninitialized, or whatever we decide for the
> pre-allocation model) for the record just beyond the last utilized record.
>
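
A minimal sketch of the pre-allocated, append-only layout described above, in plain Python rather than the Arrow libraries. The class name `PreallocatedStringColumn`, the `utilized` counter, and the choice to pre-fill unused offsets with the buffer capacity are assumptions made for illustration; the attached graphic may differ in detail. It shows why a reader that ignores `utilized` sees one long zero-filled value just past the last populated row.

```python
class PreallocatedStringColumn:
    """Hypothetical append-only string column in an Arrow-like layout:
    an offsets buffer with max_rows + 1 entries plus one contiguous data buffer."""

    def __init__(self, max_rows, data_capacity):
        self.max_rows = max_rows
        self.data = bytearray(data_capacity)  # zero-filled pre-allocation
        # Assumption: offsets of unfilled rows all point at the end of the buffer.
        self.offsets = [0] + [data_capacity] * max_rows
        self.utilized = 0  # number of populated rows (the proposed metadata)

    def append(self, value):
        """Append one value; return False if rows or data space are exhausted."""
        start = self.offsets[self.utilized]
        end = start + len(value)
        if self.utilized == self.max_rows or end > len(self.data):
            return False
        self.data[start:end] = value
        self.utilized += 1
        self.offsets[self.utilized] = end  # already-populated rows are never touched
        return True

    def get(self, row):
        """O(1) read of any row, populated or not."""
        return bytes(self.data[self.offsets[row]:self.offsets[row + 1]])
```

With `max_rows=4` and `data_capacity=16`, appending `b"abc"` and `b"de"` leaves row 2 reading as eleven zero bytes and row 3 as empty, which is what a reader unaware of `utilized` would see.
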
> I don't see any "big O"-analysis problems with this approach.  The
> space/time tradeoff is that we have to guess how much room to allocate for
> variable-length fields.  We will probably almost always be wrong.  This
> ends up in "wasted" space.  However, we can do calculations based on these
> partially filled batches that take full advantage of the columnar layout.
>  (Here I've shown the case where we had too little variable-length buffer
> set aside, resulting in "wasted" rows.  The flip side is that rows achieve
> full [1] utilization but there is wasted variable-length buffer if we guess
> incorrectly in the other direction.)
>
> I proposed a few things that are "nice to have" but really what I'm eyeing
> is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some
> of the rows in a RecordBatch are not to be read, based on the new
> "utilized" (or whatever name) metadata.  That single tweak to the
> metadata-- and readers honoring it-- is the core of the proposal.
>  (Proposal 4.)  This would indicate that the attached example (or something
> similar) is the blessed approach for those seeking to accumulate events and
> process them while still expecting more data, with the heavier-weight task
> of creating a new pre-allocated batch being a rare occurrence.
>
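
To illustrate the reader side of Proposal 4, here is a hedged sketch in plain Python. `Batch`, its `utilized` field, `read_utilized`, and `mean_price` are hypothetical names, not part of any Arrow API; with pyarrow the equivalent step today would be an explicit `RecordBatch.slice(0, utilized)` driven by application-level metadata.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Batch:
    """Hypothetical stand-in for a pre-allocated RecordBatch."""
    columns: Dict[str, List[float]]  # every column has the full pre-allocated length
    utilized: int                    # hypothetical metadata: number of populated rows


def read_utilized(batch: Batch) -> Dict[str, List[float]]:
    """Return only the populated prefix of every column."""
    return {name: values[:batch.utilized] for name, values in batch.columns.items()}


def mean_price(batches: List[Batch]) -> float:
    """Columnar calculation over closed and still-growing batches alike."""
    prices = [p for b in batches for p in read_utilized(b)["price"]]
    return sum(prices) / len(prices)
```

A reader that honors the metadata treats a closed, partially filled batch and a batch still being appended to in the same way: rows at or beyond `utilized` are never observed.
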
> Notice that the mutability is only in the sense of "appending."  The
> current doctrine of total immutability would be revised to refer to the
> immutability of only the already-populated rows.
>
> It gives folks an option other than choosing the lesser of two evils: on
> the one hand, length 1 RecordBatches that don't result in a stream that is
> computationally efficient.  On the other hand, adding artificial latency by
> accumulating events before "freezing" a larger batch and only then making
> it available to computation.
>
> -John
>
> On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
>> hi John,
>>
>> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org> wrote:
>> >
>> > During my time building financial analytics and trading systems (23
>> years!), both the "batch processing" and "stream processing" paradigms have
>> been extensively used by myself and by colleagues.
>> >
>> > Unfortunately, the tools used in these paradigms have not successfully
>> overlapped.  For example, an analyst might use a Python notebook with
>> pandas to do some batch analysis.  Then, for acceptable latency and
>> throughput, a C++ programmer must implement the same schemas and processing
>> logic in order to analyze real-time data for real-time decision support.
>> (Time horizons often being sub-second or even sub-millisecond for an
>> acceptable reaction to an event.  The most aggressive software-based
>> systems, leaving custom hardware aside other than things like kernel-bypass
>> NICs, target 10s of microseconds for a full round trip from data ingestion
>> to decision.)
>> >
>> > As a result, TCO is more than doubled.  A doubling can be accounted for
>> by two implementations that share little or nothing in the way of
>> architecture.  Then additional effort is required to ensure that these
>> implementations continue to behave the same way and are upgraded in
>> lock-step.
>> >
>> > Arrow purports to be a "bridge" technology that eases one of the pain
>> points of working in different ecosystems by providing a common event
>> stream data structure.  (Discussion of common processing techniques is
>> beyond the scope of this discussion.  Suffice it to say that a streaming
>> algo can always be run in batch, but not vice versa.)
>> >
>> > Arrow seems to be growing up primarily in the batch processing world.
>> One publication notes that "the missing piece is streaming, where the
>> velocity of incoming data poses a special challenge. There are some early
>> experiments to populate Arrow nodes in microbatches..." [1]  Part of our
>> discussion could be a response to this observation.  In what ways is it
>> true or false?  What are the plans to remedy this shortcoming, if it
>> exists?  What steps can be taken now to ease the transition to low-latency
>> streaming support in the future?
>> >
>>
>> Arrow columnar format describes a collection of records with values
>> between records being placed adjacent to each other in memory. If you
>> break that assumption, you don't have a columnar format anymore. So I
>> don't see where the "shortcoming" is. We don't have any software in the
>> project for managing the creation of record batches in a streaming
>> application, but this seems like an interesting development expansion
>> area for the project.
>>
>> Note that many contributors have already expanded the surface area of
>> what's in the Arrow libraries in many directions.
>>
>> Streaming data collection is yet another area of expansion, but
>> _personally_ it is not on the short list of projects that I will
>> be working on (or asking my direct or indirect colleagues
>> to work on). Since this is a project made up of volunteers, it's up to
>> contributors to drive new directions for the project by writing design
>> documents and pull requests.
>>
>> > In my own experience, a successful strategy for stream processing where
>> context (i.e. recent past events) must be considered by calculations is to
>> pre-allocate memory for event collection, to organize this memory in a
>> columnar layout, and to run incremental calculations at each event ingress
>> into the partially populated memory.  [Fig 1]  When the pre-allocated
>> memory has been exhausted, allocate a new batch of column-wise memory and
>> continue.  When a batch is no longer pertinent to the calculation look-back
>> window, free the memory back to the heap or pool.
>> >
>> > Here we run into the first philosophical barrier with Arrow, where
>> "Arrow data is immutable." [2]  There is currently little or no
>> consideration for reading a partially constructed RecordBatch, e.g. one
>> with only some of the rows containing event data at the present moment in
>> time.
>> >
>>
>> It seems like the use case you have heavily revolves around mutating
>> pre-allocated, memory-mapped datasets that are being consumed by other
>> processes on the same host. So you want to incrementally fill some
>> memory-mapped data that you've already exposed to another process.
>>
>> Because of the memory layout for variable-size and nested cells, it is
>> impossible in general to mutate Arrow record batches. This is not a
>> philosophical position: this was a deliberate technical decision to
>> guarantee data locality for scans and predictable O(1) random access
>> on variable-length and nested data.
>>
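
The point about variable-length data can be made concrete with a small sketch of an offsets-plus-data layout in plain Python. The helper names `get_value` and `replace_value` are illustrative only and are not Arrow APIs. Reading row i is O(1) because its bounds come straight from two adjacent offsets; changing the length of any value forces every later byte and every later offset to be rewritten.

```python
def get_value(offsets, data, i):
    """O(1) random access: row i occupies data[offsets[i]:offsets[i + 1]]."""
    return data[offsets[i]:offsets[i + 1]]


def replace_value(offsets, data, i, new_value):
    """Replace row i, returning new buffers.

    If the length changes, all bytes after row i move and all later
    offsets shift by the same delta, so this is a rewrite of the tail
    of the column rather than an in-place mutation."""
    delta = len(new_value) - (offsets[i + 1] - offsets[i])
    new_data = data[:offsets[i]] + new_value + data[offsets[i + 1]:]
    new_offsets = offsets[:i + 1] + [o + delta for o in offsets[i + 1:]]
    return new_offsets, new_data
```

Appending at the end is the one case where no existing offset or byte has to move, which is why the append-only variant discussed in this thread is narrower than general mutation.
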
>> Technically speaking, you can mutate memory in-place for fixed-size
>> types in-RAM or on-disk, if you want to. It's an "off-label" use case
>> but no one is saying you can't do this.
>>
>> > Proposal 1: Shift the Arrow "immutability" doctrine to apply to
>> populated records of a RecordBatch instead of to all records?
>> >
>>
>> Per above, this is impossible in generality. You can't alter
>> variable-length or nested records without rewriting the record batch.
>>
>> > As an alternative approach, RecordBatch can be used as a single Record
>> (batch length of one).  [Fig 2]  In this approach the benefit of the
>> columnar layout is lost for look-back window processing.
>> >
>> > Another alternative approach is to collect an entire RecordBatch before
>> stepping through it with the stream processing calculation. [Fig 3]  With
>> this approach some columnar processing benefit can be recovered, however
>> artificial latency is introduced.  As tolerance for delays in decision
>> support dwindles, this model will be of increasingly limited value.  It is
>> already unworkable in many areas of finance.
>> >
>> > When considering the Arrow format and variable length values such as
>> strings, the pre-allocation approach (and subsequent processing of a
>> partially populated batch) encounters a hiccup.  How do we know the amount
>> of buffer space to pre-allocate?  If we allocate too much buffer for
>> variable-length data, some of it will be unused.  If we allocate too little
>> buffer for variable-length data, some row entities will be unusable.
>> (Additional "rows" remain but when populating string fields there is no
>> longer string storage space to point them to.)
>> >
>> > As with many optimization space/time tradeoff problems, the solution
>> seems to be to guess.  Pre-allocation sets aside variable length buffer
>> storage based on the typical "expected size" of the variable length data.
>> This can result in some unused rows, as discussed above.  [Fig 4]  In fact
>> it will necessarily result in one unused row unless the last of each
>> variable length field in the last row exactly fits into the remaining space
>> in the variable length data buffer.  Consider the case where there is more
>> variable length buffer space than data:
>> >
>> > Given variable-length field x, last row index of y, variable length
>> buffer v, beginning offset into v of o:
>> >     x[y] begins at o
>> >     x[y] ends at the offset of the next record, there is no next
>> record, so x[y] ends after the total remaining area in variable length
>> buffer... however, this is too much!
>> >
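
The space trade-off of guessing an "expected size" can be sketched as follows. This is an illustration in plain Python, not Arrow code; `fill_stats` and its parameters are hypothetical names, and the capacity rule (rows times expected length) is an assumption about how the guess would be applied.

```python
def fill_stats(values, max_rows, expected_len):
    """Fill a pre-allocated batch until rows or variable-length space run out.

    Assumption: the data buffer is sized as max_rows * expected_len bytes."""
    capacity = max_rows * expected_len
    rows_used = 0
    bytes_used = 0
    for value in values:
        if rows_used == max_rows or bytes_used + len(value) > capacity:
            break  # the batch is closed; remaining events go to a new batch
        rows_used += 1
        bytes_used += len(value)
    return {
        "rows_used": rows_used,
        "rows_wasted": max_rows - rows_used,
        "bytes_wasted": capacity - bytes_used,
    }
```

Guessing too small (for example `expected_len=3` for values of 4, 6, 2 and 5 bytes in a 4-row batch) leaves a row unused, while guessing too large (`expected_len=8` for the same values) fills every row but leaves 15 bytes of the data buffer unused.
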
>>
>> It isn't clear to me what you're proposing. It sounds like you want a
>> major redesign of the columnar format to permit in-place mutation of
>> strings. I doubt that would be possible at this point.
>>
>> > Proposal 2: [low priority] Create an "expected length" statistic in the
>> Schema for variable length fields?
>> >
>> > Proposal 3: [low priority] Create metadata to store the index into
>> variable-length data that represents the end of the value for the last
>> record?  Alternatively: a row is "wasted," however pre-allocation is
>> inexact to begin with.
>> >
>> > Proposal 4: Add metadata to indicate to a RecordBatch reader that only
>> some of the rows are to be utilized.  [Fig 5]  This is useful not only when
>> processing a batch that is still under construction, but also for "closed"
>> batches that were not able to be fully populated due to an imperfect
>> projection of variable length storage.
>> >
>> > On this last proposal, Wes has weighed in:
>> >
>> > "I believe your use case can be addressed by pre-allocating record
>> batches and maintaining application level metadata about what portion of
>> the record batches has been 'filled' (so the unfilled records can be
>> dropped by slicing). I don't think any change to the binary protocol is
>> warranted." [3]
>> >
>>
>> My personal opinion is that a solution to the problem you have can be
>> composed from the components (combined with some new pieces of code)
>> that we have developed in the project already.
>>
>> So the "application level" could be an add-on C++ component in the
>> Apache Arrow project. Call it a "memory-mapped streaming data
>> collector" that pre-allocates on-disk record batches (of only
>> fixed-size or even possibly dictionary-encoded types) and then fills
>> them incrementally as bits of data come in, updating some auxiliary
>> metadata that other processes can use to determine what portion of the
>> Arrow IPC messages to "slice off".
>>
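
One way to picture the collector described above is the following sketch, which uses Python's `mmap` and `struct` modules instead of the Arrow C++ libraries. The file layout (an 8-byte `utilized` counter followed by one fixed-width int64 column) and all function names are assumptions made for illustration and are not the Arrow IPC format. Real cross-process use would also need the synchronization raised later in this thread, which this sketch does not attempt.

```python
import mmap
import os
import struct
import tempfile

HEADER = struct.Struct("<Q")  # hypothetical auxiliary metadata: utilized row count
VALUE = struct.Struct("<q")   # one fixed-width int64 column


def create(path, max_rows):
    """Pre-allocate a zero-filled file holding the header and max_rows values."""
    with open(path, "wb") as f:
        f.truncate(HEADER.size + max_rows * VALUE.size)


def append(mm, value):
    """Write the next row, then publish it by bumping the utilized counter."""
    (utilized,) = HEADER.unpack_from(mm, 0)
    capacity = (len(mm) - HEADER.size) // VALUE.size
    if utilized == capacity:
        return False
    VALUE.pack_into(mm, HEADER.size + utilized * VALUE.size, value)
    HEADER.pack_into(mm, 0, utilized + 1)
    return True


def read_utilized(mm):
    """Reader side: look at the counter, then "slice off" the unfilled rows."""
    (utilized,) = HEADER.unpack_from(mm, 0)
    return [VALUE.unpack_from(mm, HEADER.size + i * VALUE.size)[0]
            for i in range(utilized)]


def demo():
    """Writer and reader map the same file; the reader sees only filled rows."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "batch.bin")
        create(path, max_rows=4)
        with open(path, "r+b") as wf, mmap.mmap(wf.fileno(), 0) as writer:
            append(writer, 10)
            append(writer, 20)
            with open(path, "rb") as rf, \
                    mmap.mmap(rf.fileno(), 0, access=mmap.ACCESS_READ) as reader:
                return read_utilized(reader)
```

Writing the value before updating the counter means a reader never sees a counted row whose value is still unwritten, at least within a single process; across processes that ordering would need to be enforced explicitly.
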
>> > Concerns with positioning this at the app level:
>> >
>> > 1- Do we need to address or begin to address the overall concern of how
>> Arrow data structures are to be used in "true" (non-microbatch) streaming
>> environments, cf [1] in the last paragraph, as a *first-class* usage
>> pattern?  If so, is now the time?
>> >if you break that design invariant you don't have a columnar format
>> anymore.
>>
>> Arrow provides a binary protocol for describing payload data on the
>> wire (or on-disk, or in-memory, all the same). I don't see how it is
>> in conflict with streaming environments, unless the streaming
>> application has difficulty collecting multiple records into Arrow
>> record batches. In that case, it's a system trade-off. Currently
>> people are using Avro with Kafka and sending one record at a time, but
>> then they're also spending a lot of CPU cycles in serialization.
>>
>> > 2- If we can even make broad-stroke attempts at data structure features
>> that are likely to be useful when streaming becomes a first class citizen,
>> it reduces the chances of "breaking" format changes in the future.  I do
>> not believe the proposals place an undue hardship on batch processing
>> paradigms.  We are currently discussing making a breaking change to the IPC
>> format [4], so there is a window of opportunity to consider features useful
>> for streaming?  (Current clients can feel free to ignore the proposed
>> "utilized" metadata of RecordBatch.)
>> >
>>
>> I think the perception that streaming is not a first class citizen is
>> an editorialization (e.g. the article you cited was an editorial
>> written by an industry analyst based on an interview with Jacques and
>> me). Columnar data formats in general are designed to work with more
>> than one value at a time (which we are calling a "batch" but I think
>> that's conflating terminology with the "batch processing" paradigm of
>> Hadoop, etc.).
>>
>> > 3- Part of the promise of Arrow is that applications are not a world
>> unto themselves, but interoperate with other Arrow-compliant systems.  In
>> my case, I would like users to be able to examine RecordBatches in tools
>> such as pyarrow without needing to be aware of any streaming app-specific
>> metadata.  For example, a researcher may pull in an IPC "File" containing N
>> RecordBatch messages corresponding to those in Fig 4.  I would very much
>> like for this casual user to not have to apply N slice operations based on
>> out-of-band data to get to the data that is relevant.
>> >
>>
>> Per above, should this become a standard enough use case, I think that
>> code can be developed in the Apache project to address it.
>>
>> > Devil's advocate:
>> >
>> > 1- Concurrent access to a mutable (growing) RecordBatch will require
>> synchronization of some sort to get consistent metadata reads.  Since the
>> above proposals do not specify how this synchronization will occur for
>> tools such as pyarrow (we can imagine a Python user getting synchronized
>> access to File metadata and mapping a read-only area before the writer is
>> allowed to continue "appending" to this batch, or batches to this File),
>> some "unusual" code will be required anyway, so what is the harm of
>> consulting side-band data for slicing all the batches as part of this
>> "unusual" code?  [Potential response: Yes, but it is still one less thing
>> to worry about, and perhaps first-class support for common synchronization
>> patterns can be forthcoming?  These patterns may not require further format
>> changes?]
>> >
>> > My overall concern is that I see a lot of wasted effort dealing with
>> the "impedance mismatch" between batch oriented and streaming systems.  I
>> believe that "best practices" will begin (and continue!) to prefer tools
>> that help bridge the gap.  Certainly this is the case in my own work.  I
>> agree with the appraisal at the end of the ZDNet article.  If the above is
>> not a helpful solution, what other steps can be made?  Or if Arrow is
>> intentionally confined to batch processing for the foreseeable future (in
>> terms of first-class support), I'm interested in the rationale.  Perhaps
>> the feeling is that we avoid scope creep now (which I understand can be
>> never-ending) even if it means a certain breaking change in the future?
>> >
>>
>> There are some semantic issues with what "streaming" and "batch" mean.
>> When people see "streaming" nowadays they think "Kafka" (or
>> Kafka-like). Single events flow in and out of streaming computation
>> nodes (e.g. like https://apache.github.io/incubator-heron/ or others).
>> The "streaming" is more about computational semantics than data
>> representation.
>>
>> The Arrow columnar format fundamentally deals with multiple records at
>> a time (you can have a record batch with size 1, but that is not going
>> to be efficient). But I do not think Arrow is "intentionally confined"
>> to batch processing. If it makes sense to use a columnar format to
>> represent data in a streaming application, then you can certainly use
>> it for that. I'm aware of people successfully using Arrow with Kafka,
>> for example.
>>
>> - Wes
>>
>> > Who else encounters the need to mix/match batch and streaming, and what
>> are your experiences?
>> >
>> > Thanks for the further consideration and discussion!
>> >
>> > [1] https://zd.net/2H0LlBY
>> > [2] https://arrow.apache.org/docs/python/data.html
>> > [3] https://bit.ly/2J5sENZ
>> > [4] https://bit.ly/2Yske8L
>>
>
