hi John,

On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <j...@jgm.org> wrote:
>
> During my time building financial analytics and trading systems (23 years!), 
> both the "batch processing" and "stream processing" paradigms have been 
> extensively used by myself and by colleagues.
>
> Unfortunately, the tools used in these paradigms have not successfully 
> overlapped.  For example, an analyst might use a Python notebook with pandas 
> to do some batch analysis.  Then, for acceptable latency and throughput, a 
> C++ programmer must implement the same schemas and processing logic in order 
> to analyze real-time data for real-time decision support.  (Time horizons 
> often being sub-second or even sub-millisecond for an acceptable reaction to 
> an event.  The most aggressive software-based systems, leaving custom 
> hardware aside other than things like kernel-bypass NICs, target 10s of 
> microseconds for a full round trip from data ingestion to decision.)
>
> As a result, TCO is more than doubled.  A doubling can be accounted for by 
> two implementations that share little or nothing in the way of architecture.  
> Then additional effort is required to ensure that these implementations 
> continue to behave the same way and are upgraded in lock-step.
>
> Arrow purports to be a "bridge" technology that eases one of the pain points 
> of working in different ecosystems by providing a common event stream data 
> structure.  (Discussion of common processing techniques is beyond the scope 
> of this discussion.  Suffice it to say that a streaming algo can always be 
> run in batch, but not vice versa.)
>
> Arrow seems to be growing up primarily in the batch processing world.  One 
> publication notes that "the missing piece is streaming, where the velocity of 
> incoming data poses a special challenge. There are some early experiments to 
> populate Arrow nodes in microbatches..." [1]  Part our our discussion could 
> be a response to this observation.  In what ways is it true or false?  What 
> are the plans to remedy this shortcoming, if it exists?  What steps can be 
> taken now to ease the transition to low-latency streaming support in the 
> future?
>

Arrow columnar format describes a collection of records with values
between records being placed adjacent to each other in memory. If you
break that assumption, you don't have a columnar format anymore. So I
don't where the "shortcoming" is. We don't have any software in the
project for managing the creation of record batches in a streaming
application, but this seems like an interesting development expansion
area for the project.

Note that many contributors have already expanded the surface area of
what's in the Arrow libraries in many directions.

Streaming data collection is yet another area of expansion, but
_personally_ it is not on the short list of projects that I will
personally be working on (or asking my direct or indirect colleagues
to work on). Since this is a project made up of volunteers, it's up to
contributors to drive new directions for the project by writing design
documents and pull requests.

> In my own experience, a successful strategy for stream processing where 
> context (i.e. recent past events) must be considered by calculations is to 
> pre-allocate memory for event collection, to organize this memory in a 
> columnar layout, and to run incremental calculations at each event ingress 
> into the partially populated memory.  [Fig 1]  When the pre-allocated memory 
> has been exhausted, allocate a new batch of column-wise memory and continue.  
> When a batch is no longer pertinent to the calculation look-back window, free 
> the memory back to the heap or pool.
>
> Here we run into the first philosophical barrier with Arrow, where "Arrow 
> data is immutable." [2]  There is currently little or no consideration for 
> reading a partially constructed RecordBatch, e.g. one with only some of the 
> rows containing event data at the present moment in time.
>

It seems like the use case you have heavily revolves around mutating
pre-allocated, memory-mapped datasets that are being consumed by other
processes on the same host. So you want to incrementally fill some
memory-mapped data that you've already exposed to another process.

Because of the memory layout for variable-size and nested cells, it is
impossible in general to mutate Arrow record batches. This is not a
philosophical position: this was a deliberate technical decision to
guarantee data locality for scans and predictable O(1) random access
on variable-length and nested data.

Technically speaking, you can mutate memory in-place for fixed-size
types in-RAM or on-disk, if you want to. It's an "off-label" use case
but no one is saying you can't do this.

> Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated 
> records of a RecordBatch instead of to all records?
>

Per above, this is impossible in generality. You can't alter
variable-length or nested records without rewriting the record batch.

> As an alternative approach, RecordBatch can be used as a single Record (batch 
> length of one).  [Fig 2]  In this approach the benefit of the columnar layout 
> is lost for look-back window processing.
>
> Another alternative approach is to collect an entire RecordBatch before 
> stepping through it with the stream processing calculation. [Fig 3]  With 
> this approach some columnar processing benefit can be recovered, however 
> artificial latency is introduced.  As tolerance for delays in decision 
> support dwindles, this model will be of increasingly limited value.  It is 
> already unworkable in many areas of finance.
>
> When considering the Arrow format and variable length values such as strings, 
> the pre-allocation approach (and subsequent processing of a partially 
> populated batch) encounters a hiccup.  How do we know the amount of buffer 
> space to pre-allocate?  If we allocate too much buffer for variable-length 
> data, some of it will be unused.  If we allocate too little buffer for 
> variable-length data, some row entities will be unusable.  (Additional "rows" 
> remain but when populating string fields there is no longer string storage 
> space to point them to.)
>
> As with many optimization space/time tradeoff problems, the solution seems to 
> be to guess.  Pre-allocation sets aside variable length buffer storage based 
> on the typical "expected size" of the variable length data.  This can result 
> in some unused rows, as discussed above.  [Fig 4]  In fact it will 
> necessarily result in one unused row unless the last of each variable length 
> field in the last row exactly fits into the remaining space in the variable 
> length data buffer.  Consider the case where there is more variable length 
> buffer space than data:
>
> Given variable-length field x, last row index of y, variable length buffer v, 
> beginning offset into v of o:
>     x[y] begins at o
>     x[y] ends at the offset of the next record, there is no next record, so 
> x[y] ends after the total remaining area in variable length buffer... 
> however, this is too much!
>

It isn't clear to me what you're proposing. It sounds like you want a
major redesign of the columnar format to permit in-place mutation of
strings. I doubt that would be possible at this point.

> Proposal 2: [low priority] Create an "expected length" statistic in the 
> Schema for variable length fields?
>
> Proposal 3: [low priority] Create metadata to store the index into 
> variable-length data that represents the end of the value for the last 
> record?  Alternatively: a row is "wasted," however pre-allocation is inexact 
> to begin with.
>
> Proposal 4: Add metadata to indicate to a RecordBatch reader that only some 
> of the rows are to be utilized.  [Fig 5]  This is useful not only when 
> processing a batch that is still under construction, but also for "closed" 
> batches that were not able to be fully populated due to an imperfect 
> projection of variable length storage.
>
> On this last proposal, Wes has weighed in:
>
> "I believe your use case can be addressed by pre-allocating record batches 
> and maintaining application level metadata about what portion of the record 
> batches has been 'filled' (so the unfilled records can be dropped by 
> slicing). I don't think any change to the binary protocol is warranted." [3]
>

My personal opinion is that a solution to the problem you have can be
composed from the components (combined with some new pieces of code)
that we have developed in the project already.

So the "application level" could be an add-on C++ component in the
Apache Arrow project. Call it a "memory-mapped streaming data
collector" that pre-allocates on-disk record batches (of only
fixed-size or even possibly dictionary-encoded types) and then fills
them incrementally as bits of data come in, updating some auxiliary
metadata that other processes can use to determine what portion of the
Arrow IPC messages to "slice off".

> Concerns with positioning this at the app level:
>
> 1- Do we need to address or begin to address the overall concern of how Arrow 
> data structures are to be used in "true" (non-microbatch) streaming 
> environments, cf [1] in the last paragraph, as a *first-class* usage pattern? 
>  If so, is now the time?
>if you break that design invariant you don't have a columnar format anymore.

Arrow provides a binary protocol for describing a payload data on the
wire (or on-disk, or in-memory, all the same). I don't see how it is
in conflict with streaming environments, unless the streaming
application has difficulty collecting multiple records into an Arrow
record batches. In that case, it's a system trade-off. Currently
people are using Avro with Kafka and sending one record at a time, but
then they're also spending a lot of CPU cycles in serialization.

> 2- If we can even make broad-stroke attempts at data structure features that 
> are likely to be useful when streaming becomes a first class citizen, it 
> reduces the chances of "breaking" format changes in the future.  I do not 
> believe the proposals place an undue hardship on batch processing paradigms.  
> We are currently discussing making a breaking change to the IPC format [4], 
> so there is a window of opportunity to consider features useful for 
> streaming?  (Current clients can feel free to ignore the proposed "utilized" 
> metadata of RecordBatch.)
>

I think the perception that streaming is not a first class citizen is
an editorialization (e.g. the article you cited was an editorial
written by an industry analyst based on an interview with Jacques and
me). Columnar data formats in general are designed to work with more
than one value at a time (which we are calling a "batch" but I think
that's conflating terminology with the "batch processing" paradigm of
Hadoop, etc.),

> 3- Part of the promise of Arrow is that applications are not a world unto 
> themselves, but interoperate with other Arrow-compliant systems.  In my case, 
> I would like users to be able to examine RecordBatchs in tools such as 
> pyarrow without needing to be aware of any streaming app-specific metadata.  
> For example, a researcher may pull in an IPC "File" containing N RecordBatch 
> messages corresponding to those in Fig 4.  I would very much like for this 
> casual user to not have to apply N slice operations based on out-of-band data 
> to get to the data that is relevant.
>

Per above, should this become a standard enough use case, I think that
code can be developed in the Apache project to address it.

> Devil's advocate:
>
> 1- Concurrent access to a mutable (growing) RecordBatch will require 
> synchronization of some sort to get consistent metadata reads.  Since the 
> above proposals do not specify how this synchronization will occur for tools 
> such as pyarrow (we can imagine a Python user getting synchronized access to 
> File metadata and mapping a read-only area before the writer is allowed to 
> continue "appending" to this batch, or batches to this File), some "unusual" 
> code will be required anyway, so what is the harm of consulting side-band 
> data for slicing all the batches as part of this "unusual" code?  [Potential 
> response: Yes, but it is still one less thing to worry about, and perhaps 
> first-class support for common synchronization patterns can be forthcoming?  
> These patterns may not require further format changes?]
>
> My overall concern is that I see a lot of wasted effort dealing with the 
> "impedance mismatch" between batch oriented and streaming systems.  I believe 
> that "best practices" will begin (and continue!) to prefer tools that help 
> bridge the gap.  Certainly this is the case in my own work.  I agree with the 
> appraisal at the end of the ZDNet article.  If the above is not a helpful 
> solution, what other steps can be made?  Or if Arrow is intentionally 
> confined to batch processing for the foreseeable future (in terms of 
> first-class support), I'm interested in the rationale.  Perhaps the feeling 
> is that we avoid scope creep now (which I understand can be never-ending) 
> even if it means a certain breaking change in the future?
>

There's some semantic issues with what "streaming" and "batch" means.
When people see "streaming" nowadays they think "Kafka" (or
Kafka-like). Single events flow in and out of streaming computation
nodes (e.g. like https://apache.github.io/incubator-heron/ or others).
The "streaming" is more about computational semantics than data
representation.

The Arrow columnar format fundamentally deals with multiple records at
a time (you can have a record batch with size 1, but that is not going
to be efficient). But I do not think Arrow is "intentially confined"
to batch processing. If it makes sense to use a columnar format to
represent data in a streaming application, then you can certainly use
it for that. I'm aware of people successfully using Arrow with Kafka,
for example.

- Wes

> Who else encounters the need to mix/match batch and streaming, and what are 
> your experiences?
>
> Thanks for the further consideration and discussion!
>
> [1] https://zd.net/2H0LlBY
> [2] https://arrow.apache.org/docs/python/data.html
> [3] https://bit.ly/2J5sENZ
> [4] https://bit.ly/2Yske8L

Reply via email to