Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-19 Thread Micah Kornfield
Hi John,

> Just concerned about running out of time if we are freezing the format at
> 1.0.  Is there any rough guess as to how much time I have to make my case?


We haven't discussed it formally, but we have been doing releases roughly
every three months, so I would expect 1.0.0 sometime in January.

On Fri, Oct 18, 2019 at 10:18 AM John Muehlhausen  wrote:

> Perhaps what I should do is study the batch creation process in the
> reference implementation and see whether an alternative approach can be a
> lot more efficient in time while being less efficient in space.
>
> And if so, whether this new approach also requires a differentiation
> between batch length and underlying array/buffer length, even while the
> immutability doctrine of a batch remains unchanged.  (I.e. remove "mutating
> record batch headers on the fly" from the discussion as potentially
> unnecessary to make the point.)
>
> Just concerned about running out of time if we are freezing the format at
> 1.0.  Is there any rough guess as to how much time I have to make my case?
>
> -John
>
> On Fri, Oct 18, 2019 at 1:55 AM Micah Kornfield 
> wrote:
>
>> On the specification: I'm -.5 on saying array lengths can be different
>> than the RecordBatch length (especially if both are valid lengths).  I can
>> see some wiggle room in the current language [1][2] that might allow for
>> modifying this, so I think we should update it one way or the other,
>> however this conversation turns out.
>>
>> On the implementation: I wouldn't want to see code that mutates the
>> record batch headers on the fly in the current code base.  It is dangerous
>> to break immutability assumptions that standard implementations might be
>> relying on. IMO, for such a specialized use-case it makes sense to have
>> out-of-band communication (perhaps in a private implementation that does
>> modify headers) and then "touch up" the metadata for later analysis, so
>> that it conforms to the specification (and standard libraries can be used).
>>
>> [1] https://github.com/apache/arrow/blob/master/format/Message.fbs#L49
>> [2] https://github.com/apache/arrow/blob/master/format/Message.fbs#L34
>>
>> On Thu, Oct 17, 2019 at 7:17 AM John Muehlhausen  wrote:
>>
>>> Micah, thanks very much for your input.  A few thoughts in response:
>>>
>>> ``Over the time horizon of the desired latency, if you aren't receiving
>>> enough messages to take advantage of columnar analytics, a system probably
>>> has enough time to compact batches after the fact for later analysis;
>>> conversely, if you are receiving many events, you naturally get reasonable
>>> batch sizes without having to do further work.''
>>>
>>> To see my perspective it is necessary to stop thinking about business
>>> analytics and latencies of minutes or seconds or significant fractions of
>>> seconds.  In real-time financial trading systems we do things like kernel
>>> bypass networking, core affinitization and spinning, fake workloads to keep
>>> hot paths cached, almost complete avoidance of general purpose allocators
>>> (preferring typed pools), all with the goal of shaving off one more
>>> MICROsecond.
>>>
>>> Therefore "probably has enough time" is never in our vocabulary.
>>> Microbatching is not a thing.  It is either purely event driven or bust if
>>> you are shooting for a dozen microseconds from packet-in to packet-out on
>>> the wire.  This is also the reason that certain high-performance systems
>>> will never use the reference implementations of Arrow, just as most other
>>> financial tech standards are centered on the *protocol* and there end up
>>> being dozens of implementations that compete on performance.
>>>
>>> Arrow *as a protocol* has the potential to be a beautiful thing.  It is
>>> simple and straightforward and brings needed standardization.  I would
>>> encourage thinking of the reference implementations almost as second-class
>>> citizens.  The reference implementations should not de-facto define the
>>> protocol, but vice versa. (I know this is probably already the philosophy.)
>>>
>>> Of course data eventually pops out of high performance systems and has a
>>> second life in the hands of researchers, etc.  This is why I'd like the
>>> "vanilla" Arrow to be able to deal with Arrow data as-constructed in the
>>> higher performance systems.  Could there be special processes to refactor
>>> all this data?  Of course, but why?  If I'm logging RecordBatches to disk
>>> and some of them happen to have extra array elements (e.g. because of the
>>> inevitable imperfect projection pre-allocation-- running out of variable
>>> length string storage before running out of rows), why would I refactor
>>> terabytes of data (and, more importantly, introduce a time window where
>>> data is not available while this occurs) when I can just have pyarrow skip
>>> the unused rows?  If I want compression/de-duplication I'll do it at the
>>> storage media layer.
>>>
>>> ``the proposal is essentially changing the unit of exchange from
>>> 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-18 Thread John Muehlhausen
Perhaps what I should do is study the batch creation process in the
reference implementation and see whether an alternative approach can be a
lot more efficient in time while being less efficient in space.

And if so, whether this new approach also requires a differentiation
between batch length and underlying array/buffer length, even while the
immutability doctrine of a batch remains unchanged.  (I.e. remove "mutating
record batch headers on the fly" from the discussion as potentially
unnecessary to make the point.)

Just concerned about running out of time if we are freezing the format at
1.0.  Is there any rough guess as to how much time I have to make my case?

-John

On Fri, Oct 18, 2019 at 1:55 AM Micah Kornfield 
wrote:

> On the specification: I'm -.5 on saying array lengths can be different
> than the RecordBatch length (especially if both are valid lengths).  I can
> see some wiggle room in the current language [1][2] that might allow for
> modifying this, so I think we should update it one way or the other,
> however this conversation turns out.
>
> On the implementation: I wouldn't want to see code that mutates the
> record batch headers on the fly in the current code base.  It is dangerous
> to break immutability assumptions that standard implementations might be
> relying on. IMO, for such a specialized use-case it makes sense to have
> out-of-band communication (perhaps in a private implementation that does
> modify headers) and then "touch up" the metadata for later analysis, so
> that it conforms to the specification (and standard libraries can be used).
>
> [1] https://github.com/apache/arrow/blob/master/format/Message.fbs#L49
> [2] https://github.com/apache/arrow/blob/master/format/Message.fbs#L34
>
> On Thu, Oct 17, 2019 at 7:17 AM John Muehlhausen  wrote:
>
>> Micah, thanks very much for your input.  A few thoughts in response:
>>
>> ``Over the time horizon of the desired latency, if you aren't receiving
>> enough messages to take advantage of columnar analytics, a system probably
>> has enough time to compact batches after the fact for later analysis;
>> conversely, if you are receiving many events, you naturally get reasonable
>> batch sizes without having to do further work.''
>>
>> To see my perspective it is necessary to stop thinking about business
>> analytics and latencies of minutes or seconds or significant fractions of
>> seconds.  In real-time financial trading systems we do things like kernel
>> bypass networking, core affinitization and spinning, fake workloads to keep
>> hot paths cached, almost complete avoidance of general purpose allocators
>> (preferring typed pools), all with the goal of shaving off one more
>> MICROsecond.
>>
>> Therefore "probably has enough time" is never in our vocabulary.
>> Microbatching is not a thing.  It is either purely event driven or bust if
>> you are shooting for a dozen microseconds from packet-in to packet-out on
>> the wire.  This is also the reason that certain high-performance systems
>> will never use the reference implementations of Arrow, just as most other
>> financial tech standards are centered on the *protocol* and there end up
>> being dozens of implementations that compete on performance.
>>
>> Arrow *as a protocol* has the potential to be a beautiful thing.  It is
>> simple and straightforward and brings needed standardization.  I would
>> encourage thinking of the reference implementations almost as second-class
>> citizens.  The reference implementations should not de-facto define the
>> protocol, but vice versa. (I know this is probably already the philosophy.)
>>
>> Of course data eventually pops out of high performance systems and has a
>> second life in the hands of researchers, etc.  This is why I'd like the
>> "vanilla" Arrow to be able to deal with Arrow data as-constructed in the
>> higher performance systems.  Could there be special processes to refactor
>> all this data?  Of course, but why?  If I'm logging RecordBatches to disk
>> and some of them happen to have extra array elements (e.g. because of the
>> inevitable imperfect projection pre-allocation-- running out of variable
>> length string storage before running out of rows), why would I refactor
>> terabytes of data (and, more importantly, introduce a time window where
>> data is not available while this occurs) when I can just have pyarrow skip
>> the unused rows?  If I want compression/de-duplication I'll do it at the
>> storage media layer.
>>
>> ``the proposal is essentially changing the unit of exchange from
>> RecordBatches to a segment of a RecordBatch''
>>
>> It is the recognition that a RecordBatch may need to be computationally
>> useful before it is completely received/constructed, and without adding
>> additional overhead to the reception/construction process.  I grab a canned
>> RecordBatch from a pool of them and start filling it in.  If an earlier
>> RecordBatch falls out of the computation window, I put it back in the
>> pool.  At any 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-18 Thread Micah Kornfield
On the specification: I'm -.5 on saying array lengths can be different than
the RecordBatch length (especially if both are valid lengths).  I can see
some wiggle room in the current language [1][2] that might allow for
modifying this, so I think we should update it one way or the other,
however this conversation turns out.

On the implementation: I wouldn't want to see code that mutates the record
batch headers on the fly in the current code base.  It is dangerous to
break immutability assumptions that standard implementations might be
relying on. IMO, for such a specialized use-case it makes sense to have
out-of-band communication (perhaps in a private implementation that does
modify headers) and then "touch up" the metadata for later analysis, so
that it conforms to the specification (and standard libraries can be used).

[1] https://github.com/apache/arrow/blob/master/format/Message.fbs#L49
[2] https://github.com/apache/arrow/blob/master/format/Message.fbs#L34

On Thu, Oct 17, 2019 at 7:17 AM John Muehlhausen  wrote:

> Micah, thanks very much for your input.  A few thoughts in response:
>
> ``Over the time horizon of the desired latency, if you aren't receiving
> enough messages to take advantage of columnar analytics, a system probably
> has enough time to compact batches after the fact for later analysis;
> conversely, if you are receiving many events, you naturally get reasonable
> batch sizes without having to do further work.''
>
> To see my perspective it is necessary to stop thinking about business
> analytics and latencies of minutes or seconds or significant fractions of
> seconds.  In real-time financial trading systems we do things like kernel
> bypass networking, core affinitization and spinning, fake workloads to keep
> hot paths cached, almost complete avoidance of general purpose allocators
> (preferring typed pools), all with the goal of shaving off one more
> MICROsecond.
>
> Therefore "probably has enough time" is never in our vocabulary.
> Microbatching is not a thing.  It is either purely event driven or bust if
> you are shooting for a dozen microseconds from packet-in to packet-out on
> the wire.  This is also the reason that certain high-performance systems
> will never use the reference implementations of Arrow, just as most other
> financial tech standards are centered on the *protocol* and there end up
> being dozens of implementations that compete on performance.
>
> Arrow *as a protocol* has the potential to be a beautiful thing.  It is
> simple and straightforward and brings needed standardization.  I would
> encourage thinking of the reference implementations almost as second-class
> citizens.  The reference implementations should not de-facto define the
> protocol, but vice versa. (I know this is probably already the philosophy.)
>
> Of course data eventually pops out of high performance systems and has a
> second life in the hands of researchers, etc.  This is why I'd like the
> "vanilla" Arrow to be able to deal with Arrow data as-constructed in the
> higher performance systems.  Could there be special processes to refactor
> all this data?  Of course, but why?  If I'm logging RecordBatches to disk
> and some of them happen to have extra array elements (e.g. because of the
> inevitable imperfect projection pre-allocation-- running out of variable
> length string storage before running out of rows), why would I refactor
> terabytes of data (and, more importantly, introduce a time window where
> data is not available while this occurs) when I can just have pyarrow skip
> the unused rows?  If I want compression/de-duplication I'll do it at the
> storage media layer.
>
> ``the proposal is essentially changing the unit of exchange from
> RecordBatches to a segment of a RecordBatch''
>
> It is the recognition that a RecordBatch may need to be computationally
> useful before it is completely received/constructed, and without adding
> additional overhead to the reception/construction process.  I grab a canned
> RecordBatch from a pool of them and start filling it in.  If an earlier
> RecordBatch falls out of the computation window, I put it back in the
> pool.  At any moment of time the batch advertises the truth:
> RecordBatch.length is the immutable section of data at this moment in
> time.  That section will not have changed in future moments.  It is also a
> recognition that some RecordBatches so-constructed are not completely
> fillable.  They are like a moving truck with a little empty space left.
> Sure, we could custom-build trucks to exactly fit the cargo, but why?  It
> takes too much time.  Grab a truck (or a RecordBatch) off the lot (pool)
> and go...  when you've filled it as much as possible, but not perfectly,
> grab another one that is exactly the same.
>
> I think my perception of the situation is clearest if we think about
> "frozen" or "sealed" RecordBatches that, during their construction stage,
> use a one-size-fits-all set of arrays and variable length buffer 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-17 Thread John Muehlhausen
Micah, thanks very much for your input.  A few thoughts in response:

``Over the time horizon of the desired latency, if you aren't receiving
enough messages to take advantage of columnar analytics, a system probably
has enough time to compact batches after the fact for later analysis;
conversely, if you are receiving many events, you naturally get reasonable
batch sizes without having to do further work.''

To see my perspective it is necessary to stop thinking about business
analytics and latencies of minutes or seconds or significant fractions of
seconds.  In real-time financial trading systems we do things like kernel
bypass networking, core affinitization and spinning, fake workloads to keep
hot paths cached, almost complete avoidance of general purpose allocators
(preferring typed pools), all with the goal of shaving off one more
MICROsecond.

Therefore "probably has enough time" is never in our vocabulary.
Microbatching is not a thing.  It is either purely event driven or bust if
you are shooting for a dozen microseconds from packet-in to packet-out on
the wire.  This is also the reason that certain high-performance systems
will never use the reference implementations of Arrow, just as most other
financial tech standards are centered on the *protocol* and there end up
being dozens of implementations that compete on performance.

Arrow *as a protocol* has the potential to be a beautiful thing.  It is
simple and straightforward and brings needed standardization.  I would
encourage thinking of the reference implementations almost as second-class
citizens.  The reference implementations should not de-facto define the
protocol, but vice versa. (I know this is probably already the philosophy.)

Of course data eventually pops out of high performance systems and has a
second life in the hands of researchers, etc.  This is why I'd like the
"vanilla" Arrow to be able to deal with Arrow data as-constructed in the
higher performance systems.  Could there be special processes to refactor
all this data?  Of course, but why?  If I'm logging RecordBatches to disk
and some of them happen to have extra array elements (e.g. because of the
inevitable imperfect projection pre-allocation-- running out of variable
length string storage before running out of rows), why would I refactor
terabytes of data (and, more importantly, introduce a time window where
data is not available while this occurs) when I can just have pyarrow skip
the unused rows?  If I want compression/de-duplication I'll do it at the
storage media layer.

``the proposal is essentially changing the unit of exchange from
RecordBatches to a segment of a RecordBatch''

It is the recognition that a RecordBatch may need to be computationally
useful before it is completely received/constructed, and without adding
additional overhead to the reception/construction process.  I grab a canned
RecordBatch from a pool of them and start filling it in.  If an earlier
RecordBatch falls out of the computation window, I put it back in the
pool.  At any moment of time the batch advertises the truth:
RecordBatch.length is the immutable section of data at this moment in
time.  That section will not have changed in future moments.  It is also a
recognition that some RecordBatches so-constructed are not completely
fillable.  They are like a moving truck with a little empty space left.
Sure, we could custom-build trucks to exactly fit the cargo, but why?  It
takes too much time.  Grab a truck (or a RecordBatch) off the lot (pool)
and go...  when you've filled it as much as possible, but not perfectly,
grab another one that is exactly the same.

I think my perception of the situation is clearest if we think about
"frozen" or "sealed" RecordBatches that, during their construction stage,
use a one-size-fits-all set of arrays and variable length buffer storage.
I grab a RecordBatch off the lot that is "for two int columns and a
variable length string column where the average expected length of a string
is 10."  I fill it, then I'm done.  If my strings were slightly longer than
expected I have extra array elements and RecordBatch.length is less than
array length.
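To make that concrete, here is a minimal sketch in vanilla pyarrow of the
slicing side of the idea (the capacity, field names, and placeholder values
are illustrative, and today's pyarrow cannot populate arrays in place, so
this only shows how a fully allocated but partially filled batch would be
exposed through a zero-copy slice):

  import pyarrow as pa

  CAPACITY = 1024  # one-size-fits-all row capacity for the "truck"

  # A batch "off the lot": two int columns and a string column, allocated
  # to full capacity up front with placeholder values.
  a = pa.array([0] * CAPACITY, type=pa.int64())
  b = pa.array([0] * CAPACITY, type=pa.int64())
  s = pa.array([""] * CAPACITY, type=pa.string())
  truck = pa.RecordBatch.from_arrays([a, b, s], ["a", "b", "s"])

  # After 700 rows the truck is "sealed".  Under the current spec the
  # utilized portion has to be exposed as a zero-copy slice, because
  # RecordBatch.length must match the array lengths.
  sealed = truck.slice(0, 700)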

While it is true that I have other use-cases in mind regarding simultaneous
collection and computation, I'm hoping that the moving truck analogy by
itself demonstrates enough efficiency advantages (as compared to typical
batch construction) to warrant this change.  Put simply, it is Arrow
accommodating more space/time tradeoff options than it currently does.

When I was baking plasma into a system for the first time, I ran into the
example where one writes a record batch to a mock stream just to learn how
much plasma memory to allocate.  That is the kind of "build the moving
truck to fit the cargo" that I just can't have!
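For reference, the sizing pattern in question looks roughly like this in
pyarrow (a sketch, assuming "batch" is an existing RecordBatch); the batch
gets fully serialized once just to size the allocation:

  import pyarrow as pa

  def serialized_size(batch: pa.RecordBatch) -> int:
      # Write the batch to a throwaway mock sink purely to measure how
      # many bytes a plasma allocation would need to hold it.
      sink = pa.MockOutputStream()
      writer = pa.RecordBatchStreamWriter(sink, batch.schema)
      writer.write_batch(batch)
      writer.close()
      return sink.size()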

-John

On Wed, Oct 16, 2019 at 10:15 PM Micah Kornfield 
wrote:

> Hi John and Wes,
>
> A few thoughts:
> One of the issues we didn't get into in prior discussions is that the
> proposal is essentially 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Micah Kornfield
Hi John and Wes,

A few thoughts:
One of the issues we didn't get into in prior discussions is that the
proposal essentially changes the unit of exchange from RecordBatches to
a segment of a RecordBatch.

I think I brought this up earlier in the discussion: Trill [1], a columnar
streaming engine, illustrates an interesting idea.  Over the time horizon
of the desired latency, if you aren't receiving enough messages to take
advantage of columnar analytics, a system probably has enough time to
compact batches after the fact for later analysis; conversely, if you are
receiving many events, you naturally get reasonable batch sizes without
having to do further work.


> I'm objecting to RecordBatch.length being inconsistent with the
> constituent field lengths; that's where the danger lies. If all of the
> lengths are consistent, no code changes are necessary.

John, is it a viable solution to keep all lengths in sync for the use case
you are imagining?

A solution I like less, but which might be viable: formally specify a
negative constant that signifies the field length should be inherited from
the RecordBatch length (this could only be used on top-level fields).
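A minimal sketch of how a reader might interpret such a sentinel, assuming
a hypothetical INHERIT_LENGTH = -1 marker in a top-level field node (none
of this is current Arrow behavior):

  INHERIT_LENGTH = -1  # hypothetical sentinel value in a top-level FieldNode

  def resolve_field_length(node_length: int, batch_length: int) -> int:
      if node_length == INHERIT_LENGTH:
          return batch_length  # over-allocated array; use the utilized rows
      if node_length != batch_length:
          # With an explicit sentinel, any other mismatch is unambiguously
          # a programming error rather than intentional over-allocation.
          raise ValueError("field node length inconsistent with batch length")
      return node_length

This would address the blemish noted below: a bare mismatch stays an error,
and intent has to be declared explicitly.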

I contend that it can only be useful and will never be harmful.  What are
> the counter-examples of concrete harm?


I'm not sure there is anything obviously wrong; however, changes to
semantics are always dangerous.  One blemish on the current proposal is
that one can't easily determine whether a mismatch in row length is a
programming error or intentional.

[1]
https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf

On Wed, Oct 16, 2019 at 4:41 PM John Muehlhausen  wrote:

> "that's where the danger lies"
>
> What danger?  I have no idea what the specific danger is, assuming that all
> reference implementations have test cases that hedge around this.
>
> I contend that it can only be useful and will never be harmful.  What are
> the counter-examples of concrete harm?
>


Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread John Muehlhausen
"that's where the danger lies"

What danger?  I have no idea what the specific danger is, assuming that all
reference implementations have test cases that hedge around this.

I contend that it can only be useful and will never be harmful.  What are
the counter-examples of concrete harm?


Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Wes McKinney
On Wed, Oct 16, 2019 at 12:32 PM John Muehlhausen  wrote:
>
> I really need to "get into the zone" on some other development today, but I
> want to remind us of something earlier in the thread that gave me the
> impression I wasn't stomping on too many paradigms with this proposal:
>
> Wes: ``So the "length" field in RecordBatch is already the utilized number
> of rows. The body buffers can certainly have excess unused space. So
> your application can mutate Flatbuffer "length" field in-place as
> new records are filled in.''

I'm objecting to RecordBatch.length being inconsistent with the
constituent field lengths; that's where the danger lies. If all of the
lengths are consistent, no code changes are necessary.

> If RecordBatch.length is the utilized number of rows then my PR makes this
> actually true.  Yes, we need it in a handful of implementations.  I'm
> willing to provide all of them.  To me that is the lowest complexity
> solution.
>
> -John
>
> On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney  wrote:
>
> > On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen  wrote:
> > >
> > > "pyarrow is intended as a developer-facing library, not a user-facing
> > one"
> > >
> > > Is that really the core issue?  I doubt you would want to add this
> > proposed
> > > logic to pandas even though it is user-facing, because then pandas will
> > > either have to re-implement what it means to read a batch (to respect
> > > length when it is smaller than array length) or else rely on the single
> > > blessed custom metadata for doing this, which doesn't make it custom
> > > anymore.
> >
> > What you have proposed in your PR amounts to an alteration of the IPC
> > format to suit this use case. This pushes complexity onto _every_
> > implementation that will need to worry about a "truncated" record
> > batch. I'd rather avoid this unless it is truly the only way.
> >
> > Note that we serialize a significant amount of custom metadata already
> > to address pandas-specific issues, and have not had to make any
> > changes to the columnar format as a result.
> >
> > > I think really your concern is that perhaps nobody wants this but me,
> > > therefore it should not be in arrow or pandas regardless of whether it is
> > > user-facing?  But, if that is your thinking, is it true?  What is our
> > > solution to the locality/latency problem for systems that ingest and
> > > process concurrently, if not this solution?  I do see it as a general
> > > problem that needs at least the beginnings of a general solution... not a
> > > "custom" one.
> >
> > We use the custom_metadata fields to implement a number of built-in
> > things in the project, such as extension types. If enough people find
> > this useful, then it can be promoted to a formalized concept. As far
> > as I can tell, you have developed quite a bit of custom code related
> > to this for your application, including manipulating Flatbuffers
> > metadata in place to maintain the populated length, so the barrier to
> > entry to being able to properly take advantage of this is rather high.
> >
> > > Also, I wonder whether it is true that pyarrow avoids smart/magical
> > > things.  The entire concept of a "Table" seems to be in that category?
> > The
> > > docs specifically mention that it is for convenience.
> > >
> >
> > Table arose out of legitimate developer need. There are a number of
> > areas of the project that would be much more difficult if we had to
> > worry about regularizing column chunking at any call site that returns
> > an in-memory dataset.
> >
> > > I'd like to focus on two questions:
> > > 1- What is the Arrow general solution to the locality/latency tradeoff
> > > problem for systems that ingest and process data concurrently?  This
> > > proposed solution or something else?  Or if we propose not to address the
> > > problem, why?
> > > 2- What will the proposed change negatively impact?  It seems that all we
> > > are talking about is respecting batch length if arrays happen to be
> > longer.
> >
> > I'm suggesting a way to help you solve the post-read truncation problem
> > without modifying the IPC protocol. If you want to make things work for
> > users without their needing to know about it, I think this can be
> > achieved through a plug-in API: a metadata handler callback that applies
> > the truncation to the record batches.
> >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney 
> > wrote:
> > >
> > > > hi John,
> > > >
> > > > > As a practical matter, the reason metadata is not a good solution
> > for me
> > > > is that it requires awareness on the part of the reader.  I want
> > (e.g.) a
> > > > researcher in Python to be able to map a file of batches in IPC format
> > > > without needing to worry about the fact that the file was built in a
> > > > streaming fashion and therefore has some unused array elements.
> > > >
> > > > I don't find this argument to be persuasive.
> > > >
> > > > pyarrow is intended as a developer-facing 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Wes McKinney
hi John,

On Wed, Oct 16, 2019 at 11:59 AM John Muehlhausen  wrote:
>
> I'm in Python, I'm a user, and I'm not allowed to import pyarrow because it
> isn't for me.

I think you're misrepresenting what I'm saying.

It's our expectation that users will largely consume pyarrow
indirectly as a dependency rather than using it directly. Not every
piece of software needs to be designed around the needs of end users.

>
> There exist some Arrow record batches in plasma.  I need to get one slice
> of one batch as a pandas dataframe.
>
> What do I do?
>
> There exist some Arrow record batches in a file.  I need to get one slice
> of one batch as a pandas dataframe.
>
> What do I do?
>
> Are you contemplating that all of the above is possible using only
> pandas APIs?
>
> Does "one slice of one batch" go away once pandas (version 2) does not
> require conversion, since it will be zero copy and the user can slice in
> pandas with no performance hit?
>
> I'm really stumbling over this idea that users can't import pyarrow.  I'm
> not sure it makes sense to continue to discuss user-level IPC (plasma,
> files, etc) until I can come to grips with how users use pyarrow without
> importing it.

I'm not saying that. I'm suggesting that you should provide your users
with convenient functions that handle the low-level details for them
automatically, as intended.

>
> Once I see how it works without my proposed change, we can go back to how
> the user ignores the empty/undefined array portions without knowing whether
> they exist.
>
> -John
>
> On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney  wrote:
>
> > On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen  wrote:
> > >
> > > "pyarrow is intended as a developer-facing library, not a user-facing
> > one"
> > >
> > > Is that really the core issue?  I doubt you would want to add this
> > proposed
> > > logic to pandas even though it is user-facing, because then pandas will
> > > either have to re-implement what it means to read a batch (to respect
> > > length when it is smaller than array length) or else rely on the single
> > > blessed custom metadata for doing this, which doesn't make it custom
> > > anymore.
> >
> > What you have proposed in your PR amounts to an alteration of the IPC
> > format to suit this use case. This pushes complexity onto _every_
> > implementation that will need to worry about a "truncated" record
> > batch. I'd rather avoid this unless it is truly the only way.
> >
> > Note that we serialize a significant amount of custom metadata already
> > to address pandas-specific issues, and have not had to make any
> > changes to the columnar format as a result.
> >
> > > I think really your concern is that perhaps nobody wants this but me,
> > > therefore it should not be in arrow or pandas regardless of whether it is
> > > user-facing?  But, if that is your thinking, is it true?  What is our
> > > solution to the locality/latency problem for systems that ingest and
> > > process concurrently, if not this solution?  I do see it as a general
> > > problem that needs at least the beginnings of a general solution... not a
> > > "custom" one.
> >
> > We use the custom_metadata fields to implement a number of built-in
> > things in the project, such as extension types. If enough people find
> > this useful, then it can be promoted to a formalized concept. As far
> > as I can tell, you have developed quite a bit of custom code related
> > to this for your application, including manipulating Flatbuffers
> > metadata in place to maintain the populated length, so the barrier to
> > entry to being able to properly take advantage of this is rather high.
> >
> > > Also, I wonder whether it is true that pyarrow avoids smart/magical
> > > things.  The entire concept of a "Table" seems to be in that category?
> > The
> > > docs specifically mention that it is for convenience.
> > >
> >
> > Table arose out of legitimate developer need. There are a number of
> > areas of the project that would be much more difficult if we had to
> > worry about regularizing column chunking at any call site that returns
> > an in-memory dataset.
> >
> > > I'd like to focus on two questions:
> > > 1- What is the Arrow general solution to the locality/latency tradeoff
> > > problem for systems that ingest and process data concurrently?  This
> > > proposed solution or something else?  Or if we propose not to address the
> > > problem, why?
> > > 2- What will the proposed change negatively impact?  It seems that all we
> > > are talking about is respecting batch length if arrays happen to be
> > longer.
> >
> > I'm suggesting a way to help you solve the post-read truncation problem
> > without modifying the IPC protocol. If you want to make things work for
> > users without their needing to know about it, I think this can be
> > achieved through a plug-in API: a metadata handler callback that applies
> > the truncation to the record batches.
> >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Oct 16, 2019 at 8:37 AM 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread John Muehlhausen
I'm in Python, I'm a user, and I'm not allowed to import pyarrow because it
isn't for me.

There exist some Arrow record batches in plasma.  I need to get one slice
of one batch as a pandas dataframe.

What do I do?

There exist some Arrow record batches in a file.  I need to get one slice
of one batch as a pandas dataframe.

What do I do?
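
(Mechanically the file case is simple enough with pyarrow itself; a sketch,
with the path and indices purely illustrative:

  import pyarrow as pa

  with pa.memory_map("batches.arrow", "r") as source:
      reader = pa.ipc.open_file(source)
      batch = reader.get_batch(0)            # one batch, zero-copy
      df = batch.slice(100, 50).to_pandas()  # one slice as a DataFrame

A plasma buffer holding stream-format batches can be wrapped similarly with
pa.ipc.open_stream.  The question is whether a *user* is allowed to write
that.)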

Are you contemplating that all of the above is possible using only
pandas APIs?

Does "one slice of one batch" go away once pandas (version 2) does not
require conversion, since it will be zero copy and the user can slice in
pandas with no performance hit?

I'm really stumbling over this idea that users can't import pyarrow.  I'm
not sure it makes sense to continue to discuss user-level IPC (plasma,
files, etc) until I can come to grips with how users use pyarrow without
importing it.

Once I see how it works without my proposed change, we can go back to how
the user ignores the empty/undefined array portions without knowing whether
they exist.

-John

On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney  wrote:

> On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen  wrote:
> >
> > "pyarrow is intended as a developer-facing library, not a user-facing
> one"
> >
> > Is that really the core issue?  I doubt you would want to add this
> proposed
> > logic to pandas even though it is user-facing, because then pandas will
> > either have to re-implement what it means to read a batch (to respect
> > length when it is smaller than array length) or else rely on the single
> > blessed custom metadata for doing this, which doesn't make it custom
> > anymore.
>
> What you have proposed in your PR amounts to an alteration of the IPC
> format to suit this use case. This pushes complexity onto _every_
> implementation that will need to worry about a "truncated" record
> batch. I'd rather avoid this unless it is truly the only way.
>
> Note that we serialize a significant amount of custom metadata already
> to address pandas-specific issues, and have not had to make any
> changes to the columnar format as a result.
>
> > I think really your concern is that perhaps nobody wants this but me,
> > therefore it should not be in arrow or pandas regardless of whether it is
> > user-facing?  But, if that is your thinking, is it true?  What is our
> > solution to the locality/latency problem for systems that ingest and
> > process concurrently, if not this solution?  I do see it as a general
> > problem that needs at least the beginnings of a general solution... not a
> > "custom" one.
>
> We use the custom_metadata fields to implement a number of built-in
> things in the project, such as extension types. If enough people find
> this useful, then it can be promoted to a formalized concept. As far
> as I can tell, you have developed quite a bit of custom code related
> to this for your application, including manipulating Flatbuffers
> metadata in place to maintain the populated length, so the barrier to
> entry to being able to properly take advantage of this is rather high.
>
> > Also, I wonder whether it is true that pyarrow avoids smart/magical
> > things.  The entire concept of a "Table" seems to be in that category?
> The
> > docs specifically mention that it is for convenience.
> >
>
> Table arose out of legitimate developer need. There are a number of
> areas of the project that would be much more difficult if we had to
> worry about regularizing column chunking at any call site that returns
> an in-memory dataset.
>
> > I'd like to focus on two questions:
> > 1- What is the Arrow general solution to the locality/latency tradeoff
> > problem for systems that ingest and process data concurrently?  This
> > proposed solution or something else?  Or if we propose not to address the
> > problem, why?
> > 2- What will the proposed change negatively impact?  It seems that all we
> > are talking about is respecting batch length if arrays happen to be
> longer.
>
> I'm suggesting a way to help you solve the post-read truncation problem
> without modifying the IPC protocol. If you want to make things work for
> users without their needing to know about it, I think this can be
> achieved through a plug-in API: a metadata handler callback that applies
> the truncation to the record batches.
>
> > Thanks,
> > -John
> >
> > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney 
> wrote:
> >
> > > hi John,
> > >
> > > > As a practical matter, the reason metadata is not a good solution
> for me
> > > is that it requires awareness on the part of the reader.  I want
> (e.g.) a
> > > researcher in Python to be able to map a file of batches in IPC format
> > > without needing to worry about the fact that the file was built in a
> > > streaming fashion and therefore has some unused array elements.
> > >
> > > I don't find this argument to be persuasive.
> > >
> > > pyarrow is intended as a developer-facing library, not a user-facing
> > > one. I don't think you should be having the kinds of users you are
> > > describing using pyarrow 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Wes McKinney
On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen  wrote:
>
> "pyarrow is intended as a developer-facing library, not a user-facing one"
>
> Is that really the core issue?  I doubt you would want to add this proposed
> logic to pandas even though it is user-facing, because then pandas will
> either have to re-implement what it means to read a batch (to respect
> length when it is smaller than array length) or else rely on the single
> blessed custom metadata for doing this, which doesn't make it custom
> anymore.

What you have proposed in your PR amounts to an alteration of the IPC
format to suit this use case. This pushes complexity onto _every_
implementation that will need to worry about a "truncated" record
batch. I'd rather avoid this unless it is truly the only way.

Note that we serialize a significant amount of custom metadata already
to address pandas-specific issues, and have not had to make any
changes to the columnar format as a result.

> I think really your concern is that perhaps nobody wants this but me,
> therefore it should not be in arrow or pandas regardless of whether it is
> user-facing?  But, if that is your thinking, is it true?  What is our
> solution to the locality/latency problem for systems that ingest and
> process concurrently, if not this solution?  I do see it as a general
> problem that needs at least the beginnings of a general solution... not a
> "custom" one.

We use the custom_metadata fields to implement a number of built-in
things in the project, such as extension types. If enough people find
this useful, then it can be promoted to a formalized concept. As far
as I can tell, you have developed quite a bit of custom code related
to this for your application, including manipulating Flatbuffers
metadata in place to maintain the populated length, so the barrier to
entry to being able to properly take advantage of this is rather high.

> Also, I wonder whether it is true that pyarrow avoids smart/magical
> things.  The entire concept of a "Table" seems to be in that category?  The
> docs specifically mention that it is for convenience.
>

Table arose out of legitimate developer need. There are a number of
areas of the project that would be much more difficult if we had to
worry about regularizing column chunking at any call site that returns
an in-memory dataset.

> I'd like to focus on two questions:
> 1- What is the Arrow general solution to the locality/latency tradeoff
> problem for systems that ingest and process data concurrently?  This
> proposed solution or something else?  Or if we propose not to address the
> problem, why?
> 2- What will the proposed change negatively impact?  It seems that all we
> are talking about is respecting batch length if arrays happen to be longer.

I'm suggesting a way to help you solve the post-read truncation problem
without modifying the IPC protocol. If you want to make things work for
users without their needing to know about it, I think this can be achieved
through a plug-in API: a metadata handler callback that applies the
truncation to the record batches.
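
Roughly, such a handler could look like the following from Python.  This is
a sketch only: the b"utilized_rows" key and the wrapper function are
hypothetical, not an existing API, and schema-level metadata is used here
for simplicity where a real version would use per-message custom_metadata:

  import pyarrow as pa

  UTILIZED_KEY = b"utilized_rows"  # hypothetical application-defined key

  def truncate_if_marked(batch: pa.RecordBatch) -> pa.RecordBatch:
      # Post-read handler: slice a batch down to its utilized portion.
      meta = batch.schema.metadata or {}
      if UTILIZED_KEY in meta:
          utilized = int(meta[UTILIZED_KEY])
          if utilized < batch.num_rows:
              return batch.slice(0, utilized)  # zero-copy view
      return batch

  def read_all_truncated(source):
      # Wrap an IPC stream reader so callers never see the unused tail.
      return [truncate_if_marked(b) for b in pa.ipc.open_stream(source)]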

> Thanks,
> -John
>
> On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney  wrote:
>
> > hi John,
> >
> > > As a practical matter, the reason metadata is not a good solution for me
> > is that it requires awareness on the part of the reader.  I want (e.g.) a
> > researcher in Python to be able to map a file of batches in IPC format
> > without needing to worry about the fact that the file was built in a
> > streaming fashion and therefore has some unused array elements.
> >
> > I don't find this argument to be persuasive.
> >
> > pyarrow is intended as a developer-facing library, not a user-facing
> > one. I don't think the kinds of users you are describing should be using
> > pyarrow directly; they should consume the library through a layer above
> > it. Specifically, we are deliberately avoiding doing anything too "smart"
> > or "magical", instead maintaining tight developer control over what is
> > going on.
> >
> > - Wes
> >
> > On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield 
> > wrote:
> > >
> > > Still thinking through the implications here, but to save others from
> > > having to go search, [1] is the PR.
> > >
> > > [1] https://github.com/apache/arrow/pull/5663/files
> > >
> > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen  wrote:
> > >
> > > > A proposal with linked PR now exists in ARROW-5916 and Wes commented
> > that
> > > > we should kick it around some more.
> > > >
> > > > The high-level topic is how Apache Arrow intersects with streaming
> > > > methodologies:
> > > >
> > > > If record batches are strictly immutable, a difficult trade-off is
> > created
> > > > for streaming data collection: either I can have low-latency
> > presentation
> > > > of new data by appending very small batches (often 1 row) to the IPC
> > stream
> > > > and lose columnar layout benefits, or I can have high-latency
> > presentation
> > > > of new data by waiting to append a batch until it is 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Wes McKinney
hi John,

> As a practical matter, the reason metadata is not a good solution for me is 
> that it requires awareness on the part of the reader.  I want (e.g.) a 
> researcher in Python to be able to map a file of batches in IPC format 
> without needing to worry about the fact that the file was built in a 
> streaming fashion and therefore has some unused array elements.

I don't find this argument to be persuasive.

pyarrow is intended as a developer-facing library, not a user-facing
one. I don't think the kinds of users you are describing should be using
pyarrow directly; they should consume the library through a layer above
it. Specifically, we are deliberately avoiding doing anything too "smart"
or "magical", instead maintaining tight developer control over what is
going on.

- Wes

On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield  wrote:
>
> Still thinking through the implications here, but to save others from
> having to go search, [1] is the PR.
>
> [1] https://github.com/apache/arrow/pull/5663/files
>
> On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen  wrote:
>
> > A proposal with linked PR now exists in ARROW-5916 and Wes commented that
> > we should kick it around some more.
> >
> > The high-level topic is how Apache Arrow intersects with streaming
> > methodologies:
> >
> > If record batches are strictly immutable, a difficult trade-off is created
> > for streaming data collection: either I can have low-latency presentation
> > of new data by appending very small batches (often 1 row) to the IPC stream
> > and lose columnar layout benefits, or I can have high-latency presentation
> > of new data by waiting to append a batch until it is large enough to gain
> > significant columnar layout benefits.  During this waiting period the new
> > data is unavailable to processing.
> >
> > If, on the other hand, [0,length) of a batch is immutable but length may
> > increase, the trade-off is eliminated: I can pre-allocate a batch and
> > populate records in it when they occur (without waiting), and also gain
> > columnar benefits as each "closed" batch will be large.  (A batch may be
> > practically "closed" before the arrays are full when the projection of
> > variable-length buffer space is wrong... a space/time tradeoff in favor of
> > time.)
> >
> > Looking ahead to a day when the reference implementation(s) will be able to
> > bump RecordBatch.length while populating pre-allocated records
> > in-place, ARROW-5916 reads such batches by ignoring portions of arrays that
> > are beyond RecordBatch.length.
> >
> > If we are not looking ahead to such a day, the discussion is about the
> > alternative way that Arrow will avoid the latency/locality tradeoff
> > inherent in streaming data collection.  Or, if the answer is "streaming
> > apps are and will always be out of scope", that idea needs to be defended
> > from the observation that practitioners are moving more towards the fusion
> > of batch and streaming, not away from it.
> >
> > As a practical matter, the reason metadata is not a good solution for me is
> > that it requires awareness on the part of the reader.  I want (e.g.) a
> > researcher in Python to be able to map a file of batches in IPC format
> > without needing to worry about the fact that the file was built in a
> > streaming fashion and therefore has some unused array elements.
> >
> > The change itself seems relatively simple.  What negative consequences do
> > we anticipate, if any?
> >
> > Thanks,
> > -John
> >
> > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen  wrote:
> >
> > > This seems to help... still testing it though.
> > >
> > >   Status GetFieldMetadata(int field_index, ArrayData* out) {
> > >     auto nodes = metadata_->nodes();
> > >     // pop off a field
> > >     if (field_index >= static_cast<int>(nodes->size())) {
> > >       return Status::Invalid("Ran out of field metadata, likely malformed");
> > >     }
> > >     const flatbuf::FieldNode* node = nodes->Get(field_index);
> > >
> > >     // out->length = node->length();
> > >     out->length = metadata_->length();
> > >     out->null_count = node->null_count();
> > >     out->offset = 0;
> > >     return Status::OK();
> > >   }
> > >
> > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen  wrote:
> > >
> > >> So far it seems as if pyarrow is completely ignoring the
> > >> RecordBatch.length field.  More info to follow...
> > >>
> > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen  wrote:
> > >>
> > >>> Crikey! I'll do some testing around that and suggest some test cases to
> > >>> ensure it continues to work, assuming that it does.
> > >>>
> > >>> -John
> > >>>
> > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney 
> > wrote:
> > >>>
> >  Thanks for the attachment, it's helpful.
> > 
> >  On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
> >  >
> >  > Attachments referred to in previous two messages:
> >  >
> > 
> > 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-16 Thread Micah Kornfield
Still thinking through the implications here, but to save others from
having to go search, [1] is the PR.

[1] https://github.com/apache/arrow/pull/5663/files

On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen  wrote:

> A proposal with linked PR now exists in ARROW-5916 and Wes commented that
> we should kick it around some more.
>
> The high-level topic is how Apache Arrow intersects with streaming
> methodologies:
>
> If record batches are strictly immutable, a difficult trade-off is created
> for streaming data collection: either I can have low-latency presentation
> of new data by appending very small batches (often 1 row) to the IPC stream
> and lose columnar layout benefits, or I can have high-latency presentation
> of new data by waiting to append a batch until it is large enough to gain
> significant columnar layout benefits.  During this waiting period the new
> data is unavailable to processing.
>
> If, on the other hand, [0,length) of a batch is immutable but length may
> increase, the trade-off is eliminated: I can pre-allocate a batch and
> populate records in it when they occur (without waiting), and also gain
> columnar benefits as each "closed" batch will be large.  (A batch may be
> practically "closed" before the arrays are full when the projection of
> variable-length buffer space is wrong... a space/time tradeoff in favor of
> time.)
>
> Looking ahead to a day when the reference implementation(s) will be able to
> bump RecordBatch.length while populating pre-allocated records
> in-place, ARROW-5916 reads such batches by ignoring portions of arrays that
> are beyond RecordBatch.length.
>
> If we are not looking ahead to such a day, the discussion is about the
> alternative way that Arrow will avoid the latency/locality tradeoff
> inherent in streaming data collection.  Or, if the answer is "streaming
> apps are and will always be out of scope", that idea needs to be defended
> from the observation that practitioners are moving more towards the fusion
> of batch and streaming, not away from it.
>
> As a practical matter, the reason metadata is not a good solution for me is
> that it requires awareness on the part of the reader.  I want (e.g.) a
> researcher in Python to be able to map a file of batches in IPC format
> without needing to worry about the fact that the file was built in a
> streaming fashion and therefore has some unused array elements.
>
> The change itself seems relatively simple.  What negative consequences do
> we anticipate, if any?
>
> Thanks,
> -John
>
> On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen  wrote:
>
> > This seems to help... still testing it though.
> >
> >   Status GetFieldMetadata(int field_index, ArrayData* out) {
> >     auto nodes = metadata_->nodes();
> >     // pop off a field
> >     if (field_index >= static_cast<int>(nodes->size())) {
> >       return Status::Invalid("Ran out of field metadata, likely malformed");
> >     }
> >     const flatbuf::FieldNode* node = nodes->Get(field_index);
> >
> >     // out->length = node->length();
> >     out->length = metadata_->length();
> >     out->null_count = node->null_count();
> >     out->offset = 0;
> >     return Status::OK();
> >   }
> >
> > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen  wrote:
> >
> >> So far it seems as if pyarrow is completely ignoring the
> >> RecordBatch.length field.  More info to follow...
> >>
> >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen  wrote:
> >>
> >>> Crikey! I'll do some testing around that and suggest some test cases to
> >>> ensure it continues to work, assuming that it does.
> >>>
> >>> -John
> >>>
> >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney 
> wrote:
> >>>
>  Thanks for the attachment, it's helpful.
> 
>  On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
>  >
>  > Attachments referred to in previous two messages:
>  >
> 
> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>  >
>  > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen 
> wrote:
>  >
>  > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
>  > > engagement.  In order to clarify things a bit, I am attaching a
>  graphic of
>  > > how our application will take record-wise (row-oriented) data from
>  an event
>  > > source and incrementally populate a pre-allocated Arrow-compatible
>  buffer,
>  > > including for variable-length fields.  (Obviously at this stage I
>  am not
>  > > using the reference implementation Arrow code, although that would
>  be a
>  > > goal to contribute that back to the project.)
>  > >
>  > > For sake of simplicity these are non-nullable fields.  As a
> result a
>  > > reader of "y" that has no knowledge of the "utilized" metadata
>  would get a
>  > > long string (zeros, spaces, uninitialized, or whatever we decide
>  for the
>  > > pre-allocation model) for the record just beyond the last utilized

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-10-15 Thread John Muehlhausen
A proposal with linked PR now exists in ARROW-5916 and Wes commented that
we should kick it around some more.

The high-level topic is how Apache Arrow intersects with streaming
methodologies:

If record batches are strictly immutable, a difficult trade-off is created
for streaming data collection: either I can have low-latency presentation
of new data by appending very small batches (often 1 row) to the IPC stream
and lose columnar layout benefits, or I can have high-latency presentation
of new data by waiting to append a batch until it is large enough to gain
significant columnar layout benefits.  During this waiting period the new
data is unavailable to processing.

If, on the other hand, [0,length) of a batch is immutable but length may
increase, the trade-off is eliminated: I can pre-allocate a batch and
populate records in it when they occur (without waiting), and also gain
columnar benefits as each "closed" batch will be large.  (A batch may be
practically "closed" before the arrays are full when the projection of
variable-length buffer space is wrong... a space/time tradeoff in favor of
time.)
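
A minimal sketch of that pattern from Python, using a numpy-backed buffer
and pa.Array.from_buffers to take successively longer zero-copy views over
the same pre-allocated memory (the capacity and field name are
illustrative, and this style of in-place population is outside what the
reference implementations support today):

  import numpy as np
  import pyarrow as pa

  CAPACITY = 1024
  storage = np.zeros(CAPACITY, dtype=np.int64)  # pre-allocated, written in place

  def view(length):
      # Zero-copy view over the immutable prefix [0, length) of the storage.
      arr = pa.Array.from_buffers(pa.int64(), length,
                                  [None, pa.py_buffer(storage)])
      return pa.RecordBatch.from_arrays([arr], ["x"])

  storage[0:3] = [10, 20, 30]  # populate records as they occur...
  batch = view(3)              # ...readers see only the populated prefix
  storage[3] = 40              # a later append never touches [0, 3)
  batch = view(4)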

Looking ahead to a day when the reference implementation(s) will be able to
bump RecordBatch.length while populating pre-allocated records
in-place, ARROW-5916 reads such batches by ignoring portions of arrays that
are beyond RecordBatch.length.

If we are not looking ahead to such a day, the discussion is about the
alternative way that Arrow will avoid the latency/locality tradeoff
inherent in streaming data collection.  Or, if the answer is "streaming
apps are and will always be out of scope", that idea needs to be defended
from the observation that practitioners are moving more towards the fusion
of batch and streaming, not away from it.

As a practical matter, the reason metadata is not a good solution for me is
that it requires awareness on the part of the reader.  I want (e.g.) a
researcher in Python to be able to map a file of batches in IPC format
without needing to worry about the fact that the file was built in a
streaming fashion and therefore has some unused array elements.

The change itself seems relatively simple.  What negative consequences do
we anticipate, if any?

Thanks,
-John

On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen  wrote:

> This seems to help... still testing it though.
>
>   Status GetFieldMetadata(int field_index, ArrayData* out) {
>     auto nodes = metadata_->nodes();
>     // pop off a field
>     if (field_index >= static_cast<int>(nodes->size())) {
>       return Status::Invalid("Ran out of field metadata, likely malformed");
>     }
>     const flatbuf::FieldNode* node = nodes->Get(field_index);
>
>     // out->length = node->length();
>     out->length = metadata_->length();
>     out->null_count = node->null_count();
>     out->offset = 0;
>     return Status::OK();
>   }
>
> On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen  wrote:
>
>> So far it seems as if pyarrow is completely ignoring the
>> RecordBatch.length field.  More info to follow...
>>
>> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen  wrote:
>>
>>> Crikey! I'll do some testing around that and suggest some test cases to
>>> ensure it continues to work, assuming that it does.
>>>
>>> -John
>>>
>>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney  wrote:
>>>
 Thanks for the attachment, it's helpful.

 On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
 >
 > Attachments referred to in previous two messages:
 >
 https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
 >
 > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:
 >
 > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
 > > engagement.  In order to clarify things a bit, I am attaching a
 graphic of
 > > how our application will take record-wise (row-oriented) data from
 an event
 > > source and incrementally populate a pre-allocated Arrow-compatible
 buffer,
 > > including for variable-length fields.  (Obviously at this stage I
 am not
 > > using the reference implementation Arrow code, although that would
 be a
 > > goal to contribute that back to the project.)
 > >
 > > For sake of simplicity these are non-nullable fields.  As a result a
 > > reader of "y" that has no knowledge of the "utilized" metadata
 would get a
 > > long string (zeros, spaces, uninitialized, or whatever we decide
 for the
 > > pre-allocation model) for the record just beyond the last utilized
 record.
 > >
 > > I don't see any "big O"-analysis problems with this approach.  The
 > > space/time tradeoff is that we have to guess how much room to
 allocate for
 > > variable-length fields.  We will probably almost always be wrong.
 This
 > > ends up in "wasted" space.  However, we can do calculations based
 on these
 > > partially filled batches that take full advantage of the 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-05 Thread John Muehlhausen
This seems to help... still testing it though.

  Status GetFieldMetadata(int field_index, ArrayData* out) {
    auto nodes = metadata_->nodes();
    // pop off a field
    if (field_index >= static_cast<int>(nodes->size())) {
      return Status::Invalid("Ran out of field metadata, likely malformed");
    }
    const flatbuf::FieldNode* node = nodes->Get(field_index);

    // out->length = node->length();
    out->length = metadata_->length();  // batch-level length, not the per-field node length
    out->null_count = node->null_count();
    out->offset = 0;
    return Status::OK();
  }

On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen  wrote:

> So far it seems as if pyarrow is completely ignoring the
> RecordBatch.length field.  More info to follow...
>
> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen  wrote:
>
>> Crikey! I'll do some testing around that and suggest some test cases to
>> ensure it continues to work, assuming that it does.
>>
>> -John
>>
>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney  wrote:
>>
>>> Thanks for the attachment, it's helpful.
>>>
>>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
>>> >
>>> > Attachments referred to in previous two messages:
>>> >
>>> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>>> >
>>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:
>>> >
>>> > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
>>> > > engagement.  In order to clarify things a bit, I am attaching a
>>> graphic of
>>> > > how our application will take record-wise (row-oriented) data from
>>> an event
>>> > > source and incrementally populate a pre-allocated Arrow-compatible
>>> buffer,
>>> > > including for variable-length fields.  (Obviously at this stage I am
>>> not
>>> > > using the reference implementation Arrow code, although that would
>>> be a
>>> > > goal to contribute that back to the project.)
>>> > >
>>> > > For sake of simplicity these are non-nullable fields.  As a result a
>>> > > reader of "y" that has no knowledge of the "utilized" metadata would
>>> get a
>>> > > long string (zeros, spaces, uninitialized, or whatever we decide for
>>> the
>>> > > pre-allocation model) for the record just beyond the last utilized
>>> record.
>>> > >
>>> > > I don't see any "big O"-analysis problems with this approach.  The
>>> > > space/time tradeoff is that we have to guess how much room to
>>> allocate for
>>> > > variable-length fields.  We will probably almost always be wrong.
>>> This
>>> > > ends up in "wasted" space.  However, we can do calculations based on
>>> these
>>> > > partially filled batches that take full advantage of the columnar
>>> layout.
>>> > >  (Here I've shown the case where we had too little variable-length
>>> buffer
>>> > > set aside, resulting in "wasted" rows.  The flip side is that rows
>>> achieve
>>> > > full [1] utilization but there is wasted variable-length buffer if
>>> we guess
>>> > > incorrectly in the other direction.)
>>> > >
>>> > > I proposed a few things that are "nice to have" but really what I'm
>>> eyeing
>>> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see
>>> that some
>>> > > of the rows in a RecordBatch are not to be read, based on the new
>>> > > "utilized" (or whatever name) metadata.  That single tweak to the
>>> > > metadata-- and readers honoring it-- is the core of the proposal.
>>> > >  (Proposal 4.)  This would indicate that the attached example (or
>>> something
>>> > > similar) is the blessed approach for those seeking to accumulate
>>> events and
>>> > > process them while still expecting more data, with the
>>> heavier-weight task
>>> > > of creating a new pre-allocated batch being a rare occurrence.
>>> > >
>>>
>>> So the "length" field in RecordBatch is already the utilized number of
>>> rows. The body buffers can certainly have excess unused space. So your
>>> application can mutate Flatbuffer "length" field in-place as new
>>> records are filled in.
>>>
>>> > > Notice that the mutability is only in the sense of "appending."  The
>>> > > current doctrine of total immutability would be revised to refer to
>>> the
>>> > > immutability of only the already-populated rows.
>>> > >
>>> > > It gives folks an option other than choosing the lesser of two
>>> evils: on
>>> > > the one hand, length 1 RecordBatches that don't result in a stream
>>> that is
>>> > > computationally efficient.  On the other hand, adding artificial
>>> latency by
>>> > > accumulating events before "freezing" a larger batch and only then
>>> making
>>> > > it available to computation.
>>> > >
>>> > > -John
>>> > >
>>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney 
>>> wrote:
>>> > >
>>> > >> hi John,
>>> > >>
>>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen 
>>> wrote:
>>> > >> >
>>> > >> > During my time building financial analytics and trading systems
>>> (23
>>> > >> years!), both the "batch processing" and "stream processing"
>>> paradigms have
>>> > >> been extensively used by myself and by colleagues.
>>> > 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-05 Thread John Muehlhausen
So far it seems as if pyarrow is completely ignoring the RecordBatch.length
field.  More info to follow...

On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen  wrote:

> Crikey! I'll do some testing around that and suggest some test cases to
> ensure it continues to work, assuming that it does.
>
> -John
>
> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney  wrote:
>
>> Thanks for the attachment, it's helpful.
>>
>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
>> >
>> > Attachments referred to in previous two messages:
>> >
>> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>> >
>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:
>> >
>> > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
>> > > engagement.  In order to clarify things a bit, I am attaching a
>> graphic of
>> > > how our application will take record-wise (row-oriented) data from an
>> event
>> > > source and incrementally populate a pre-allocated Arrow-compatible
>> buffer,
>> > > including for variable-length fields.  (Obviously at this stage I am
>> not
>> > > using the reference implementation Arrow code, although that would be
>> a
>> > > goal to contribute that back to the project.)
>> > >
>> > > For sake of simplicity these are non-nullable fields.  As a result a
>> > > reader of "y" that has no knowledge of the "utilized" metadata would
>> get a
>> > > long string (zeros, spaces, uninitialized, or whatever we decide for
>> the
>> > > pre-allocation model) for the record just beyond the last utilized
>> record.
>> > >
>> > > I don't see any "big O"-analysis problems with this approach.  The
>> > > space/time tradeoff is that we have to guess how much room to
>> allocate for
>> > > variable-length fields.  We will probably almost always be wrong.
>> This
>> > > ends up in "wasted" space.  However, we can do calculations based on
>> these
>> > > partially filled batches that take full advantage of the columnar
>> layout.
>> > >  (Here I've shown the case where we had too little variable-length
>> buffer
>> > > set aside, resulting in "wasted" rows.  The flip side is that rows
>> achieve
>> > > full [1] utilization but there is wasted variable-length buffer if we
>> guess
>> > > incorrectly in the other direction.)
>> > >
>> > > I proposed a few things that are "nice to have" but really what I'm
>> eyeing
>> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see that
>> some
>> > > of the rows in a RecordBatch are not to be read, based on the new
>> > > "utilized" (or whatever name) metadata.  That single tweak to the
>> > > metadata-- and readers honoring it-- is the core of the proposal.
>> > >  (Proposal 4.)  This would indicate that the attached example (or
>> something
>> > > similar) is the blessed approach for those seeking to accumulate
>> events and
>> > > process them while still expecting more data, with the heavier-weight
>> task
>> > > of creating a new pre-allocated batch being a rare occurrence.
>> > >
>>
>> So the "length" field in RecordBatch is already the utilized number of
>> rows. The body buffers can certainly have excess unused space. So your
>> application can mutate Flatbuffer "length" field in-place as new
>> records are filled in.
>>
>> > > Notice that the mutability is only in the sense of "appending."  The
>> > > current doctrine of total immutability would be revised to refer to
>> the
>> > > immutability of only the already-populated rows.
>> > >
>> > > It gives folks an option other than choosing the lesser of two evils:
>> on
>> > > the one hand, length 1 RecordBatches that don't result in a stream
>> that is
>> > > computationally efficient.  On the other hand, adding artificial
>> latency by
>> > > accumulating events before "freezing" a larger batch and only then
>> making
>> > > it available to computation.
>> > >
>> > > -John
>> > >
>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney 
>> wrote:
>> > >
>> > >> hi John,
>> > >>
>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen 
>> wrote:
>> > >> >
>> > >> > During my time building financial analytics and trading systems (23
>> > >> years!), both the "batch processing" and "stream processing"
>> paradigms have
>> > >> been extensively used by myself and by colleagues.
>> > >> >
>> > >> > Unfortunately, the tools used in these paradigms have not
>> successfully
>> > >> overlapped.  For example, an analyst might use a Python notebook with
>> > >> pandas to do some batch analysis.  Then, for acceptable latency and
>> > >> throughput, a C++ programmer must implement the same schemas and
>> processing
>> > >> logic in order to analyze real-time data for real-time decision
>> support.
>> > >> (Time horizons often being sub-second or even sub-millisecond for an
>> > >> acceptable reaction to an event.  The most aggressive software-based
>> > >> systems, leaving custom hardware aside other than things like
>> kernel-bypass
>> > >> NICs, target 10s of microseconds for a full round 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread John Muehlhausen
Crikey! I'll do some testing around that and suggest some test cases to
ensure it continues to work, assuming that it does.

-John

On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney  wrote:

> Thanks for the attachment, it's helpful.
>
> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
> >
> > Attachments referred to in previous two messages:
> >
> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
> >
> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:
> >
> > > Thanks, Wes, for the thoughtful reply.  I really appreciate the
> > > engagement.  In order to clarify things a bit, I am attaching a
> graphic of
> > > how our application will take record-wise (row-oriented) data from an
> event
> > > source and incrementally populate a pre-allocated Arrow-compatible
> buffer,
> > > including for variable-length fields.  (Obviously at this stage I am
> not
> > > using the reference implementation Arrow code, although that would be a
> > > goal to contribute that back to the project.)
> > >
> > > For sake of simplicity these are non-nullable fields.  As a result a
> > > reader of "y" that has no knowledge of the "utilized" metadata would
> get a
> > > long string (zeros, spaces, uninitialized, or whatever we decide for
> the
> > > pre-allocation model) for the record just beyond the last utilized
> record.
> > >
> > > I don't see any "big O"-analysis problems with this approach.  The
> > > space/time tradeoff is that we have to guess how much room to allocate
> for
> > > variable-length fields.  We will probably almost always be wrong.  This
> > > ends up in "wasted" space.  However, we can do calculations based on
> these
> > > partially filled batches that take full advantage of the columnar
> layout.
> > >  (Here I've shown the case where we had too little variable-length
> buffer
> > > set aside, resulting in "wasted" rows.  The flip side is that rows
> achieve
> > > full [1] utilization but there is wasted variable-length buffer if we
> guess
> > > incorrectly in the other direction.)
> > >
> > > I proposed a few things that are "nice to have" but really what I'm
> eyeing
> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see that
> some
> > > of the rows in a RecordBatch are not to be read, based on the new
> > > "utilized" (or whatever name) metadata.  That single tweak to the
> > > metadata-- and readers honoring it-- is the core of the proposal.
> > >  (Proposal 4.)  This would indicate that the attached example (or
> something
> > > similar) is the blessed approach for those seeking to accumulate
> events and
> > > process them while still expecting more data, with the heavier-weight
> task
> > > of creating a new pre-allocated batch being a rare occurrence.
> > >
>
> So the "length" field in RecordBatch is already the utilized number of
> rows. The body buffers can certainly have excess unused space. So your
> application can mutate Flatbuffer "length" field in-place as new
> records are filled in.
>
> > > Notice that the mutability is only in the sense of "appending."  The
> > > current doctrine of total immutability would be revised to refer to the
> > > immutability of only the already-populated rows.
> > >
> > > It gives folks an option other than choosing the lesser of two evils:
> on
> > > the one hand, length 1 RecordBatches that don't result in a stream
> that is
> > > computationally efficient.  On the other hand, adding artificial
> latency by
> > > accumulating events before "freezing" a larger batch and only then
> making
> > > it available to computation.
> > >
> > > -John
> > >
> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney 
> wrote:
> > >
> > >> hi John,
> > >>
> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen  wrote:
> > >> >
> > >> > During my time building financial analytics and trading systems (23
> > >> years!), both the "batch processing" and "stream processing"
> paradigms have
> > >> been extensively used by myself and by colleagues.
> > >> >
> > >> > Unfortunately, the tools used in these paradigms have not
> successfully
> > >> overlapped.  For example, an analyst might use a Python notebook with
> > >> pandas to do some batch analysis.  Then, for acceptable latency and
> > >> throughput, a C++ programmer must implement the same schemas and
> processing
> > >> logic in order to analyze real-time data for real-time decision
> support.
> > >> (Time horizons often being sub-second or even sub-millisecond for an
> > >> acceptable reaction to an event.  The most aggressive software-based
> > >> systems, leaving custom hardware aside other than things like
> kernel-bypass
> > >> NICs, target 10s of microseconds for a full round trip from data
> ingestion
> > >> to decision.)
> > >> >
> > >> > As a result, TCO is more than doubled.  A doubling can be accounted
> for
> > >> by two implementations that share little or nothing in the way of
> > >> architecture.  Then additional effort is required to ensure that these
> > >> 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread Wes McKinney
Thanks for the attachment, it's helpful.

On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen  wrote:
>
> Attachments referred to in previous two messages:
> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>
> On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:
>
> > Thanks, Wes, for the thoughtful reply.  I really appreciate the
> > engagement.  In order to clarify things a bit, I am attaching a graphic of
> > how our application will take record-wise (row-oriented) data from an event
> > source and incrementally populate a pre-allocated Arrow-compatible buffer,
> > including for variable-length fields.  (Obviously at this stage I am not
> > using the reference implementation Arrow code, although that would be a
> > goal to contribute that back to the project.)
> >
> > For sake of simplicity these are non-nullable fields.  As a result a
> > reader of "y" that has no knowledge of the "utilized" metadata would get a
> > long string (zeros, spaces, uninitialized, or whatever we decide for the
> > pre-allocation model) for the record just beyond the last utilized record.
> >
> > I don't see any "big O"-analysis problems with this approach.  The
> > space/time tradeoff is that we have to guess how much room to allocate for
> > variable-length fields.  We will probably almost always be wrong.  This
> > ends up in "wasted" space.  However, we can do calculations based on these
> > partially filled batches that take full advantage of the columnar layout.
> >  (Here I've shown the case where we had too little variable-length buffer
> > set aside, resulting in "wasted" rows.  The flip side is that rows achieve
> > full [1] utilization but there is wasted variable-length buffer if we guess
> > incorrectly in the other direction.)
> >
> > I proposed a few things that are "nice to have" but really what I'm eyeing
> > is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some
> > of the rows in a RecordBatch are not to be read, based on the new
> > "utilized" (or whatever name) metadata.  That single tweak to the
> > metadata-- and readers honoring it-- is the core of the proposal.
> >  (Proposal 4.)  This would indicate that the attached example (or something
> > similar) is the blessed approach for those seeking to accumulate events and
> > process them while still expecting more data, with the heavier-weight task
> > of creating a new pre-allocated batch being a rare occurrence.
> >

So the "length" field in RecordBatch is already the utilized number of
rows. The body buffers can certainly have excess unused space. So your
application can mutate the Flatbuffer "length" field in-place as new
records are filled in.
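
A minimal sketch of that in-place edit, assuming the Flatbuffers code for
format/Message.fbs was generated with flatc --gen-mutable (the mutate_*
setters don't exist otherwise) and that you are pointing at the Message
Flatbuffer itself, not the IPC framing around it:

  #include <cstdint>
  #include "Message_generated.h"  // flatc --gen-mutable output; path illustrative

  namespace fb = org::apache::arrow::flatbuf;

  // Assumes the message's header_type is RecordBatch.  Returns false if
  // "length" was omitted from the buffer as a default value, in which case
  // mutation cannot add it after the fact.
  bool BumpBatchLength(uint8_t* message_buffer, int64_t new_length) {
    fb::Message* message = fb::GetMutableMessage(message_buffer);
    auto* batch = static_cast<fb::RecordBatch*>(message->mutable_header());
    return batch->mutate_length(new_length);
  }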

> > Notice that the mutability is only in the sense of "appending."  The
> > current doctrine of total immutability would be revised to refer to the
> > immutability of only the already-populated rows.
> >
> > It gives folks an option other than choosing the lesser of two evils: on
> > the one hand, length 1 RecordBatches that don't result in a stream that is
> > computationally efficient.  On the other hand, adding artificial latency by
> > accumulating events before "freezing" a larger batch and only then making
> > it available to computation.
> >
> > -John
> >
> > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney  wrote:
> >
> >> hi John,
> >>
> >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen  wrote:
> >> >
> >> > During my time building financial analytics and trading systems (23
> >> years!), both the "batch processing" and "stream processing" paradigms have
> >> been extensively used by myself and by colleagues.
> >> >
> >> > Unfortunately, the tools used in these paradigms have not successfully
> >> overlapped.  For example, an analyst might use a Python notebook with
> >> pandas to do some batch analysis.  Then, for acceptable latency and
> >> throughput, a C++ programmer must implement the same schemas and processing
> >> logic in order to analyze real-time data for real-time decision support.
> >> (Time horizons often being sub-second or even sub-millisecond for an
> >> acceptable reaction to an event.  The most aggressive software-based
> >> systems, leaving custom hardware aside other than things like kernel-bypass
> >> NICs, target 10s of microseconds for a full round trip from data ingestion
> >> to decision.)
> >> >
> >> > As a result, TCO is more than doubled.  A doubling can be accounted for
> >> by two implementations that share little or nothing in the way of
> >> architecture.  Then additional effort is required to ensure that these
> >> implementations continue to behave the same way and are upgraded in
> >> lock-step.
> >> >
> >> > Arrow purports to be a "bridge" technology that eases one of the pain
> >> points of working in different ecosystems by providing a common event
> >> stream data structure.  (Discussion of common processing techniques is
> >> beyond the scope of this discussion.  Suffice it to say that a streaming
> >> algo can 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread John Muehlhausen
Attachments referred to in previous two messages:
https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0

On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen  wrote:

> Thanks, Wes, for the thoughtful reply.  I really appreciate the
> engagement.  In order to clarify things a bit, I am attaching a graphic of
> how our application will take record-wise (row-oriented) data from an event
> source and incrementally populate a pre-allocated Arrow-compatible buffer,
> including for variable-length fields.  (Obviously at this stage I am not
> using the reference implementation Arrow code, although that would be a
> goal to contribute that back to the project.)
>
> For sake of simplicity these are non-nullable fields.  As a result a
> reader of "y" that has no knowledge of the "utilized" metadata would get a
> long string (zeros, spaces, uninitialized, or whatever we decide for the
> pre-allocation model) for the record just beyond the last utilized record.
>
> I don't see any "big O"-analysis problems with this approach.  The
> space/time tradeoff is that we have to guess how much room to allocate for
> variable-length fields.  We will probably almost always be wrong.  This
> ends up in "wasted" space.  However, we can do calculations based on these
> partially filled batches that take full advantage of the columnar layout.
>  (Here I've shown the case where we had too little variable-length buffer
> set aside, resulting in "wasted" rows.  The flip side is that rows achieve
> full [1] utilization but there is wasted variable-length buffer if we guess
> incorrectly in the other direction.)
>
> I proposed a few things that are "nice to have" but really what I'm eyeing
> is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some
> of the rows in a RecordBatch are not to be read, based on the new
> "utilized" (or whatever name) metadata.  That single tweak to the
> metadata-- and readers honoring it-- is the core of the proposal.
>  (Proposal 4.)  This would indicate that the attached example (or something
> similar) is the blessed approach for those seeking to accumulate events and
> process them while still expecting more data, with the heavier-weight task
> of creating a new pre-allocated batch being a rare occurrence.
>
> Notice that the mutability is only in the sense of "appending."  The
> current doctrine of total immutability would be revised to refer to the
> immutability of only the already-populated rows.
>
> It gives folks an option other than choosing the lesser of two evils: on
> the one hand, length 1 RecordBatches that don't result in a stream that is
> computationally efficient.  On the other hand, adding artificial latency by
> accumulating events before "freezing" a larger batch and only then making
> it available to computation.
>
> -John
>
> On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney  wrote:
>
>> hi John,
>>
>> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen  wrote:
>> >
>> > During my time building financial analytics and trading systems (23
>> years!), both the "batch processing" and "stream processing" paradigms have
>> been extensively used by myself and by colleagues.
>> >
>> > Unfortunately, the tools used in these paradigms have not successfully
>> overlapped.  For example, an analyst might use a Python notebook with
>> pandas to do some batch analysis.  Then, for acceptable latency and
>> throughput, a C++ programmer must implement the same schemas and processing
>> logic in order to analyze real-time data for real-time decision support.
>> (Time horizons often being sub-second or even sub-millisecond for an
>> acceptable reaction to an event.  The most aggressive software-based
>> systems, leaving custom hardware aside other than things like kernel-bypass
>> NICs, target 10s of microseconds for a full round trip from data ingestion
>> to decision.)
>> >
>> > As a result, TCO is more than doubled.  A doubling can be accounted for
>> by two implementations that share little or nothing in the way of
>> architecture.  Then additional effort is required to ensure that these
>> implementations continue to behave the same way and are upgraded in
>> lock-step.
>> >
>> > Arrow purports to be a "bridge" technology that eases one of the pain
>> points of working in different ecosystems by providing a common event
>> stream data structure.  (Discussion of common processing techniques is
>> beyond the scope of this discussion.  Suffice it to say that a streaming
>> algo can always be run in batch, but not vice versa.)
>> >
>> > Arrow seems to be growing up primarily in the batch processing world.
>> One publication notes that "the missing piece is streaming, where the
>> velocity of incoming data poses a special challenge. There are some early
>> experiments to populate Arrow nodes in microbatches..." [1]  Part of our
>> discussion could be a response to this observation.  In what ways is it
>> true or false?  What are the plans to remedy this shortcoming, if 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread John Muehlhausen
Thanks, Wes, for the thoughtful reply.  I really appreciate the
engagement.  In order to clarify things a bit, I am attaching a graphic of
how our application will take record-wise (row-oriented) data from an event
source and incrementally populate a pre-allocated Arrow-compatible buffer,
including for variable-length fields.  (Obviously at this stage I am not
using the reference implementation Arrow code, although that would be a
goal to contribute that back to the project.)

For sake of simplicity these are non-nullable fields.  As a result a reader
of "y" that has no knowledge of the "utilized" metadata would get a long
string (zeros, spaces, uninitialized, or whatever we decide for the
pre-allocation model) for the record just beyond the last utilized record.

I don't see any "big O"-analysis problems with this approach.  The
space/time tradeoff is that we have to guess how much room to allocate for
variable-length fields.  We will probably almost always be wrong.  This
ends up in "wasted" space.  However, we can do calculations based on these
partially filled batches that take full advantage of the columnar layout.
 (Here I've shown the case where we had too little variable-length buffer
set aside, resulting in "wasted" rows.  The flip side is that rows achieve
full [1] utilization but there is wasted variable-length buffer if we guess
incorrectly in the other direction.)

I proposed a few things that are "nice to have" but really what I'm eyeing
is the ability for a reader-- any reader (e.g. pyarrow)-- to see that some
of the rows in a RecordBatch are not to be read, based on the new
"utilized" (or whatever name) metadata.  That single tweak to the
metadata-- and readers honoring it-- is the core of the proposal.
 (Proposal 4.)  This would indicate that the attached example (or something
similar) is the blessed approach for those seeking to accumulate events and
process them while still expecting more data, with the heavier-weight task
of creating a new pre-allocated batch being a rare occurrence.
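
Concretely, the tweak might look something like this in format/Message.fbs
(a sketch only-- the field name is illustrative, and it is appended at the
end of the table because Flatbuffers schema evolution requires new fields
to come last):

  table RecordBatch {
    length: long;
    nodes: [FieldNode];
    buffers: [Buffer];
    /// Hypothetical new field: number of leading rows that are populated.
    /// Readers would ignore rows in [utilized, length).
    utilized: long;
  }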

Notice that the mutability is only in the sense of "appending."  The
current doctrine of total immutability would be revised to refer to the
immutability of only the already-populated rows.

It gives folks an option other than choosing the lesser of two evils: on
the one hand, length 1 RecordBatches that don't result in a stream that is
computationally efficient.  On the other hand, adding artificial latency by
accumulating events before "freezing" a larger batch and only then making
it available to computation.

-John

On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney  wrote:

> hi John,
>
> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen  wrote:
> >
> > During my time building financial analytics and trading systems (23
> years!), both the "batch processing" and "stream processing" paradigms have
> been extensively used by myself and by colleagues.
> >
> > Unfortunately, the tools used in these paradigms have not successfully
> overlapped.  For example, an analyst might use a Python notebook with
> pandas to do some batch analysis.  Then, for acceptable latency and
> throughput, a C++ programmer must implement the same schemas and processing
> logic in order to analyze real-time data for real-time decision support.
> (Time horizons often being sub-second or even sub-millisecond for an
> acceptable reaction to an event.  The most aggressive software-based
> systems, leaving custom hardware aside other than things like kernel-bypass
> NICs, target 10s of microseconds for a full round trip from data ingestion
> to decision.)
> >
> > As a result, TCO is more than doubled.  A doubling can be accounted for
> by two implementations that share little or nothing in the way of
> architecture.  Then additional effort is required to ensure that these
> implementations continue to behave the same way and are upgraded in
> lock-step.
> >
> > Arrow purports to be a "bridge" technology that eases one of the pain
> points of working in different ecosystems by providing a common event
> stream data structure.  (Discussion of common processing techniques is
> beyond the scope of this discussion.  Suffice it to say that a streaming
> algo can always be run in batch, but not vice versa.)
> >
> > Arrow seems to be growing up primarily in the batch processing world.
> One publication notes that "the missing piece is streaming, where the
> velocity of incoming data poses a special challenge. There are some early
> experiments to populate Arrow nodes in microbatches..." [1]  Part of our
> discussion could be a response to this observation.  In what ways is it
> true or false?  What are the plans to remedy this shortcoming, if it
> exists?  What steps can be taken now to ease the transition to low-latency
> streaming support in the future?
> >
>
> Arrow columnar format describes a collection of records with the values
> from consecutive records placed adjacent to each other in memory. If you
> break that assumption, you don't have a 

Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread Wes McKinney
hi John,

On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen  wrote:
>
> During my time building financial analytics and trading systems (23 years!), 
> both the "batch processing" and "stream processing" paradigms have been 
> extensively used by myself and by colleagues.
>
> Unfortunately, the tools used in these paradigms have not successfully 
> overlapped.  For example, an analyst might use a Python notebook with pandas 
> to do some batch analysis.  Then, for acceptable latency and throughput, a 
> C++ programmer must implement the same schemas and processing logic in order 
> to analyze real-time data for real-time decision support.  (Time horizons 
> often being sub-second or even sub-millisecond for an acceptable reaction to 
> an event.  The most aggressive software-based systems, leaving custom 
> hardware aside other than things like kernel-bypass NICs, target 10s of 
> microseconds for a full round trip from data ingestion to decision.)
>
> As a result, TCO is more than doubled.  A doubling can be accounted for by 
> two implementations that share little or nothing in the way of architecture.  
> Then additional effort is required to ensure that these implementations 
> continue to behave the same way and are upgraded in lock-step.
>
> Arrow purports to be a "bridge" technology that eases one of the pain points 
> of working in different ecosystems by providing a common event stream data 
> structure.  (Discussion of common processing techniques is beyond the scope 
> of this discussion.  Suffice it to say that a streaming algo can always be 
> run in batch, but not vice versa.)
>
> Arrow seems to be growing up primarily in the batch processing world.  One 
> publication notes that "the missing piece is streaming, where the velocity of 
> incoming data poses a special challenge. There are some early experiments to 
> populate Arrow nodes in microbatches..." [1]  Part of our discussion could 
> be a response to this observation.  In what ways is it true or false?  What 
> are the plans to remedy this shortcoming, if it exists?  What steps can be 
> taken now to ease the transition to low-latency streaming support in the 
> future?
>

Arrow columnar format describes a collection of records with the values
from consecutive records placed adjacent to each other in memory. If you
break that assumption, you don't have a columnar format anymore. So I
don't see where the "shortcoming" is. We don't have any software in the
project for managing the creation of record batches in a streaming
application, but this seems like an interesting expansion area for the
project.

Note that many contributors have already expanded the surface area of
what's in the Arrow libraries in many directions.

Streaming data collection is yet another area of expansion, but
_personally_ it is not on the short list of projects that I will
be working on (or asking my direct or indirect colleagues
to work on). Since this is a project made up of volunteers, it's up to
contributors to drive new directions for the project by writing design
documents and pull requests.

> In my own experience, a successful strategy for stream processing where 
> context (i.e. recent past events) must be considered by calculations is to 
> pre-allocate memory for event collection, to organize this memory in a 
> columnar layout, and to run incremental calculations at each event ingress 
> into the partially populated memory.  [Fig 1]  When the pre-allocated memory 
> has been exhausted, allocate a new batch of column-wise memory and continue.  
> When a batch is no longer pertinent to the calculation look-back window, free 
> the memory back to the heap or pool.
>
> Here we run into the first philosophical barrier with Arrow, where "Arrow 
> data is immutable." [2]  There is currently little or no consideration for 
> reading a partially constructed RecordBatch, e.g. one with only some of the 
> rows containing event data at the present moment in time.
>

It seems like the use case you have heavily revolves around mutating
pre-allocated, memory-mapped datasets that are being consumed by other
processes on the same host. So you want to incrementally fill some
memory-mapped data that you've already exposed to another process.

Because of the memory layout for variable-size and nested cells, it is
impossible in general to mutate Arrow record batches. This is not a
philosophical position: this was a deliberate technical decision to
guarantee data locality for scans and predictable O(1) random access
on variable-length and nested data.

Technically speaking, you can mutate memory in-place for fixed-size
types in-RAM or on-disk, if you want to. It's an "off-label" use case
but no one is saying you can't do this.
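
For example, the "off-label" update for a fixed-size type is just an
in-place store into the value buffer (sketch; assumes a non-nullable int64
column whose buffer you own, and no concurrent reader that depends on
immutability):

  #include <arrow/buffer.h>

  // Overwrite one slot of a fixed-width column in place.  Nothing moves,
  // which is why this works for fixed-size types only; variable-length
  // types would need their offsets and value buffers rewritten.
  void PokeInt64(arrow::Buffer* values, int64_t row, int64_t new_value) {
    reinterpret_cast<int64_t*>(values->mutable_data())[row] = new_value;
  }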

> Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated 
> records of a RecordBatch instead of to all records?
>

Per above, this is impossible in generality. You can't alter
variable-length or nested records without 

[Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

2019-07-02 Thread John Muehlhausen
During my time building financial analytics and trading systems (23
years!), both the "batch processing" and "stream processing" paradigms have
been extensively used by myself and by colleagues.

Unfortunately, the tools used in these paradigms have not successfully
overlapped.  For example, an analyst might use a Python notebook with
pandas to do some batch analysis.  Then, for acceptable latency and
throughput, a C++ programmer must implement the same schemas and processing
logic in order to analyze real-time data for real-time decision support.
 (Time horizons often being sub-second or even sub-millisecond for an
acceptable reaction to an event.  The most aggressive software-based
systems, leaving custom hardware aside other than things like kernel-bypass
NICs, target 10s of microseconds for a full round trip from data ingestion
to decision.)

As a result, TCO is more than doubled.  A doubling can be accounted for by
two implementations that share little or nothing in the way of
architecture.  Then additional effort is required to ensure that these
implementations continue to behave the same way and are upgraded in
lock-step.

Arrow purports to be a "bridge" technology that eases one of the pain
points of working in different ecosystems by providing a common event
stream data structure.  (Discussion of common processing techniques is
beyond the scope of this discussion.  Suffice it to say that a streaming
algo can always be run in batch, but not vice versa.)

Arrow seems to be growing up primarily in the batch processing world.  One
publication notes that "the missing piece is streaming, where the velocity
of incoming data poses a special challenge. There are some early
experiments to populate Arrow nodes in microbatches..." [1]  Part of our
discussion could be a response to this observation.  In what ways is it
true or false?  What are the plans to remedy this shortcoming, if it
exists?  What steps can be taken now to ease the transition to low-latency
streaming support in the future?

In my own experience, a successful strategy for stream processing where
context (i.e. recent past events) must be considered by calculations is to
pre-allocate memory for event collection, to organize this memory in a
columnar layout, and to run incremental calculations at each event ingress
into the partially populated memory.  [Fig 1]  When the pre-allocated
memory has been exhausted, allocate a new batch of column-wise memory and
continue.  When a batch is no longer pertinent to the calculation look-back
window, free the memory back to the heap or pool.
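
For a fixed-width column, that pre-allocation step can be sketched against
a recent Arrow C++ API roughly as follows (names and error handling
abbreviated; this is one possible shape, not project-blessed usage):

  #include <arrow/api.h>

  // Pre-allocate a single non-nullable int64 column of `capacity` rows.
  // A writer can then populate the value buffer in place, event by event.
  arrow::Result<std::shared_ptr<arrow::RecordBatch>> Preallocate(
      int64_t capacity) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> values,
                          arrow::AllocateBuffer(capacity * sizeof(int64_t)));
    auto column = arrow::MakeArray(arrow::ArrayData::Make(
        arrow::int64(), capacity, {nullptr, values},
        /*null_count=*/0));  // no validity bitmap: field is non-nullable
    auto schema = arrow::schema(
        {arrow::field("x", arrow::int64(), /*nullable=*/false)});
    return arrow::RecordBatch::Make(schema, capacity, {column});
  }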

Here we run into the first philosophical barrier with Arrow, where "Arrow
data is immutable." [2]  There is currently little or no consideration for
reading a partially constructed RecordBatch, e.g. one with only some of the
rows containing event data at the present moment in time.

Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated
records of a RecordBatch instead of to all records?

As an alternative approach, RecordBatch can be used as a single Record
(batch length of one).  [Fig 2]  In this approach the benefit of the
columnar layout is lost for look-back window processing.

Another alternative approach is to collect an entire RecordBatch before
stepping through it with the stream processing calculation. [Fig 3]  With
this approach some columnar processing benefit can be recovered, however
artificial latency is introduced.  As tolerance for delays in decision
support dwindles, this model will be of increasingly limited value.  It is
already unworkable in many areas of finance.

When considering the Arrow format and variable length values such as
strings, the pre-allocation approach (and subsequent processing of a
partially populated batch) encounters a hiccup.  How do we know the amount
of buffer space to pre-allocate?  If we allocate too much buffer for
variable-length data, some of it will be unused.  If we allocate too little
buffer for variable-length data, some row entities will be unusable.
 (Additional "rows" remain but when populating string fields there is no
longer string storage space to point them to.)
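
To make the hiccup concrete, a toy picture (numbers invented): a utf8
column pre-allocated for 4 rows with a 16-byte value buffer.

  offsets (5 x int32): 0, 3, 9, 16, ?
  values  (16 bytes):  "foo" "foobar" "bazquux"

After three strings totaling 16 bytes the value buffer is exhausted, so row
3 still exists in the offsets buffer but has nothing left to point at-- it
becomes one of the unusable rows described above.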

As with many optimization space/time tradeoff problems, the solution seems
to be to guess.  Pre-allocation sets aside variable length buffer storage
based on the typical "expected size" of the variable length data.  This can
result in some unused rows, as discussed above.  [Fig 4]  In fact it will
necessarily result in one unused row unless the variable-length data of the
last row exactly fills the remaining space in the variable-length data
buffer.  Consider the case where there is more
variable length buffer space than data:

Given a variable-length field x, last row index y, variable-length buffer
v, and a beginning offset o into v:
x[y] begins at o
x[y] ends at the offset of the next record; there is no next record, so
x[y] ends after the total remaining area in the variable-length buffer...
however, this