On Wed, Oct 16, 2019 at 12:32 PM John Muehlhausen <j...@jgm.org> wrote:
>
> I really need to "get into the zone" on some other development today, but I
> want to remind us of something earlier in the thread that gave me the
> impression I wasn't stomping on too many paradigms with this proposal:
>
> Wes: ``So the "length" field in RecordBatch is already the utilized number
> of rows. The body buffers can certainly have excess unused space. So
> your application can mutate Flatbuffer "length" field in-place as
> new records are filled in.''

I'm objecting to RecordBatch.length being inconsistent with the
constituent field lengths; that's where the danger lies. If all of the
lengths are consistent, no code changes are necessary.
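
A minimal sketch of what "consistent" means here, assuming simplified
stand-ins for the Flatbuffers metadata. RecordBatchMeta, FieldNodeMeta and
LengthsAreConsistent are hypothetical names for illustration, not the Arrow
C++ API, and only top-level field nodes are modeled (nested children can
legitimately have other lengths):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a Flatbuffers FieldNode; not the Arrow API.
struct FieldNodeMeta {
  int64_t length;      // number of slots in this field's arrays
  int64_t null_count;  // number of null slots
};

// Hypothetical stand-in for the RecordBatch metadata; not the Arrow API.
struct RecordBatchMeta {
  int64_t length;                    // number of rows in the batch
  std::vector<FieldNodeMeta> nodes;  // top-level fields only, for simplicity
};

// Returns true when every top-level field node has exactly as many
// slots as the batch claims to have rows.
bool LengthsAreConsistent(const RecordBatchMeta& batch) {
  assert(batch.length >= 0);
  for (const FieldNodeMeta& node : batch.nodes) {
    if (node.length != batch.length) return false;
  }
  return true;
}
```

A batch whose length was bumped in place while the field nodes still
report the pre-allocated capacity would fail this check, which is the
situation being objected to.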

> If RecordBatch.length is the utilized number of rows then my PR makes this
> actually true.  Yes, we need it in a handful of implementations.  I'm
> willing to provide all of them.  To me that is the lowest complexity
> solution.
>
> -John
>
> On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney <wesmck...@gmail.com> wrote:
>
> > On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen <j...@jgm.org> wrote:
> > >
> > > "pyarrow is intended as a developer-facing library, not a user-facing
> > one"
> > >
> > > Is that really the core issue?  I doubt you would want to add this
> > proposed
> > > logic to pandas even though it is user-facing, because then pandas will
> > > either have to re-implement what it means to read a batch (to respect
> > > length when it is smaller than array length) or else rely on the single
> > > blessed custom metadata for doing this, which doesn't make it custom
> > > anymore.
> >
> > What you have proposed in your PR amounts to an alteration of the IPC
> > format to suit this use case. This pushes complexity onto _every_
> > implementation that will need to worry about a "truncated" record
> > batch. I'd rather avoid this unless it is truly the only way.
> >
> > Note that we serialize a significant amount of custom metadata already
> > to address pandas-specific issues, and have not had to make any
> > changes to the columnar format as a result.
> >
> > > I think really your concern is that perhaps nobody wants this but me,
> > > therefore it should not be in arrow or pandas regardless of whether it is
> > > user-facing?  But, if that is your thinking, is it true?  What is our
> > > solution to the locality/latency problem for systems that ingest and
> > > process concurrently, if not this solution?  I do see it as a general
> > > problem that needs at least the beginnings of a general solution... not a
> > > "custom" one.
> >
> > We use the custom_metadata fields to implement a number of built-in
> > things in the project, such as extension types. If enough people find
> > this useful, then it can be promoted to a formalized concept. As far
> > as I can tell, you have developed quite a bit of custom code related
> > to this for your application, including manipulating Flatbuffers
> > metadata in place to maintain the populated length, so the barrier to
> > entry to being able to properly take advantage of this is rather high.
> >
> > > Also, I wonder whether it is true that pyarrow avoids smart/magical
> > > things.  The entire concept of a "Table" seems to be in that category?
> > The
> > > docs specifically mention that it is for convenience.
> > >
> >
> > Table arose out of legitimate developer need. There are a number of
> > areas of the project that would be much more difficult if we had to
> > worry about regularizing column chunking at any call site that returns
> > an in-memory dataset.
> >
> > > I'd like to focus on two questions:
> > > 1- What is the Arrow general solution to the locality/latency tradeoff
> > > problem for systems that ingest and process data concurrently?  This
> > > proposed solution or something else?  Or if we propose not to address the
> > > problem, why?
> > > 2- What will the proposed change negatively impact?  It seems that all we
> > > are talking about is respecting batch length if arrays happen to be
> > longer.
> >
> > I'm offering to help you solve the post-read truncation problem
> > without modifying the IPC protocol. If you want to make things work
> > for users who are unaware of the truncation, I think this can be achieved through
> > a plug-in API to define a metadata handler-callback to apply the
> > truncation to the record batches.
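
One way the handler-callback idea could look, as a rough sketch only.
Everything here is hypothetical: BatchView, MetadataHandler,
TruncateToUtilized, ReadWithHandler and the "utilized" metadata key are
illustrative names, not an existing Arrow plug-in API. The handler reads a
row count from custom metadata and shrinks the logical length without
touching the buffers:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Hypothetical view of a decoded batch; not the Arrow API.
struct BatchView {
  int64_t offset = 0;
  int64_t length = 0;
  std::map<std::string, std::string> custom_metadata;
};

// A handler receives the batch as read and returns the view to expose.
using MetadataHandler = std::function<BatchView(const BatchView&)>;

// Example handler: if the writer recorded a "utilized" row count (an
// assumed key), slice the view down to it. Only the logical length
// changes; no data is copied. std::stoll throws on a malformed value.
BatchView TruncateToUtilized(const BatchView& in) {
  assert(in.length >= 0);
  auto it = in.custom_metadata.find("utilized");
  if (it == in.custom_metadata.end()) return in;
  const int64_t utilized = static_cast<int64_t>(std::stoll(it->second));
  BatchView out = in;
  out.length = std::max<int64_t>(0, std::min<int64_t>(utilized, in.length));
  return out;
}

// Hypothetical reader hook: apply the registered handler, if any.
BatchView ReadWithHandler(const BatchView& raw, const MetadataHandler& handler) {
  return handler ? handler(raw) : raw;
}
```

A reader with no handler registered sees the full pre-allocated batch, so
the IPC format itself stays unchanged.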
> >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney <wesmck...@gmail.com>
> > wrote:
> > >
> > > > hi John,
> > > >
> > > > > As a practical matter, the reason metadata is not a good solution
> > for me
> > > > is that it requires awareness on the part of the reader.  I want
> > (e.g.) a
> > > > researcher in Python to be able to map a file of batches in IPC format
> > > > without needing to worry about the fact that the file was built in a
> > > > streaming fashion and therefore has some unused array elements.
> > > >
> > > > I don't find this argument to be persuasive.
> > > >
> > > > pyarrow is intended as a developer-facing library, not a user-facing
> > > > one. I don't think you should be having the kinds of users you are
> > > > describing using pyarrow directly, instead consuming the library
> > > > through a layer above it. Specifically, we are deliberately avoiding
> > > > doing anything too "smart" or "magical", instead maintaining tight
> > > > developer control over what is going on.
> > > >
> > > > - Wes
> > > >
> > > > On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield <emkornfi...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > > Still thinking through the implications here, but to save others from
> > > > > > having to go search, [1] is the PR.
> > > > >
> > > > > [1] https://github.com/apache/arrow/pull/5663/files
> > > > >
> > > > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen <j...@jgm.org>
> > wrote:
> > > > >
> > > > > > A proposal with linked PR now exists in ARROW-5916 and Wes
> > commented
> > > > that
> > > > > > we should kick it around some more.
> > > > > >
> > > > > > The high-level topic is how Apache Arrow intersects with streaming
> > > > > > methodologies:
> > > > > >
> > > > > > If record batches are strictly immutable, a difficult trade-off is
> > > > created
> > > > > > for streaming data collection: either I can have low-latency
> > > > presentation
> > > > > > of new data by appending very small batches (often 1 row) to the
> > IPC
> > > > stream
> > > > > > and lose columnar layout benefits, or I can have high-latency
> > > > presentation
> > > > > > of new data by waiting to append a batch until it is large enough
> > to
> > > > gain
> > > > > > significant columnar layout benefits.  During this waiting period
> > the
> > > > new
> > > > > > data is unavailable to processing.
> > > > > >
> > > > > > If, on the other hand, [0,length) of a batch is immutable but
> > length
> > > > may
> > > > > > increase, the trade-off is eliminated: I can pre-allocate a batch
> > and
> > > > > > populate records in it when they occur (without waiting), and also
> > gain
> > > > > > columnar benefits as each "closed" batch will be large.  (A batch
> > may
> > > > be
> > > > > > practically "closed" before the arrays are full when the
> > projection of
> > > > > > variable-length buffer space is wrong... a space/time tradeoff in
> > > > favor of
> > > > > > time.)
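
The "[0,length) is immutable but length may increase" idea can be
sketched in-process as a single-writer column that publishes its length
only after a row has been written. This is a minimal sketch under stated
assumptions: AppendOnlyInt64Column and its methods are hypothetical names,
it covers one fixed-width column inside one process, and it does not
address how a Flatbuffers length field in a memory-mapped file would be
updated safely across processes.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-writer, multi-reader fixed-width column.
class AppendOnlyInt64Column {
 public:
  explicit AppendOnlyInt64Column(int64_t capacity)
      : values_(static_cast<std::size_t>(capacity), 0) {}

  // Writer side (one thread only): fill the slot, then publish the length.
  // Returns false once the pre-allocated capacity is used up.
  bool Append(int64_t v) {
    const int64_t n = length_.load(std::memory_order_relaxed);
    if (n >= static_cast<int64_t>(values_.size())) return false;
    values_[static_cast<std::size_t>(n)] = v;
    length_.store(n + 1, std::memory_order_release);
    return true;
  }

  // Reader side: rows in [0, length) are fully written and never change.
  int64_t Sum() const {
    const int64_t n = length_.load(std::memory_order_acquire);
    int64_t total = 0;
    for (int64_t i = 0; i < n; ++i) total += values_[static_cast<std::size_t>(i)];
    return total;
  }

  int64_t length() const { return length_.load(std::memory_order_acquire); }

 private:
  std::vector<int64_t> values_;     // pre-allocated, never resized
  std::atomic<int64_t> length_{0};  // number of published rows
};
```

The release store paired with the acquire load is what lets a reader
trust every slot below the length it observed.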
> > > > > >
> > > > > > Looking ahead to a day when the reference implementation(s) will be
> > > > able to
> > > > > > bump RecordBatch.length while populating pre-allocated records
> > > > > > in-place, ARROW-5916 reads such batches by ignoring portions of
> > arrays
> > > > that
> > > > > > are beyond RecordBatch.length.
> > > > > >
> > > > > > If we are not looking ahead to such a day, the discussion is about
> > the
> > > > > > alternative way that Arrow will avoid the latency/locality tradeoff
> > > > > > inherent in streaming data collection.  Or, if the answer is
> > "streaming
> > > > > > apps are and will always be out of scope", that idea needs to be
> > > > defended
> > > > > > from the observation that practitioners are moving more towards the
> > > > fusion
> > > > > > of batch and streaming, not away from it.
> > > > > >
> > > > > > As a practical matter, the reason metadata is not a good solution
> > for
> > > > me is
> > > > > > that it requires awareness on the part of the reader.  I want
> > (e.g.) a
> > > > > > researcher in Python to be able to map a file of batches in IPC
> > format
> > > > > > without needing to worry about the fact that the file was built in
> > a
> > > > > > streaming fashion and therefore has some unused array elements.
> > > > > >
> > > > > > The change itself seems relatively simple.  What negative
> > consequences
> > > > do
> > > > > > we anticipate, if any?
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen <j...@jgm.org>
> > wrote:
> > > > > >
> > > > > > > This seems to help... still testing it though.
> > > > > > >
> > > > > > >   Status GetFieldMetadata(int field_index, ArrayData* out) {
> > > > > > >     auto nodes = metadata_->nodes();
> > > > > > >     // pop off a field
> > > > > > >     if (field_index >= static_cast<int>(nodes->size())) {
> > > > > > > >       return Status::Invalid("Ran out of field metadata, likely malformed");
> > > > > > >     }
> > > > > > >     const flatbuf::FieldNode* node = nodes->Get(field_index);
> > > > > > >
> > > > > > > >     // Was: out->length = node->length();
> > > > > > > >     out->length = metadata_->length();
> > > > > > >     out->null_count = node->null_count();
> > > > > > >     out->offset = 0;
> > > > > > >     return Status::OK();
> > > > > > >   }
> > > > > > >
> > > > > > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen <j...@jgm.org>
> > > > wrote:
> > > > > > >
> > > > > > >> So far it seems as if pyarrow is completely ignoring the
> > > > > > >> RecordBatch.length field.  More info to follow...
> > > > > > >>
> > > > > > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen <j...@jgm.org>
> > > > wrote:
> > > > > > >>
> > > > > > >>> Crikey! I'll do some testing around that and suggest some test
> > > > cases to
> > > > > > >>> ensure it continues to work, assuming that it does.
> > > > > > >>>
> > > > > > >>> -John
> > > > > > >>>
> > > > > > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney <
> > wesmck...@gmail.com>
> > > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Thanks for the attachment, it's helpful.
> > > > > > >>>>
> > > > > > >>>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen <j...@jgm.org>
> > > > wrote:
> > > > > > >>>> >
> > > > > > >>>> > Attachments referred to in previous two messages:
> > > > > > >>>> >
> > > > > > >>>>
> > > > > >
> > > >
> > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
> > > > > > >>>> >
> > > > > > >>>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen <
> > j...@jgm.org>
> > > > > > wrote:
> > > > > > >>>> >
> > > > > > >>>> > > Thanks, Wes, for the thoughtful reply.  I really
> > appreciate
> > > > the
> > > > > > >>>> > > engagement.  In order to clarify things a bit, I am
> > attaching
> > > > a
> > > > > > >>>> graphic of
> > > > > > >>>> > > how our application will take record-wise (row-oriented)
> > data
> > > > from
> > > > > > >>>> an event
> > > > > > >>>> > > source and incrementally populate a pre-allocated
> > > > Arrow-compatible
> > > > > > >>>> buffer,
> > > > > > >>>> > > including for variable-length fields.  (Obviously at this
> > > > stage I
> > > > > > >>>> am not
> > > > > > >>>> > > using the reference implementation Arrow code, although
> > that
> > > > would
> > > > > > >>>> be a
> > > > > > >>>> > > goal.... to contribute that back to the project.)
> > > > > > >>>> > >
> > > > > > >>>> > > For sake of simplicity these are non-nullable fields.  As
> > a
> > > > > > result a
> > > > > > >>>> > > reader of "y" that has no knowledge of the "utilized"
> > metadata
> > > > > > >>>> would get a
> > > > > > >>>> > > long string (zeros, spaces, uninitialized, or whatever we
> > > > decide
> > > > > > >>>> for the
> > > > > > >>>> > > pre-allocation model) for the record just beyond the last
> > > > utilized
> > > > > > >>>> record.
> > > > > > >>>> > >
> > > > > > >>>> > > I don't see any "big O"-analysis problems with this
> > > > approach.  The
> > > > > > >>>> > > space/time tradeoff is that we have to guess how much
> > room to
> > > > > > >>>> allocate for
> > > > > > >>>> > > variable-length fields.  We will probably almost always be
> > > > wrong.
> > > > > > >>>> This
> > > > > > >>>> > > ends up in "wasted" space.  However, we can do
> > calculations
> > > > based
> > > > > > >>>> on these
> > > > > > >>>> > > partially filled batches that take full advantage of the
> > > > columnar
> > > > > > >>>> layout.
> > > > > > >>>> > >  (Here I've shown the case where we had too little
> > > > variable-length
> > > > > > >>>> buffer
> > > > > > >>>> > > set aside, resulting in "wasted" rows.  The flip side is
> > that
> > > > rows
> > > > > > >>>> achieve
> > > > > > >>>> > > full [1] utilization but there is wasted variable-length
> > > > buffer if
> > > > > > >>>> we guess
> > > > > > >>>> > > incorrectly in the other direction.)
> > > > > > >>>> > >
> > > > > > >>>> > > I proposed a few things that are "nice to have" but really
> > > > what
> > > > > > I'm
> > > > > > >>>> eyeing
> > > > > > >>>> > > is the ability for a reader-- any reader (e.g. pyarrow)--
> > to
> > > > see
> > > > > > >>>> that some
> > > > > > >>>> > > of the rows in a RecordBatch are not to be read, based on
> > the
> > > > new
> > > > > > >>>> > > "utilized" (or whatever name) metadata.  That single
> > tweak to
> > > > the
> > > > > > >>>> > > metadata-- and readers honoring it-- is the core of the
> > > > proposal.
> > > > > > >>>> > >  (Proposal 4.)  This would indicate that the attached
> > example
> > > > (or
> > > > > > >>>> something
> > > > > > >>>> > > similar) is the blessed approach for those seeking to
> > > > accumulate
> > > > > > >>>> events and
> > > > > > >>>> > > process them while still expecting more data, with the
> > > > > > >>>> heavier-weight task
> > > > > > >>>> > > of creating a new pre-allocated batch being a rare
> > occurrence.
> > > > > > >>>> > >
> > > > > > >>>>
> > > > > > >>>> So the "length" field in RecordBatch is already the utilized
> > > > number of
> > > > > > >>>> rows. The body buffers can certainly have excess unused
> > space. So
> > > > your
> > > > > > >>>> application can mutate Flatbuffer "length" field in-place as
> > new
> > > > > > >>>> records are filled in.
> > > > > > >>>>
> > > > > > >>>> > > Notice that the mutability is only in the sense of
> > > > "appending."
> > > > > > The
> > > > > > >>>> > > current doctrine of total immutability would be revised to
> > > > refer
> > > > > > to
> > > > > > >>>> the
> > > > > > >>>> > > immutability of only the already-populated rows.
> > > > > > >>>> > >
> > > > > > >>>> > > It gives folks an option other than choosing the lesser
> > of two
> > > > > > >>>> evils: on
> > > > > > >>>> > > the one hand, length 1 RecordBatches that don't result in
> > a
> > > > stream
> > > > > > >>>> that is
> > > > > > >>>> > > computationally efficient.  On the other hand, adding
> > > > artificial
> > > > > > >>>> latency by
> > > > > > >>>> > > accumulating events before "freezing" a larger batch and
> > only
> > > > then
> > > > > > >>>> making
> > > > > > >>>> > > it available to computation.
> > > > > > >>>> > >
> > > > > > >>>> > > -John
> > > > > > >>>> > >
> > > > > > >>>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney <
> > > > wesmck...@gmail.com
> > > > > > >
> > > > > > >>>> wrote:
> > > > > > >>>> > >
> > > > > > >>>> > >> hi John,
> > > > > > >>>> > >>
> > > > > > >>>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen <
> > > > j...@jgm.org>
> > > > > > >>>> wrote:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > During my time building financial analytics and trading
> > > > systems
> > > > > > >>>> (23
> > > > > > >>>> > >> years!), both the "batch processing" and "stream
> > processing"
> > > > > > >>>> paradigms have
> > > > > > >>>> > >> been extensively used by myself and by colleagues.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Unfortunately, the tools used in these paradigms have
> > not
> > > > > > >>>> successfully
> > > > > > >>>> > >> overlapped.  For example, an analyst might use a Python
> > > > notebook
> > > > > > >>>> with
> > > > > > >>>> > >> pandas to do some batch analysis.  Then, for acceptable
> > > > latency
> > > > > > and
> > > > > > >>>> > >> throughput, a C++ programmer must implement the same
> > schemas
> > > > and
> > > > > > >>>> processing
> > > > > > >>>> > >> logic in order to analyze real-time data for real-time
> > > > decision
> > > > > > >>>> support.
> > > > > > >>>> > >> (Time horizons often being sub-second or even
> > > > sub-millisecond for
> > > > > > >>>> an
> > > > > > >>>> > >> acceptable reaction to an event.  The most aggressive
> > > > > > >>>> software-based
> > > > > > >>>> > >> systems, leaving custom hardware aside other than things
> > like
> > > > > > >>>> kernel-bypass
> > > > > > >>>> > >> NICs, target 10s of microseconds for a full round trip
> > from
> > > > data
> > > > > > >>>> ingestion
> > > > > > >>>> > >> to decision.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > As a result, TCO is more than doubled.  A doubling can
> > be
> > > > > > >>>> accounted for
> > > > > > >>>> > >> by two implementations that share little or nothing in
> > the
> > > > way of
> > > > > > >>>> > >> architecture.  Then additional effort is required to
> > ensure
> > > > that
> > > > > > >>>> these
> > > > > > >>>> > >> implementations continue to behave the same way and are
> > > > upgraded
> > > > > > in
> > > > > > >>>> > >> lock-step.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Arrow purports to be a "bridge" technology that eases
> > one
> > > > of
> > > > > > the
> > > > > > >>>> pain
> > > > > > >>>> > >> points of working in different ecosystems by providing a
> > > > common
> > > > > > >>>> event
> > > > > > >>>> > >> stream data structure.  (Discussion of common processing
> > > > > > >>>> techniques is
> > > > > > >>>> > >> beyond the scope of this discussion.  Suffice it to say
> > that
> > > > a
> > > > > > >>>> streaming
> > > > > > >>>> > >> algo can always be run in batch, but not vice versa.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Arrow seems to be growing up primarily in the batch
> > > > processing
> > > > > > >>>> world.
> > > > > > >>>> > >> One publication notes that "the missing piece is
> > streaming,
> > > > where
> > > > > > >>>> the
> > > > > > >>>> > >> velocity of incoming data poses a special challenge.
> > There
> > > > are
> > > > > > >>>> some early
> > > > > > >>>> > >> experiments to populate Arrow nodes in microbatches..."
> > [1]
> > > > Part
> > > > > > > >>>> of our
> > > > > > >>>> > >> discussion could be a response to this observation.  In
> > what
> > > > ways
> > > > > > >>>> is it
> > > > > > >>>> > >> true or false?  What are the plans to remedy this
> > > > shortcoming, if
> > > > > > >>>> it
> > > > > > >>>> > >> exists?  What steps can be taken now to ease the
> > transition
> > > > to
> > > > > > >>>> low-latency
> > > > > > >>>> > >> streaming support in the future?
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> Arrow columnar format describes a collection of records
> > with
> > > > > > values
> > > > > > >>>> > >> between records being placed adjacent to each other in
> > > > memory. If
> > > > > > >>>> you
> > > > > > >>>> > >> break that assumption, you don't have a columnar format
> > > > anymore.
> > > > > > >>>> So I
> > > > > > > >>>> > >> don't see where the "shortcoming" is. We don't have any
> > software
> > > > in
> > > > > > the
> > > > > > >>>> > >> project for managing the creation of record batches in a
> > > > > > streaming
> > > > > > >>>> > >> application, but this seems like an interesting
> > development
> > > > > > >>>> expansion
> > > > > > >>>> > >> area for the project.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Note that many contributors have already expanded the
> > surface
> > > > > > area
> > > > > > >>>> of
> > > > > > >>>> > >> what's in the Arrow libraries in many directions.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Streaming data collection is yet another area of
> > expansion,
> > > > but
> > > > > > >>>> > >> _personally_ it is not on the short list of projects
> > that I
> > > > will
> > > > > > > >>>> > >> be working on (or asking my direct or indirect
> > > > > > >>>> colleagues
> > > > > > >>>> > >> to work on). Since this is a project made up of
> > volunteers,
> > > > it's
> > > > > > >>>> up to
> > > > > > >>>> > >> contributors to drive new directions for the project by
> > > > writing
> > > > > > >>>> design
> > > > > > >>>> > >> documents and pull requests.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > In my own experience, a successful strategy for stream
> > > > > > >>>> processing where
> > > > > > >>>> > >> context (i.e. recent past events) must be considered by
> > > > > > >>>> calculations is to
> > > > > > >>>> > >> pre-allocate memory for event collection, to organize
> > this
> > > > memory
> > > > > > >>>> in a
> > > > > > >>>> > >> columnar layout, and to run incremental calculations at
> > each
> > > > > > event
> > > > > > >>>> ingress
> > > > > > >>>> > >> into the partially populated memory.  [Fig 1]  When the
> > > > > > >>>> pre-allocated
> > > > > > >>>> > >> memory has been exhausted, allocate a new batch of
> > > > column-wise
> > > > > > >>>> memory and
> > > > > > >>>> > >> continue.  When a batch is no longer pertinent to the
> > > > calculation
> > > > > > >>>> look-back
> > > > > > >>>> > >> window, free the memory back to the heap or pool.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Here we run into the first philosophical barrier with
> > > > Arrow,
> > > > > > >>>> where
> > > > > > >>>> > >> "Arrow data is immutable." [2]  There is currently
> > little or
> > > > no
> > > > > > >>>> > >> consideration for reading a partially constructed
> > > > RecordBatch,
> > > > > > >>>> e.g. one
> > > > > > >>>> > >> with only some of the rows containing event data at the
> > > > present
> > > > > > >>>> moment in
> > > > > > >>>> > >> time.
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> It seems like the use case you have heavily revolves
> > around
> > > > > > >>>> mutating
> > > > > > >>>> > >> pre-allocated, memory-mapped datasets that are being
> > > > consumed by
> > > > > > >>>> other
> > > > > > >>>> > >> processes on the same host. So you want to incrementally
> > fill
> > > > > > some
> > > > > > >>>> > >> memory-mapped data that you've already exposed to another
> > > > > > process.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Because of the memory layout for variable-size and nested
> > > > cells,
> > > > > > >>>> it is
> > > > > > >>>> > >> impossible in general to mutate Arrow record batches.
> > This is
> > > > > > not a
> > > > > > >>>> > >> philosophical position: this was a deliberate technical
> > > > decision
> > > > > > to
> > > > > > >>>> > >> guarantee data locality for scans and predictable O(1)
> > random
> > > > > > >>>> access
> > > > > > >>>> > >> on variable-length and nested data.
> > > > > > >>>> > >>
> > > > > > >>>> > >> Technically speaking, you can mutate memory in-place for
> > > > > > fixed-size
> > > > > > >>>> > >> types in-RAM or on-disk, if you want to. It's an
> > "off-label"
> > > > use
> > > > > > >>>> case
> > > > > > >>>> > >> but no one is saying you can't do this.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > Proposal 1: Shift the Arrow "immutability" doctrine to
> > > > apply to
> > > > > > >>>> > >> populated records of a RecordBatch instead of to all
> > records?
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> Per above, this is impossible in generality. You can't
> > alter
> > > > > > >>>> > >> variable-length or nested records without rewriting the
> > > > record
> > > > > > >>>> batch.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > As an alternative approach, RecordBatch can be used as
> > a
> > > > single
> > > > > > >>>> Record
> > > > > > >>>> > >> (batch length of one).  [Fig 2]  In this approach the
> > > > benefit of
> > > > > > >>>> the
> > > > > > >>>> > >> columnar layout is lost for look-back window processing.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Another alternative approach is to collect an entire
> > > > > > RecordBatch
> > > > > > >>>> before
> > > > > > >>>> > >> stepping through it with the stream processing
> > calculation.
> > > > [Fig
> > > > > > >>>> 3]  With
> > > > > > >>>> > >> this approach some columnar processing benefit can be
> > > > recovered,
> > > > > > >>>> however
> > > > > > >>>> > >> artificial latency is introduced.  As tolerance for
> > delays in
> > > > > > >>>> decision
> > > > > > >>>> > >> support dwindles, this model will be of increasingly
> > limited
> > > > > > >>>> value.  It is
> > > > > > >>>> > >> already unworkable in many areas of finance.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > When considering the Arrow format and variable length
> > > > values
> > > > > > >>>> such as
> > > > > > >>>> > >> strings, the pre-allocation approach (and subsequent
> > > > processing
> > > > > > of
> > > > > > >>>> a
> > > > > > >>>> > >> partially populated batch) encounters a hiccup.  How do
> > we
> > > > know
> > > > > > >>>> the amount
> > > > > > >>>> > >> of buffer space to pre-allocate?  If we allocate too much
> > > > buffer
> > > > > > >>>> for
> > > > > > >>>> > >> variable-length data, some of it will be unused.  If we
> > > > allocate
> > > > > > >>>> too little
> > > > > > >>>> > >> buffer for variable-length data, some row entities will
> > be
> > > > > > >>>> unusable.
> > > > > > >>>> > >> (Additional "rows" remain but when populating string
> > fields
> > > > there
> > > > > > >>>> is no
> > > > > > >>>> > >> longer string storage space to point them to.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > As with many optimization space/time tradeoff
> > problems, the
> > > > > > >>>> solution
> > > > > > >>>> > >> seems to be to guess.  Pre-allocation sets aside variable
> > > > length
> > > > > > >>>> buffer
> > > > > > >>>> > >> storage based on the typical "expected size" of the
> > variable
> > > > > > >>>> length data.
> > > > > > >>>> > >> This can result in some unused rows, as discussed above.
> > > > [Fig 4]
> > > > > > >>>> In fact
> > > > > > >>>> > >> it will necessarily result in one unused row unless the
> > last
> > > > of
> > > > > > >>>> each
> > > > > > >>>> > >> variable length field in the last row exactly fits into
> > the
> > > > > > >>>> remaining space
> > > > > > >>>> > >> in the variable length data buffer.  Consider the case
> > where
> > > > > > there
> > > > > > >>>> is more
> > > > > > >>>> > >> variable length buffer space than data:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Given variable-length field x, last row index of y,
> > > > variable
> > > > > > >>>> length
> > > > > > >>>> > >> buffer v, beginning offset into v of o:
> > > > > > >>>> > >> >     x[y] begins at o
> > > > > > >>>> > >> >     x[y] ends at the offset of the next record, there
> > is no
> > > > > > next
> > > > > > >>>> > >> record, so x[y] ends after the total remaining area in
> > > > variable
> > > > > > >>>> length
> > > > > > >>>> > >> buffer... however, this is too much!
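
For reference, Arrow's variable-size binary layout stores length + 1
offsets, so the end of the last value is written explicitly and does not
default to the end of the data buffer. A rough sketch of a pre-allocated
string column built on that rule, assuming int32 offsets;
PreallocatedStringColumn is a hypothetical name, not Arrow library code:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical pre-allocated string column; a sketch, not the Arrow API.
class PreallocatedStringColumn {
 public:
  PreallocatedStringColumn(int64_t row_capacity, int64_t data_capacity)
      : offsets_(static_cast<std::size_t>(row_capacity) + 1, 0),
        data_(static_cast<std::size_t>(data_capacity), '\0') {}

  // Appends a value and records its end offset. Returns false when either
  // the row slots or the variable-length bytes are exhausted.
  bool Append(const std::string& value) {
    const int64_t row_capacity = static_cast<int64_t>(offsets_.size()) - 1;
    if (length_ >= row_capacity) return false;
    const int64_t begin = offsets_[static_cast<std::size_t>(length_)];
    const int64_t end = begin + static_cast<int64_t>(value.size());
    if (end > static_cast<int64_t>(data_.size())) return false;
    if (!value.empty()) {
      std::memcpy(data_.data() + begin, value.data(), value.size());
    }
    offsets_[static_cast<std::size_t>(length_) + 1] = static_cast<int32_t>(end);
    ++length_;
    return true;
  }

  // Value i spans [offsets[i], offsets[i + 1]) in the data buffer.
  std::string Get(int64_t i) const {
    assert(i >= 0 && i < length_);
    const int32_t begin = offsets_[static_cast<std::size_t>(i)];
    const int32_t end = offsets_[static_cast<std::size_t>(i) + 1];
    return std::string(data_.data() + begin, static_cast<std::size_t>(end - begin));
  }

  int64_t length() const { return length_; }

 private:
  std::vector<int32_t> offsets_;  // row_capacity + 1 entries
  std::vector<char> data_;        // pre-allocated variable-length bytes
  int64_t length_ = 0;            // number of populated rows
};
```

When Append returns false because the bytes ran out, the remaining row
slots are the "unused rows" described above.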
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> It isn't clear to me what you're proposing. It sounds
> > like
> > > > you
> > > > > > >>>> want a
> > > > > > >>>> > >> major redesign of the columnar format to permit in-place
> > > > mutation
> > > > > > >>>> of
> > > > > > >>>> > >> strings. I doubt that would be possible at this point.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > Proposal 2: [low priority] Create an "expected length"
> > > > > > statistic
> > > > > > >>>> in the
> > > > > > >>>> > >> Schema for variable length fields?
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Proposal 3: [low priority] Create metadata to store the
> > > > index
> > > > > > >>>> into
> > > > > > >>>> > >> variable-length data that represents the end of the value
> > > > for the
> > > > > > >>>> last
> > > > > > >>>> > >> record?  Alternatively: a row is "wasted," however
> > > > pre-allocation
> > > > > > >>>> is
> > > > > > >>>> > >> inexact to begin with.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Proposal 4: Add metadata to indicate to a RecordBatch
> > > > reader
> > > > > > >>>> that only
> > > > > > >>>> > >> some of the rows are to be utilized.  [Fig 5]  This is
> > > > useful not
> > > > > > >>>> only when
> > > > > > >>>> > >> processing a batch that is still under construction, but
> > > > also for
> > > > > > >>>> "closed"
> > > > > > >>>> > >> batches that were not able to be fully populated due to
> > an
> > > > > > >>>> imperfect
> > > > > > >>>> > >> projection of variable length storage.
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > On this last proposal, Wes has weighed in:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > "I believe your use case can be addressed by
> > pre-allocating
> > > > > > >>>> record
> > > > > > >>>> > >> batches and maintaining application level metadata about
> > what
> > > > > > >>>> portion of
> > > > > > >>>> > >> the record batches has been 'filled' (so the unfilled
> > > > records can
> > > > > > >>>> be
> > > > > > >>>> > >> dropped by slicing). I don't think any change to the
> > binary
> > > > > > >>>> protocol is
> > > > > > >>>> > >> warranted." [3]
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> My personal opinion is that a solution to the problem you
> > > > have
> > > > > > can
> > > > > > >>>> be
> > > > > > >>>> > >> composed from the components (combined with some new
> > pieces
> > > > of
> > > > > > >>>> code)
> > > > > > >>>> > >> that we have developed in the project already.
> > > > > > >>>> > >>
> > > > > > >>>> > >> So the "application level" could be an add-on C++
> > component
> > > > in
> > > > > > the
> > > > > > >>>> > >> Apache Arrow project. Call it a "memory-mapped streaming
> > data
> > > > > > >>>> > >> collector" that pre-allocates on-disk record batches (of
> > only
> > > > > > >>>> > >> fixed-size or even possibly dictionary-encoded types) and
> > > > then
> > > > > > >>>> fills
> > > > > > >>>> > >> them incrementally as bits of data come in, updating some
> > > > > > auxiliary
> > > > > > >>>> > >> metadata that other processes can use to determine what
> > > > portion
> > > > > > of
> > > > > > >>>> the
> > > > > > >>>> > >> Arrow IPC messages to "slice off".
> > > > > > >>>> > >>
> > > > > > >>>> > >> > Concerns with positioning this at the app level:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > 1- Do we need to address or begin to address the
> > overall
> > > > > > concern
> > > > > > >>>> of how
> > > > > > >>>> > >> Arrow data structures are to be used in "true"
> > > > (non-microbatch)
> > > > > > >>>> streaming
> > > > > > >>>> > >> environments, cf [1] in the last paragraph, as a
> > > > *first-class*
> > > > > > >>>> usage
> > > > > > >>>> > >> pattern?  If so, is now the time?
> > > > > > >>>> > >> >if you break that design invariant you don't have a
> > columnar
> > > > > > >>>> format
> > > > > > >>>> > >> anymore.
> > > > > > >>>> > >>
> > > > > > > >>>> > >> Arrow provides a binary protocol for describing payload
> > > > data on
> > > > > > >>>> the
> > > > > > >>>> > >> wire (or on-disk, or in-memory, all the same). I don't
> > see
> > > > how it
> > > > > > >>>> is
> > > > > > >>>> > >> in conflict with streaming environments, unless the
> > streaming
> > > > > > >>>> > >> application has difficulty collecting multiple records
> > into
> > > > an
> > > > > > >>>> Arrow
> > > > > > > >>>> > >> record batch. In that case, it's a system trade-off.
> > > > Currently
> > > > > > >>>> > >> people are using Avro with Kafka and sending one record
> > at a
> > > > > > time,
> > > > > > >>>> but
> > > > > > >>>> > >> then they're also spending a lot of CPU cycles in
> > > > serialization.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > 2- If we can even make broad-stroke attempts at data
> > > > structure
> > > > > > >>>> features
> > > > > > >>>> > >> that are likely to be useful when streaming becomes a
> > first
> > > > class
> > > > > > >>>> citizen,
> > > > > > >>>> > >> it reduces the chances of "breaking" format changes in
> > the
> > > > > > >>>> future.  I do
> > > > > > >>>> > >> not believe the proposals place an undue hardship on
> > batch
> > > > > > >>>> processing
> > > > > > >>>> > >> paradigms.  We are currently discussing making a breaking
> > > > change
> > > > > > >>>> to the IPC
> > > > > > >>>> > >> format [4], so there is a window of opportunity to
> > consider
> > > > > > >>>> features useful
> > > > > > >>>> > >> for streaming?  (Current clients can feel free to ignore
> > the
> > > > > > >>>> proposed
> > > > > > >>>> > >> "utilized" metadata of RecordBatch.)
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> I think the perception that streaming is not a first
> > class
> > > > > > citizen
> > > > > > >>>> is
> > > > > > >>>> > >> an editorialization (e.g. the article you cited was an
> > > > editorial
> > > > > > >>>> > >> written by an industry analyst based on an interview with
> > > > Jacques
> > > > > > >>>> and
> > > > > > >>>> > >> me). Columnar data formats in general are designed to
> > work
> > > > with
> > > > > > >>>> more
> > > > > > >>>> > >> than one value at a time (which we are calling a "batch"
> > but
> > > > I
> > > > > > >>>> think
> > > > > > >>>> > >> that's conflating terminology with the "batch processing"
> > > > > > paradigm
> > > > > > >>>> of
> > > > > > > >>>> > >> Hadoop, etc.).
> > > > > > >>>> > >>
> > > > > > >>>> > >> > 3- Part of the promise of Arrow is that applications
> > are
> > > > not a
> > > > > > >>>> world
> > > > > > >>>> > >> unto themselves, but interoperate with other
> > Arrow-compliant
> > > > > > >>>> systems.  In
> > > > > > >>>> > >> my case, I would like users to be able to examine
> > > > RecordBatches in
> > > > > > >>>> tools
> > > > > > >>>> > >> such as pyarrow without needing to be aware of any
> > streaming
> > > > > > >>>> app-specific
> > > > > > >>>> > >> metadata.  For example, a researcher may pull in an IPC
> > > > "File"
> > > > > > >>>> containing N
> > > > > > >>>> > >> RecordBatch messages corresponding to those in Fig 4.  I
> > > > would
> > > > > > >>>> very much
> > > > > > >>>> > >> like for this casual user to not have to apply N slice
> > > > operations
> > > > > > >>>> based on
> > > > > > >>>> > >> out-of-band data to get to the data that is relevant.
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > >>>> > >> Per above, should this become a standard enough use
> > case, I
> > > > think
> > > > > > >>>> that
> > > > > > >>>> > >> code can be developed in the Apache project to address
> > it.
> > > > > > >>>> > >>
> > > > > > >>>> > >> > Devil's advocate:
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > 1- Concurrent access to a mutable (growing) RecordBatch
> > > > will
> > > > > > >>>> require
> > > > > > >>>> > >> synchronization of some sort to get consistent metadata
> > > > reads.
> > > > > > >>>> Since the
> > > > > > >>>> > >> above proposals do not specify how this synchronization
> > will
> > > > > > occur
> > > > > > >>>> for
> > > > > > >>>> > >> tools such as pyarrow (we can imagine a Python user
> > getting
> > > > > > >>>> synchronized
> > > > > > >>>> > >> access to File metadata and mapping a read-only area
> > before
> > > > the
> > > > > > >>>> writer is
> > > > > > >>>> > >> allowed to continue "appending" to this batch, or
> > batches to
> > > > this
> > > > > > >>>> File),
> > > > > > >>>> > >> some "unusual" code will be required anyway, so what is
> > the
> > > > harm
> > > > > > of
> > > > > > >>>> > >> consulting side-band data for slicing all the batches as
> > > > part of
> > > > > > >>>> this
> > > > > > >>>> > >> "unusual" code?  [Potential response: Yes, but it is
> > still
> > > > one
> > > > > > >>>> less thing
> > > > > > >>>> > >> to worry about, and perhaps first-class support for
> > common
> > > > > > >>>> synchronization
> > > > > > >>>> > >> patterns can be forthcoming?  These patterns may not
> > require
> > > > > > >>>> further format
> > > > > > >>>> > >> changes?]
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > My overall concern is that I see a lot of wasted effort
> > > > dealing
> > > > > > >>>> with
> > > > > > >>>> > >> the "impedance mismatch" between batch oriented and
> > streaming
> > > > > > >>>> systems.  I
> > > > > > >>>> > >> believe that "best practices" will begin (and continue!)
> > to
> > > > > > prefer
> > > > > > >>>> tools
> > > > > > >>>> > >> that help bridge the gap.  Certainly this is the case in
> > my
> > > > own
> > > > > > >>>> work.  I
> > > > > > >>>> > >> agree with the appraisal at the end of the ZDNet article.
> > > > If the
> > > > > > >>>> above is
> > > > > > >>>> > >> not a helpful solution, what other steps can be made?
> > Or if
> > > > > > Arrow
> > > > > > >>>> is
> > > > > > >>>> > >> intentionally confined to batch processing for the
> > > > foreseeable
> > > > > > >>>> future (in
> > > > > > >>>> > >> terms of first-class support), I'm interested in the
> > > > rationale.
> > > > > > >>>> Perhaps
> > > > > > >>>> > >> the feeling is that we avoid scope creep now (which I
> > > > understand
> > > > > > >>>> can be
> > > > > > >>>> > >> never-ending) even if it means a certain breaking change
> > in
> > > > the
> > > > > > >>>> future?
> > > > > > >>>> > >> >
> > > > > > >>>> > >>
> > > > > > > >>>> > >> There are some semantic issues with what "streaming" and
> > > > "batch"
> > > > > > > >>>> mean.
> > > > > > >>>> > >> When people see "streaming" nowadays they think "Kafka"
> > (or
> > > > > > >>>> > >> Kafka-like). Single events flow in and out of streaming
> > > > > > computation
> > > > > > >>>> > >> nodes (e.g. like
> > https://apache.github.io/incubator-heron/
> > > > or
> > > > > > >>>> others).
> > > > > > >>>> > >> The "streaming" is more about computational semantics
> > than
> > > > data
> > > > > > >>>> > >> representation.
> > > > > > >>>> > >>
> > > > > > >>>> > >> The Arrow columnar format fundamentally deals with
> > multiple
> > > > > > >>>> records at
> > > > > > >>>> > >> a time (you can have a record batch with size 1, but
> > that is
> > > > not
> > > > > > >>>> going
> > > > > > >>>> > >> to be efficient). But I do not think Arrow is
> > "intentionally
> > > > > > >>>> confined"
> > > > > > >>>> > >> to batch processing. If it makes sense to use a columnar
> > > > format
> > > > > > to
> > > > > > >>>> > >> represent data in a streaming application, then you can
> > > > certainly
> > > > > > >>>> use
> > > > > > >>>> > >> it for that. I'm aware of people successfully using Arrow
> > > > with
> > > > > > >>>> Kafka,
> > > > > > >>>> > >> for example.
> > > > > > >>>> > >>
> > > > > > >>>> > >> - Wes
> > > > > > >>>> > >>
> > > > > > >>>> > >> > Who else encounters the need to mix/match batch and
> > > > streaming,
> > > > > > >>>> and what
> > > > > > >>>> > >> are your experiences?
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > Thanks for the further consideration and discussion!
> > > > > > >>>> > >> >
> > > > > > >>>> > >> > [1] https://zd.net/2H0LlBY
> > > > > > >>>> > >> > [2] https://arrow.apache.org/docs/python/data.html
> > > > > > >>>> > >> > [3] https://bit.ly/2J5sENZ
> > > > > > >>>> > >> > [4] https://bit.ly/2Yske8L
> > > > > > >>>> > >>
> > > > > > >>>> > >
> > > > > > >>>>
> > > > > > >>>
> > > > > >
> > > >
> >
> >
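
For readers following the quoted discussion, the "memory-mapped streaming
data collector" idea (pre-allocate fixed-size storage, fill it incrementally,
and publish auxiliary metadata telling readers how much to "slice off") can
be sketched roughly as below. This is only an illustration of the pattern
under stated assumptions: the file layout, the HEADER counter, and the
function names are hypothetical, it does not use the Arrow IPC format or any
Arrow library, and it does not address synchronization between concurrent
readers and writers.

```python
import mmap
import struct

# Hypothetical layout (not Arrow IPC): an 8-byte "utilized rows" counter
# followed by pre-allocated slots for one fixed-width int64 column.
HEADER = struct.Struct("<q")
VALUE = struct.Struct("<q")


def create(path, capacity):
    """Pre-allocate space for `capacity` rows, with zero rows utilized."""
    with open(path, "wb") as f:
        f.write(HEADER.pack(0))
        f.write(b"\x00" * (VALUE.size * capacity))


def append(path, value):
    """Write one value into the next free slot, then update the counter."""
    with open(path, "r+b") as f, mmap.mmap(f.fileno(), 0) as m:
        (used,) = HEADER.unpack_from(m, 0)
        capacity = (len(m) - HEADER.size) // VALUE.size
        if used >= capacity:
            raise ValueError("pre-allocated batch is full")
        VALUE.pack_into(m, HEADER.size + used * VALUE.size, value)
        # The counter is written after the value, so it never counts an
        # unwritten slot in this single-process sketch.
        HEADER.pack_into(m, 0, used + 1)


def read_utilized(path):
    """Return only the utilized prefix of the pre-allocated column."""
    with open(path, "rb") as f, mmap.mmap(
        f.fileno(), 0, access=mmap.ACCESS_READ
    ) as m:
        (used,) = HEADER.unpack_from(m, 0)
        return [
            VALUE.unpack_from(m, HEADER.size + i * VALUE.size)[0]
            for i in range(used)
        ]
```

A reader that ignores the counter would see the whole pre-allocated region,
including unused slots, which is the inconsistency the thread is debating
where to resolve: in the format itself or in application-level code.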
