Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Hi John,

> Just concerned about running out of time if we are freezing the format at
> 1.0. Is there any rough guess as to how much time I have to make my case?

We haven't discussed it formally, but we have been doing releases about every 3 months, so I would expect 1.0.0 sometime in January.
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Perhaps what I should do is study the batch creation process in the reference implementation and see whether an alternative approach can be a lot more efficient in time while being less efficient in space.

And if so, whether this new approach also requires a differentiation between batch length and underlying array/buffer length, even while the immutability doctrine of a batch remains unchanged. (I.e., remove "mutating record batch headers on the fly" from the discussion as potentially unnecessary to make the point.)

Just concerned about running out of time if we are freezing the format at 1.0. Is there any rough guess as to how much time I have to make my case?

-John
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
On the specification: I'm -0.5 on saying array lengths can be different from record batch length (especially if both are valid lengths). I can see some wiggle room in the current language [1][2] that might allow for modifying this, so I think we should update it one way or another however this conversation turns out.

On the implementation: I wouldn't want to see code that is mutating the record batch headers on the fly in the current code base. It is dangerous to break immutability assumptions that standard implementations might be relying on. IMO, for such a specialized use case it makes sense to have out-of-band communication (perhaps in a private implementation that does modify headers) and then "touch up" the metadata for later analysis, so it conforms to the specification (and standard libraries can be used).

[1] https://github.com/apache/arrow/blob/master/format/Message.fbs#L49
[2] https://github.com/apache/arrow/blob/master/format/Message.fbs#L34
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Micah, thanks very much for your input. A few thoughts in response:

``Over the time horizon of desired latency if you aren't receiving enough messages to take advantage of columnar analytics, a system probably has enough time to compact batches after the fact for later analysis and conversely if you are receiving many events you naturally get reasonable batch sizes without having to do further work.''

To see my perspective it is necessary to stop thinking about business analytics and latencies of minutes or seconds or significant fractions of seconds. In real-time financial trading systems we do things like kernel bypass networking, core affinitization and spinning, fake workloads to keep hot paths cached, and almost complete avoidance of general-purpose allocators (preferring typed pools), all with the goal of shaving off one more MICROsecond.

Therefore "probably has enough time" is never in our vocabulary. Microbatching is not a thing. It is either purely event driven or bust if you are shooting for a dozen microseconds from packet-in to packet-out on the wire. This is also the reason that certain high-performance systems will never use the reference implementations of Arrow, just as most other financial tech standards are centered on the *protocol* and there end up being dozens of implementations that compete on performance.

Arrow *as a protocol* has the potential to be a beautiful thing. It is simple and straightforward and brings needed standardization. I would encourage thinking of the reference implementations almost as second-class citizens. The reference implementations should not de facto define the protocol, but vice versa. (I know this is probably already the philosophy.)

Of course data eventually pops out of high performance systems and has a second life in the hands of researchers, etc. This is why I'd like "vanilla" Arrow to be able to deal with Arrow data as constructed in the higher performance systems. Could there be special processes to refactor all this data? Of course, but why? If I'm logging RecordBatches to disk and some of them happen to have extra array elements (e.g. because of the inevitably imperfect projection used for pre-allocation: running out of variable-length string storage before running out of rows), why would I refactor terabytes of data (and, more importantly, introduce a time window where data is not available while this occurs) when I can just have pyarrow skip the unused rows? If I want compression/de-duplication I'll do it at the storage media layer.

``the proposal is essentially changing the unit of exchange from RecordBatches to a segment of a RecordBatch''

It is the recognition that a RecordBatch may need to be computationally useful before it is completely received/constructed, and without adding additional overhead to the reception/construction process. I grab a canned RecordBatch from a pool of them and start filling it in. If an earlier RecordBatch falls out of the computation window, I put it back in the pool. At any moment of time the batch advertises the truth: RecordBatch.length is the immutable section of data at this moment in time. That section will not have changed in future moments. It is also a recognition that some RecordBatches so constructed are not completely fillable. They are like a moving truck with a little empty space left. Sure, we could custom-build trucks to exactly fit the cargo, but why? It takes too much time. Grab a truck (or a RecordBatch) off the lot (pool) and go... when you've filled it as much as possible, but not perfectly, grab another one that is exactly the same.

I think my perception of the situation is clearest if we think about "frozen" or "sealed" RecordBatches that, during their construction stage, use a one-size-fits-all set of arrays and variable-length buffer storage. I grab a RecordBatch off the lot that is "for two int columns and a variable-length string column where the average expected length of a string is 10." I fill it, then I'm done. If my strings were slightly longer than expected, I have extra array elements and RecordBatch.length is less than array length.

While it is true that I have other use-cases in mind regarding simultaneous collection and computation, I'm hoping that the moving truck analogy by itself demonstrates enough efficiency advantages (as compared to typical batch construction) to warrant this change. Put simply, it is Arrow accommodating more space/time tradeoff options than it currently does.

When I was baking plasma into a system for the first time, I ran into the example that I create a record batch to a mock stream in order to know how much plasma memory to allocate. That is the kind of "build the moving truck to fit the cargo" that I just can't have!

-John
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Hi John and Wes,

A few thoughts:

One of the issues which we didn't get into in prior discussions is that the proposal is essentially changing the unit of exchange from RecordBatches to a segment of a RecordBatch.

I think I brought this up earlier in the discussion, but Trill [1], a columnar streaming engine, illustrates an interesting idea: over the time horizon of desired latency, if you aren't receiving enough messages to take advantage of columnar analytics, a system probably has enough time to compact batches after the fact for later analysis; conversely, if you are receiving many events, you naturally get reasonable batch sizes without having to do further work.

> I'm objecting to RecordBatch.length being inconsistent with the
> constituent field lengths, that's where the danger lies. If all of the
> lengths are consistent, no code changes are necessary.

John, is it a viable solution to keep all lengths in sync for the use case you are imagining?

A solution I like less, but which might be viable: formally specify a negative constant that signifies that length should be inherited from the RecordBatch length (this could only be used on top-level fields).

> I contend that it can only be useful and will never be harmful. What are
> the counter-examples of concrete harm?

I'm not sure there is anything obviously wrong; however, changes to semantics are always dangerous. One blemish on the current proposal is that one can't easily determine whether a mismatch in row length is a programming error or intentional.

[1] https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?

"that's where the danger lies"

What danger? I have no idea what the specific danger is, assuming that all reference implementations have test cases that hedge around this.

I contend that it can only be useful and will never be harmful. What are the counter-examples of concrete harm?
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
On Wed, Oct 16, 2019 at 12:32 PM John Muehlhausen wrote:

> I really need to "get into the zone" on some other development today, but I want to remind us of something earlier in the thread that gave me the impression I wasn't stomping on too many paradigms with this proposal:
>
> Wes: ``So the "length" field in RecordBatch is already the utilized number of rows. The body buffers can certainly have excess unused space. So your application can mutate the Flatbuffer "length" field in-place as new records are filled in.''

I'm objecting to RecordBatch.length being inconsistent with the constituent field lengths; that's where the danger lies. If all of the lengths are consistent, no code changes are necessary.

> If RecordBatch.length is the utilized number of rows then my PR makes this actually true. Yes, we need it in a handful of implementations. I'm willing to provide all of them. To me that is the lowest-complexity solution.
>
> -John
>
> On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney wrote:
>
> > On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen wrote:
> >
> > > "pyarrow is intended as a developer-facing library, not a user-facing one"
> > >
> > > Is that really the core issue? I doubt you would want to add this proposed logic to pandas even though it is user-facing, because then pandas will either have to re-implement what it means to read a batch (to respect length when it is smaller than array length) or else rely on the single blessed custom metadata for doing this, which doesn't make it custom anymore.
> >
> > What you have proposed in your PR amounts to an alteration of the IPC format to suit this use case. This pushes complexity onto _every_ implementation, which will need to worry about a "truncated" record batch. I'd rather avoid this unless it is truly the only way.
> >
> > Note that we serialize a significant amount of custom metadata already to address pandas-specific issues, and have not had to make any changes to the columnar format as a result.
> >
> > > I think really your concern is that perhaps nobody wants this but me, therefore it should not be in arrow or pandas regardless of whether it is user-facing? But, if that is your thinking, is it true? What is our solution to the locality/latency problem for systems that ingest and process concurrently, if not this solution? I do see it as a general problem that needs at least the beginnings of a general solution... not a "custom" one.
> >
> > We use the custom_metadata fields to implement a number of built-in things in the project, such as extension types. If enough people find this useful, then it can be promoted to a formalized concept. As far as I can tell, you have developed quite a bit of custom code related to this for your application, including manipulating Flatbuffers metadata in place to maintain the populated length, so the barrier to entry to being able to properly take advantage of this is rather high.
> >
> > > Also, I wonder whether it is true that pyarrow avoids smart/magical things. The entire concept of a "Table" seems to be in that category? The docs specifically mention that it is for convenience.
> >
> > Table arose out of legitimate developer need. There are a number of areas of the project that would be much more difficult if we had to worry about regularizing column chunking at any call site that returns an in-memory dataset.
> >
> > > I'd like to focus on two questions:
> > > 1- What is the Arrow general solution to the locality/latency tradeoff problem for systems that ingest and process data concurrently? This proposed solution or something else? Or if we propose not to address the problem, why?
> > > 2- What will the proposed change negatively impact? It seems that all we are talking about is respecting batch length if arrays happen to be longer.
> >
> > I'm suggesting to help you solve the post-read truncation problem without modifying the IPC protocol. If you want to make things work for the users without knowledge, I think this can be achieved through a plug-in API to define a metadata handler-callback to apply the truncation to the record batches.
> >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney wrote:
> > >
> > > > hi John,
> > > >
> > > > > As a practical matter, the reason metadata is not a good solution for me is that it requires awareness on the part of the reader. I want (e.g.) a researcher in Python to be able to map a file of batches in IPC format without needing to worry about the fact that the file was built in a streaming fashion and therefore has some unused array elements.
> > > >
> > > > I don't find this argument to be persuasive.
> > > >
> > > > pyarrow is intended as a developer-facing library, not a user-facing one.
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
hi John,

On Wed, Oct 16, 2019 at 11:59 AM John Muehlhausen wrote:

> I'm in Python, I'm a user, and I'm not allowed to import pyarrow because it isn't for me.

I think you're misrepresenting what I'm saying. It's our expectation that users will largely consume pyarrow indirectly as a dependency rather than using it directly. Not every piece of software needs to be designed around the needs of end users.

> There exist some Arrow record batches in plasma. I need to get one slice of one batch as a pandas dataframe. What do I do?
>
> There exist some Arrow record batches in a file. I need to get one slice of one batch as a pandas dataframe. What do I do?
>
> Are you contemplating that all of the above is possible using only pandas APIs?
>
> Does "one slice of one batch" go away once pandas (version 2) does not require conversion, since it will be zero copy and the user can slice in pandas with no performance hit?
>
> I'm really stumbling over this idea that users can't import pyarrow. I'm not sure it makes sense to continue to discuss user-level IPC (plasma, files, etc.) until I can come to grips with how users use pyarrow without importing it.

I'm not saying that. I'm suggesting that you should provide your users with convenient functions that handle the low-level details for them automatically, as intended.

> Once I see how it works without my proposed change, we can go back to how the user ignores the empty/undefined array portions without knowing whether they exist.
>
> -John
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
I'm in Python, I'm a user, and I'm not allowed to import pyarrow because it isn't for me. There exist some Arrow record batches in plasma. I need to get one slice of one batch as a pandas dataframe. What do I do? There exist some Arrow record batches in a file. I need to get one slice of one batch as a pandas dataframe. What do I do?

Are you contemplating that all of the above is possible using only pandas APIs? Does "one slice of one batch" go away once pandas (version 2) does not require conversion, since it will be zero-copy and the user can slice in pandas with no performance hit?

I'm really stumbling over this idea that users can't import pyarrow. I'm not sure it makes sense to continue to discuss user-level IPC (plasma, files, etc.) until I can come to grips with how users use pyarrow without importing it. Once I see how it works without my proposed change, we can go back to how the user ignores the empty/undefined array portions without knowing whether they exist.

-John

On Wed, Oct 16, 2019 at 10:45 AM Wes McKinney wrote: > On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen wrote: > > > > "pyarrow is intended as a developer-facing library, not a user-facing > one" > > > > Is that really the core issue? I doubt you would want to add this > proposed > > logic to pandas even though it is user-facing, because then pandas will > > either have to re-implement what it means to read a batch (to respect > > length when it is smaller than array length) or else rely on the single > > blessed custom metadata for doing this, which doesn't make it custom > > anymore. > > What you have proposed in your PR amounts to an alteration of the IPC > format to suit this use case. This pushes complexity onto _every_ > implementation that will need to worry about a "truncated" record > batch. I'd rather avoid this unless it is truly the only way. 
> > Note that we serialize a significant amount of custom metadata already > to address pandas-specific issues, and have not had to make any > changes to the columnar format as a result. > > > I think really your concern is that perhaps nobody wants this but me, > > therefore it should not be in arrow or pandas regardless of whether it is > > user-facing? But, if that is your thinking, is it true? What is our > > solution to the locality/latency problem for systems that ingest and > > process concurrently, if not this solution? I do see it as a general > > problem that needs at least the beginnings of a general solution... not a > > "custom" one. > > We use the custom_metadata fields to implement a number of built-in > things in the project, such as extension types. If enough people find > this useful, then it can be promoted to a formalized concept. As far > as I can tell, you have developed quite a bit of custom code related > to this for your application, including manipulating Flatbuffers > metadata in place to maintain the populated length, so the barrier to > entry to being able to properly take advantage of this is rather high. > > > Also, I wonder whether it is true that pyarrow avoids smart/magical > > things. The entire concept of a "Table" seems to be in that category? > The > > docs specifically mention that it is for convenience. > > > > Table arose out of legitimate developer need. There are a number of > areas of the project that would be much more difficult if we had to > worry about regularizing column chunking at any call site that returns > an in-memory dataset. > > > I'd like to focus on two questions: > > 1- What is the Arrow general solution to the locality/latency tradeoff > > problem for systems that ingest and process data concurrently? This > > proposed solution or something else? Or if we propose not to address the > > problem, why? > > 2- What will the proposed change negatively impact? 
It seems that all we > > are talking about is respecting batch length if arrays happen to be > longer. > > I'm suggesting to help you solve the post-read truncation problem > without modifying the IPC protocol. If you want to make things work > for the users without knowledge, I think this can be achieved through > a plug-in API to define a metadata handler-callback to apply the > truncation to the record batches. > > > Thanks, > > -John > > > > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney > wrote: > > > > > hi John, > > > > > > > As a practical matter, the reason metadata is not a good solution > for me > > > is that it requires awareness on the part of the reader. I want > (e.g.) a > > > researcher in Python to be able to map a file of batches in IPC format > > > without needing to worry about the fact that the file was built in a > > > streaming fashion and therefore has some unused array elements. > > > > > > I don't find this argument to be persuasive. > > > > > > pyarrow is intended as a developer-facing library, not a user-facing > > > one. I don't think you should be having the kinds of users you are > > > describing using pyarrow
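Wes's suggestion above, carrying the populated length in custom metadata and truncating after reading instead of changing the IPC format, can be sketched without pyarrow. Everything here is hypothetical illustration, not a real Arrow API: `SimpleBatch` stands in for a record batch, and the `populated_length` metadata key and handler registry are invented names.

```python
# Hypothetical sketch of a metadata handler-callback that truncates a batch
# after it is read, per the custom-metadata approach discussed above.
# SimpleBatch is a stand-in for an Arrow RecordBatch; none of these names
# are actual pyarrow APIs.
from dataclasses import dataclass, field

@dataclass
class SimpleBatch:
    columns: dict              # name -> list of values (arrays may be over-allocated)
    length: int                # RecordBatch.length: rows the arrays physically hold
    custom_metadata: dict = field(default_factory=dict)

# Registry of callbacks keyed by the custom-metadata key they handle.
_handlers = {}

def register_handler(key, fn):
    _handlers[key] = fn

def apply_handlers(batch):
    """Run every registered handler whose key appears in the metadata."""
    for key, fn in _handlers.items():
        if key in batch.custom_metadata:
            batch = fn(batch)
    return batch

def truncate_to_populated(batch):
    """Slice away the pre-allocated-but-unused tail of every column."""
    n = int(batch.custom_metadata["populated_length"])
    return SimpleBatch({k: v[:n] for k, v in batch.columns.items()}, n,
                       batch.custom_metadata)

register_handler("populated_length", truncate_to_populated)
```

The point of the registry is that a reader who never registers a handler still gets a spec-conforming batch; only applications that opt in see the truncated view.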
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
On Wed, Oct 16, 2019 at 10:17 AM John Muehlhausen wrote: > > "pyarrow is intended as a developer-facing library, not a user-facing one" > > Is that really the core issue? I doubt you would want to add this proposed > logic to pandas even though it is user-facing, because then pandas will > either have to re-implement what it means to read a batch (to respect > length when it is smaller than array length) or else rely on the single > blessed custom metadata for doing this, which doesn't make it custom > anymore. What you have proposed in your PR amounts to an alteration of the IPC format to suit this use case. This pushes complexity onto _every_ implementation that will need to worry about a "truncated" record batch. I'd rather avoid this unless it is truly the only way. Note that we serialize a significant amount of custom metadata already to address pandas-specific issues, and have not had to make any changes to the columnar format as a result. > I think really your concern is that perhaps nobody wants this but me, > therefore it should not be in arrow or pandas regardless of whether it is > user-facing? But, if that is your thinking, is it true? What is our > solution to the locality/latency problem for systems that ingest and > process concurrently, if not this solution? I do see it as a general > problem that needs at least the beginnings of a general solution... not a > "custom" one. We use the custom_metadata fields to implement a number of built-in things in the project, such as extension types. If enough people find this useful, then it can be promoted to a formalized concept. As far as I can tell, you have developed quite a bit of custom code related to this for your application, including manipulating Flatbuffers metadata in place to maintain the populated length, so the barrier to entry to being able to properly take advantage of this is rather high. > Also, I wonder whether it is true that pyarrow avoids smart/magical > things. 
The entire concept of a "Table" seems to be in that category? The > docs specifically mention that it is for convenience. > Table arose out of legitimate developer need. There are a number of areas of the project that would be much more difficult if we had to worry about regularizing column chunking at any call site that returns an in-memory dataset. > I'd like to focus on two questions: > 1- What is the Arrow general solution to the locality/latency tradeoff > problem for systems that ingest and process data concurrently? This > proposed solution or something else? Or if we propose not to address the > problem, why? > 2- What will the proposed change negatively impact? It seems that all we > are talking about is respecting batch length if arrays happen to be longer. I'm suggesting to help you solve the post-read truncation problem without modifying the IPC protocol. If you want to make things work for the users without knowledge, I think this can be achieved through a plug-in API to define a metadata handler-callback to apply the truncation to the record batches. > Thanks, > -John > > On Wed, Oct 16, 2019 at 8:37 AM Wes McKinney wrote: > > > hi John, > > > > > As a practical matter, the reason metadata is not a good solution for me > > is that it requires awareness on the part of the reader. I want (e.g.) a > > researcher in Python to be able to map a file of batches in IPC format > > without needing to worry about the fact that the file was built in a > > streaming fashion and therefore has some unused array elements. > > > > I don't find this argument to be persuasive. > > > > pyarrow is intended as a developer-facing library, not a user-facing > > one. I don't think you should be having the kinds of users you are > > describing using pyarrow directly, instead consuming the library > > through a layer above it. 
Specifically, we are deliberately avoiding > > doing anything too "smart" or "magical", instead maintaining tight > > developer control over what is going on. > > > > - Wes > > > > On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield > > wrote: > > > > > > Still thinking through the implications here, but to save others from > > > having to go search [1] is the PR. > > > > > > [1] https://github.com/apache/arrow/pull/5663/files > > > > > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen wrote: > > > > > > > A proposal with linked PR now exists in ARROW-5916 and Wes commented > > that > > > > we should kick it around some more. > > > > > > > > The high-level topic is how Apache Arrow intersects with streaming > > > > methodologies: > > > > > > > > If record batches are strictly immutable, a difficult trade-off is > > created > > > > for streaming data collection: either I can have low-latency > > presentation > > > > of new data by appending very small batches (often 1 row) to the IPC > > stream > > > > and lose columnar layout benefits, or I can have high-latency > > presentation > > > > of new data by waiting to append a batch until it is
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
hi John, > As a practical matter, the reason metadata is not a good solution for me is > that it requires awareness on the part of the reader. I want (e.g.) a > researcher in Python to be able to map a file of batches in IPC format > without needing to worry about the fact that the file was built in a > streaming fashion and therefore has some unused array elements. I don't find this argument to be persuasive. pyarrow is intended as a developer-facing library, not a user-facing one. I don't think you should be having the kinds of users you are describing using pyarrow directly, instead consuming the library through a layer above it. Specifically, we are deliberately avoiding doing anything too "smart" or "magical", instead maintaining tight developer control over what is going on. - Wes On Wed, Oct 16, 2019 at 2:18 AM Micah Kornfield wrote: > > Still thinking through the implications here, but to save others from > having to go search [1] is the PR. > > [1] https://github.com/apache/arrow/pull/5663/files > > On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen wrote: > > > A proposal with linked PR now exists in ARROW-5916 and Wes commented that > > we should kick it around some more. > > > > The high-level topic is how Apache Arrow intersects with streaming > > methodologies: > > > > If record batches are strictly immutable, a difficult trade-off is created > > for streaming data collection: either I can have low-latency presentation > > of new data by appending very small batches (often 1 row) to the IPC stream > > and lose columnar layout benefits, or I can have high-latency presentation > > of new data by waiting to append a batch until it is large enough to gain > > significant columnar layout benefits. During this waiting period the new > > data is unavailable to processing. 
> > > > If, on the other hand, [0,length) of a batch is immutable but length may > > increase, the trade-off is eliminated: I can pre-allocate a batch and > > populate records in it when they occur (without waiting), and also gain > > columnar benefits as each "closed" batch will be large. (A batch may be > > practically "closed" before the arrays are full when the projection of > > variable-length buffer space is wrong... a space/time tradeoff in favor of > > time.) > > > > Looking ahead to a day when the reference implementation(s) will be able to > > bump RecordBatch.length while populating pre-allocated records > > in-place, ARROW-5916 reads such batches by ignoring portions of arrays that > > are beyond RecordBatch.length. > > > > If we are not looking ahead to such a day, the discussion is about the > > alternative way that Arrow will avoid the latency/locality tradeoff > > inherent in streaming data collection. Or, if the answer is "streaming > > apps are and will always be out of scope", that idea needs to be defended > > from the observation that practitioners are moving more towards the fusion > > of batch and streaming, not away from it. > > > > As a practical matter, the reason metadata is not a good solution for me is > > that it requires awareness on the part of the reader. I want (e.g.) a > > researcher in Python to be able to map a file of batches in IPC format > > without needing to worry about the fact that the file was built in a > > streaming fashion and therefore has some unused array elements. > > > > The change itself seems relatively simple. What negative consequences do > > we anticipate, if any? > > > > Thanks, > > -John > > > > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen wrote: > > > > > This seems to help... still testing it though. 
> > > > > > Status GetFieldMetadata(int field_index, ArrayData* out) { > > > auto nodes = metadata_->nodes(); > > > // pop off a field > > > if (field_index >= static_cast<int>(nodes->size())) { > > > return Status::Invalid("Ran out of field metadata, likely > > > malformed"); > > > } > > > const flatbuf::FieldNode* node = nodes->Get(field_index); > > > > > > //out->length = node->length(); > > > out->length = metadata_->length(); > > > out->null_count = node->null_count(); > > > out->offset = 0; > > > return Status::OK(); > > > } > > > > > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen wrote: > > > > > >> So far it seems as if pyarrow is completely ignoring the > > >> RecordBatch.length field. More info to follow... > > >> > > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen wrote: > > >> > > >>> Crikey! I'll do some testing around that and suggest some test cases to > > >>> ensure it continues to work, assuming that it does. > > >>> > > >>> -John > > >>> > > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney > > wrote: > > >>> > > Thanks for the attachment, it's helpful. > > > > On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen wrote: > > > > > > Attachments referred to in previous two messages: > > > > > > >
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Still thinking through the implications here, but to save others from having to go search [1] is the PR. [1] https://github.com/apache/arrow/pull/5663/files On Tue, Oct 15, 2019 at 1:42 PM John Muehlhausen wrote: > A proposal with linked PR now exists in ARROW-5916 and Wes commented that > we should kick it around some more. > > The high-level topic is how Apache Arrow intersects with streaming > methodologies: > > If record batches are strictly immutable, a difficult trade-off is created > for streaming data collection: either I can have low-latency presentation > of new data by appending very small batches (often 1 row) to the IPC stream > and lose columnar layout benefits, or I can have high-latency presentation > of new data by waiting to append a batch until it is large enough to gain > significant columnar layout benefits. During this waiting period the new > data is unavailable to processing. > > If, on the other hand, [0,length) of a batch is immutable but length may > increase, the trade-off is eliminated: I can pre-allocate a batch and > populate records in it when they occur (without waiting), and also gain > columnar benefits as each "closed" batch will be large. (A batch may be > practically "closed" before the arrays are full when the projection of > variable-length buffer space is wrong... a space/time tradeoff in favor of > time.) > > Looking ahead to a day when the reference implementation(s) will be able to > bump RecordBatch.length while populating pre-allocated records > in-place, ARROW-5916 reads such batches by ignoring portions of arrays that > are beyond RecordBatch.length. > > If we are not looking ahead to such a day, the discussion is about the > alternative way that Arrow will avoid the latency/locality tradeoff > inherent in streaming data collection. 
Or, if the answer is "streaming > apps are and will always be out of scope", that idea needs to be defended > from the observation that practitioners are moving more towards the fusion > of batch and streaming, not away from it. > > As a practical matter, the reason metadata is not a good solution for me is > that it requires awareness on the part of the reader. I want (e.g.) a > researcher in Python to be able to map a file of batches in IPC format > without needing to worry about the fact that the file was built in a > streaming fashion and therefore has some unused array elements. > > The change itself seems relatively simple. What negative consequences do > we anticipate, if any? > > Thanks, > -John > > On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen wrote: > > > This seems to help... still testing it though. > > > > Status GetFieldMetadata(int field_index, ArrayData* out) { > > auto nodes = metadata_->nodes(); > > // pop off a field > > if (field_index >= static_cast<int>(nodes->size())) { > > return Status::Invalid("Ran out of field metadata, likely > > malformed"); > > } > > const flatbuf::FieldNode* node = nodes->Get(field_index); > > > > //out->length = node->length(); > > out->length = metadata_->length(); > > out->null_count = node->null_count(); > > out->offset = 0; > > return Status::OK(); > > } > > > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen wrote: > > > >> So far it seems as if pyarrow is completely ignoring the > >> RecordBatch.length field. More info to follow... > >> > >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen wrote: > >> > >>> Crikey! I'll do some testing around that and suggest some test cases to > >>> ensure it continues to work, assuming that it does. > >>> > >>> -John > >>> > >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney > wrote: > >>> > Thanks for the attachment, it's helpful. 
> > On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen wrote: > > > > Attachments referred to in previous two messages: > > > > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0 > > > > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen > wrote: > > > > > Thanks, Wes, for the thoughtful reply. I really appreciate the > > > engagement. In order to clarify things a bit, I am attaching a > graphic of > > > how our application will take record-wise (row-oriented) data from > an event > > > source and incrementally populate a pre-allocated Arrow-compatible > buffer, > > > including for variable-length fields. (Obviously at this stage I > am not > > > using the reference implementation Arrow code, although that would > be a > > > goal to contribute that back to the project.) > > > > > > For sake of simplicity these are non-nullable fields. As a > result a > > > reader of "y" that has no knowledge of the "utilized" metadata > would get a > > > long string (zeros, spaces, uninitialized, or whatever we decide > for the > > > pre-allocation model) for the record just beyond the last utilized
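The ARROW-5916 proposal quoted above can be modeled in miniature: a writer populates a pre-allocated batch in place and bumps `length` after each record, while a reader only ever consumes `[0, length)`. This is an illustrative model, not Arrow code; `PreallocatedBatch` and its methods are invented for the sketch.

```python
# Miniature model of the ARROW-5916 idea: arrays are allocated up front,
# records are appended in place, and only [0, length) is ever visible to a
# reader. Class and method names are invented for illustration.
class PreallocatedBatch:
    def __init__(self, capacity):
        self.capacity = capacity
        self.values = [None] * capacity  # pre-allocated array storage
        self.length = 0                  # RecordBatch.length: populated rows

    def append(self, record):
        """Populate the next slot, then publish it by bumping length."""
        if self.length == self.capacity:
            return False  # batch is full; caller must "close" it and start a new one
        self.values[self.length] = record
        self.length += 1  # rows in [0, length) remain immutable afterwards
        return True

    def read(self):
        """A conforming reader ignores the unused tail beyond length."""
        return self.values[:self.length]
```

The trade-off being debated is whether `read()`'s behavior (honoring a `length` smaller than the physical array length) belongs in the format itself or stays an application-level convention.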
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
A proposal with linked PR now exists in ARROW-5916 and Wes commented that we should kick it around some more. The high-level topic is how Apache Arrow intersects with streaming methodologies: If record batches are strictly immutable, a difficult trade-off is created for streaming data collection: either I can have low-latency presentation of new data by appending very small batches (often 1 row) to the IPC stream and lose columnar layout benefits, or I can have high-latency presentation of new data by waiting to append a batch until it is large enough to gain significant columnar layout benefits. During this waiting period the new data is unavailable to processing. If, on the other hand, [0,length) of a batch is immutable but length may increase, the trade-off is eliminated: I can pre-allocate a batch and populate records in it when they occur (without waiting), and also gain columnar benefits as each "closed" batch will be large. (A batch may be practically "closed" before the arrays are full when the projection of variable-length buffer space is wrong... a space/time tradeoff in favor of time.) Looking ahead to a day when the reference implementation(s) will be able to bump RecordBatch.length while populating pre-allocated records in-place, ARROW-5916 reads such batches by ignoring portions of arrays that are beyond RecordBatch.length. If we are not looking ahead to such a day, the discussion is about the alternative way that Arrow will avoid the latency/locality tradeoff inherent in streaming data collection. Or, if the answer is "streaming apps are and will always be out of scope", that idea needs to be defended from the observation that practitioners are moving more towards the fusion of batch and streaming, not away from it. As a practical matter, the reason metadata is not a good solution for me is that it requires awareness on the part of the reader. I want (e.g.) 
a researcher in Python to be able to map a file of batches in IPC format without needing to worry about the fact that the file was built in a streaming fashion and therefore has some unused array elements. The change itself seems relatively simple. What negative consequences do we anticipate, if any? Thanks, -John On Fri, Jul 5, 2019 at 10:42 AM John Muehlhausen wrote: > This seems to help... still testing it though. > > Status GetFieldMetadata(int field_index, ArrayData* out) { > auto nodes = metadata_->nodes(); > // pop off a field > if (field_index >= static_cast<int>(nodes->size())) { > return Status::Invalid("Ran out of field metadata, likely > malformed"); > } > const flatbuf::FieldNode* node = nodes->Get(field_index); > > //out->length = node->length(); > out->length = metadata_->length(); > out->null_count = node->null_count(); > out->offset = 0; > return Status::OK(); > } > > On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen wrote: > >> So far it seems as if pyarrow is completely ignoring the >> RecordBatch.length field. More info to follow... >> >> On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen wrote: >> >>> Crikey! I'll do some testing around that and suggest some test cases to >>> ensure it continues to work, assuming that it does. >>> >>> -John >>> >>> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney wrote: >>> Thanks for the attachment, it's helpful. On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen wrote: > > Attachments referred to in previous two messages: > https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0 > > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen wrote: > > > Thanks, Wes, for the thoughtful reply. I really appreciate the > > engagement. In order to clarify things a bit, I am attaching a graphic of > > how our application will take record-wise (row-oriented) data from an event > > source and incrementally populate a pre-allocated Arrow-compatible buffer, > > including for variable-length fields. 
(Obviously at this stage I am not > > using the reference implementation Arrow code, although that would be a > > goal to contribute that back to the project.) > > > > For sake of simplicity these are non-nullable fields. As a result a > > reader of "y" that has no knowledge of the "utilized" metadata would get a > > long string (zeros, spaces, uninitialized, or whatever we decide for the > > pre-allocation model) for the record just beyond the last utilized record. > > > > I don't see any "big O"-analysis problems with this approach. The > > space/time tradeoff is that we have to guess how much room to allocate for > > variable-length fields. We will probably almost always be wrong. This > > ends up in "wasted" space. However, we can do calculations based on these > > partially filled batches that take full advantage of the
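The space/time trade-off John describes above, guessing how much variable-length buffer to pre-allocate and eating either wasted bytes or wasted rows when the guess is wrong, can be made concrete with a small sketch. The function and its numbers are purely illustrative:

```python
# Sketch of the pre-allocation trade-off: a batch has a fixed number of row
# slots and a fixed variable-length (string) value buffer, and must "close"
# as soon as either resource runs out. All names here are invented.
def rows_until_full(row_slots, value_buffer_bytes, string_sizes):
    """Count how many incoming strings fit before either the row slots or
    the pre-allocated value buffer is exhausted.

    Returns (populated_rows, leftover_buffer_bytes): if the buffer guess was
    too small we waste rows; if too large we waste buffer space.
    """
    used = 0
    rows = 0
    for size in string_sizes:
        if rows == row_slots or used + size > value_buffer_bytes:
            break  # batch must be "closed" here; the remainder is waste
        used += size
        rows += 1
    return rows, value_buffer_bytes - used
```

Either way the populated prefix is a large, fully columnar batch, which is the whole point: waste some space to avoid the latency of accumulating rows elsewhere before freezing a batch.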
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
This seems to help... still testing it though.

Status GetFieldMetadata(int field_index, ArrayData* out) {
  auto nodes = metadata_->nodes();
  // pop off a field
  if (field_index >= static_cast<int>(nodes->size())) {
    return Status::Invalid("Ran out of field metadata, likely malformed");
  }
  const flatbuf::FieldNode* node = nodes->Get(field_index);

  // out->length = node->length();   // previous behavior: per-field node length
  out->length = metadata_->length();  // changed: use the batch-level length
  out->null_count = node->null_count();
  out->offset = 0;
  return Status::OK();
}

On Fri, Jul 5, 2019 at 10:24 AM John Muehlhausen wrote: > So far it seems as if pyarrow is completely ignoring the > RecordBatch.length field. More info to follow... > > On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen wrote: > >> Crikey! I'll do some testing around that and suggest some test cases to >> ensure it continues to work, assuming that it does. >> >> -John >> >> On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney wrote: >> >>> Thanks for the attachment, it's helpful. >>> >>> On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen wrote: >>> > >>> > Attachments referred to in previous two messages: >>> > >>> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0 >>> > >>> > On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen wrote: >>> > >>> > > Thanks, Wes, for the thoughtful reply. I really appreciate the >>> > > engagement. In order to clarify things a bit, I am attaching a >>> graphic of >>> > > how our application will take record-wise (row-oriented) data from >>> an event >>> > > source and incrementally populate a pre-allocated Arrow-compatible >>> buffer, >>> > > including for variable-length fields. (Obviously at this stage I am >>> not >>> > > using the reference implementation Arrow code, although that would >>> be a >>> > > goal to contribute that back to the project.) >>> > > >>> > > For sake of simplicity these are non-nullable fields. 
As a result a >>> > > reader of "y" that has no knowledge of the "utilized" metadata would >>> get a >>> > > long string (zeros, spaces, uninitialized, or whatever we decide for >>> the >>> > > pre-allocation model) for the record just beyond the last utilized >>> record. >>> > > >>> > > I don't see any "big O"-analysis problems with this approach. The >>> > > space/time tradeoff is that we have to guess how much room to >>> allocate for >>> > > variable-length fields. We will probably almost always be wrong. >>> This >>> > > ends up in "wasted" space. However, we can do calculations based on >>> these >>> > > partially filled batches that take full advantage of the columnar >>> layout. >>> > > (Here I've shown the case where we had too little variable-length >>> buffer >>> > > set aside, resulting in "wasted" rows. The flip side is that rows >>> achieve >>> > > full [1] utilization but there is wasted variable-length buffer if >>> we guess >>> > > incorrectly in the other direction.) >>> > > >>> > > I proposed a few things that are "nice to have" but really what I'm >>> eyeing >>> > > is the ability for a reader-- any reader (e.g. pyarrow)-- to see >>> that some >>> > > of the rows in a RecordBatch are not to be read, based on the new >>> > > "utilized" (or whatever name) metadata. That single tweak to the >>> > > metadata-- and readers honoring it-- is the core of the proposal. >>> > > (Proposal 4.) This would indicate that the attached example (or >>> something >>> > > similar) is the blessed approach for those seeking to accumulate >>> events and >>> > > process them while still expecting more data, with the >>> heavier-weight task >>> > > of creating a new pre-allocated batch being a rare occurrence. >>> > > >>> >>> So the "length" field in RecordBatch is already the utilized number of >>> rows. The body buffers can certainly have excess unused space. So your >>> application can mutate Flatbuffer "length" field in-place as new >>> records are filled in. 
>>> >>> > > Notice that the mutability is only in the sense of "appending." The >>> > > current doctrine of total immutability would be revised to refer to >>> the >>> > > immutability of only the already-populated rows. >>> > > >>> > > It gives folks an option other than choosing the lesser of two >>> evils: on >>> > > the one hand, length 1 RecordBatches that don't result in a stream >>> that is >>> > > computationally efficient. On the other hand, adding artificial >>> latency by >>> > > accumulating events before "freezing" a larger batch and only then >>> making >>> > > it available to computation. >>> > > >>> > > -John >>> > > >>> > > On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney >>> wrote: >>> > > >>> > >> hi John, >>> > >> >>> > >> On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen >>> wrote: >>> > >> > >>> > >> > During my time building financial analytics and trading systems >>> (23 >>> > >> years!), both the "batch processing" and "stream processing" >>> paradigms have >>> > >> been extensively used by myself and by colleagues. >>> >
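Wes's remark above, that an application can mutate the Flatbuffer "length" field in place as records are filled in, can be sketched with a plain shared buffer. The fixed offset below is purely illustrative: in real Flatbuffers metadata the field's position must be resolved through the table's vtable, not hard-coded, so treat this as a model of the append-only discipline rather than of the actual byte layout.

```python
# Sketch of bumping a length field in place in a shared buffer. The byte
# offset is a made-up assumption for illustration; real Flatbuffers field
# positions are found via the vtable.
import struct

LENGTH_OFFSET = 8  # hypothetical byte offset of the little-endian int64 length

def read_length(buf):
    return struct.unpack_from("<q", buf, LENGTH_OFFSET)[0]

def bump_length(buf, new_length):
    """Writers only ever grow the batch: [0, length) stays immutable."""
    assert new_length >= read_length(buf), "length may only increase"
    struct.pack_into("<q", buf, LENGTH_OFFSET, new_length)
```

The assertion encodes the revised doctrine under discussion: already-populated rows are immutable, and the only permitted mutation is growing `length`.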
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
So far it seems as if pyarrow is completely ignoring the
RecordBatch.length field. More info to follow...

On Tue, Jul 2, 2019 at 3:02 PM John Muehlhausen wrote:
> Crikey! I'll do some testing around that and suggest some test cases to
> ensure it continues to work, assuming that it does.
>
> -John
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Crikey! I'll do some testing around that and suggest some test cases to
ensure it continues to work, assuming that it does.

-John

On Tue, Jul 2, 2019 at 2:41 PM Wes McKinney wrote:
> Thanks for the attachment, it's helpful. [...]
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Thanks for the attachment, it's helpful.

On Tue, Jul 2, 2019 at 1:40 PM John Muehlhausen wrote:
> Attachments referred to in previous two messages:
> https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0
>
> > I proposed a few things that are "nice to have," but really what I'm
> > eyeing is the ability for a reader-- any reader (e.g. pyarrow)-- to see
> > that some of the rows in a RecordBatch are not to be read, based on the
> > new "utilized" (or whatever name) metadata. That single tweak to the
> > metadata-- and readers honoring it-- is the core of the proposal.
> > (Proposal 4.)

So the "length" field in RecordBatch is already the utilized number of
rows. The body buffers can certainly have excess unused space. So your
application can mutate the Flatbuffer "length" field in-place as new
records are filled in.
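The pattern Wes describes above (pre-allocated body buffers plus a metadata length that advances as rows are filled in) can be sketched with the Python standard library alone. This is a hypothetical illustration, not Arrow code: the class, its fields, and the example columns are invented for the sketch, and the real mechanism would update the Flatbuffers RecordBatch header rather than a Python attribute.

```python
import struct

class PreallocatedBatch:
    """Sketch of a pre-allocated, append-only columnar batch.

    'length' plays the role of the RecordBatch metadata length that a
    writer would bump as rows are filled in; 'capacity' is the number of
    rows the body buffers were sized for up front.
    """
    def __init__(self, capacity):
        self.capacity = capacity
        self.length = 0                       # utilized rows
        self.price = bytearray(8 * capacity)  # float64 column buffer
        self.qty = bytearray(8 * capacity)    # int64 column buffer

    def append(self, price, qty):
        if self.length >= self.capacity:
            raise IndexError("batch full; allocate a new one")
        i = self.length
        struct.pack_into("<d", self.price, 8 * i, price)
        struct.pack_into("<q", self.qty, 8 * i, qty)
        self.length += 1          # readers only see rows below 'length'

    def column(self, name):
        buf = getattr(self, name)
        fmt = "<d" if name == "price" else "<q"
        return [struct.unpack_from(fmt, buf, 8 * i)[0]
                for i in range(self.length)]

batch = PreallocatedBatch(capacity=1024)
batch.append(101.5, 7)
batch.append(101.75, 3)
print(batch.length, batch.capacity)   # 2 1024
print(batch.column("qty"))            # [7, 3]
```

Note the already-populated rows are never rewritten; only the length advances, which is the "append-only mutability" the thread is negotiating.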
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Attachments referred to in previous two messages:
https://www.dropbox.com/sh/6ycfuivrx70q2jx/AAAt-RDaZWmQ2VqlM-0s6TqWa?dl=0

On Tue, Jul 2, 2019 at 1:14 PM John Muehlhausen wrote:
> Thanks, Wes, for the thoughtful reply. I really appreciate the
> engagement. [...]
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
Thanks, Wes, for the thoughtful reply. I really appreciate the
engagement. In order to clarify things a bit, I am attaching a graphic of
how our application will take record-wise (row-oriented) data from an
event source and incrementally populate a pre-allocated Arrow-compatible
buffer, including for variable-length fields. (Obviously at this stage I
am not using the reference implementation Arrow code, although it would
be a goal to contribute that back to the project.)

For the sake of simplicity these are non-nullable fields. As a result, a
reader of "y" that has no knowledge of the "utilized" metadata would get
a long string (zeros, spaces, uninitialized, or whatever we decide for
the pre-allocation model) for the record just beyond the last utilized
record.

I don't see any "big O"-analysis problems with this approach. The
space/time tradeoff is that we have to guess how much room to allocate
for variable-length fields. We will probably almost always be wrong. This
ends up in "wasted" space. However, we can do calculations based on these
partially filled batches that take full advantage of the columnar layout.
(Here I've shown the case where we had too little variable-length buffer
set aside, resulting in "wasted" rows. The flip side is that rows achieve
full utilization but there is wasted variable-length buffer if we guess
incorrectly in the other direction.)

I proposed a few things that are "nice to have," but really what I'm
eyeing is the ability for a reader-- any reader (e.g. pyarrow)-- to see
that some of the rows in a RecordBatch are not to be read, based on the
new "utilized" (or whatever name) metadata. That single tweak to the
metadata-- and readers honoring it-- is the core of the proposal.
(Proposal 4.) This would indicate that the attached example (or something
similar) is the blessed approach for those seeking to accumulate events
and process them while still expecting more data, with the heavier-weight
task of creating a new pre-allocated batch being a rare occurrence.

Notice that the mutability is only in the sense of "appending." The
current doctrine of total immutability would be revised to refer to the
immutability of only the already-populated rows.

It gives folks an option other than choosing the lesser of two evils: on
the one hand, length-1 RecordBatches that don't result in a stream that
is computationally efficient; on the other hand, adding artificial
latency by accumulating events before "freezing" a larger batch and only
then making it available to computation.

-John

On Tue, Jul 2, 2019 at 12:21 PM Wes McKinney wrote:
> hi John, [...]
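The guessing game John describes (a fixed number of row slots plus an "expected size" of variable-length storage, with wasted rows or wasted bytes depending on which runs out first) can be simulated in a few lines of stdlib Python. The function and the symbol values are made up for illustration:

```python
# Simulate filling a pre-allocated batch: 'rows_capacity' row slots and
# 'var_capacity' bytes of variable-length (string) storage.  Filling
# stops when either resource is exhausted.
def fill(rows_capacity, var_capacity, values):
    offsets = [0]
    used_rows = 0
    for v in values:
        if used_rows == rows_capacity:
            break                              # out of row slots
        if offsets[-1] + len(v) > var_capacity:
            break                              # out of string storage
        offsets.append(offsets[-1] + len(v))
        used_rows += 1
    return used_rows, offsets[-1]              # rows used, bytes used

values = ["GOOG", "BRK.B", "A", "MSFT", "AAPL"] * 20   # 100 events

# Guess 4 bytes/row: all 100 rows fit, 40 bytes of string space wasted.
print(fill(100, 4 * 100, values))    # (100, 360)

# Guess 3 bytes/row: string storage runs out first, 17 rows wasted.
print(fill(100, 3 * 100, values))    # (83, 298)
```

Either way the guess is wrong, which is John's point: the question is only which resource is stranded, not whether any is.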
Re: [Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
hi John,

On Tue, Jul 2, 2019 at 11:23 AM John Muehlhausen wrote:
> [...]
> What are the plans to remedy this shortcoming, if it exists? What steps
> can be taken now to ease the transition to low-latency streaming support
> in the future?

Arrow columnar format describes a collection of records with values
between records being placed adjacent to each other in memory. If you
break that assumption, you don't have a columnar format anymore. So I
don't see where the "shortcoming" is.

We don't have any software in the project for managing the creation of
record batches in a streaming application, but this seems like an
interesting expansion area for the project. Note that many contributors
have already expanded the surface area of what's in the Arrow libraries
in many directions. Streaming data collection is yet another area of
expansion, but it is not on the short list of projects that I will
personally be working on (or asking my direct or indirect colleagues to
work on). Since this is a project made up of volunteers, it's up to
contributors to drive new directions for the project by writing design
documents and pull requests.

> Here we run into the first philosophical barrier with Arrow, where
> "Arrow data is immutable." [2] There is currently little or no
> consideration for reading a partially constructed RecordBatch, e.g. one
> with only some of the rows containing event data at the present moment
> in time.

It seems like the use case you have heavily revolves around mutating
pre-allocated, memory-mapped datasets that are being consumed by other
processes on the same host. So you want to incrementally fill some
memory-mapped data that you've already exposed to another process.

Because of the memory layout for variable-size and nested cells, it is
impossible in general to mutate Arrow record batches. This is not a
philosophical position: it was a deliberate technical decision to
guarantee data locality for scans and predictable O(1) random access on
variable-length and nested data. Technically speaking, you can mutate
memory in-place for fixed-size types, in RAM or on disk, if you want to.
It's an "off-label" use case, but no one is saying you can't do this.

> Proposal 1: Shift the Arrow "immutability" doctrine to apply to
> populated records of a RecordBatch instead of to all records?

Per above, this is impossible in generality. You can't alter
variable-length or nested records without
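The distinction Wes draws (fixed-size slots can be overwritten in place; variable-length values in general cannot) can be illustrated with a stdlib-only sketch. The bytearrays here are stand-ins for Arrow body buffers, not actual Arrow memory:

```python
import struct

# A fixed-width int64 column: 4 slots pre-allocated, zero-initialized.
col = bytearray(8 * 4)
view = memoryview(col)

# In-place mutation of a fixed-size slot is just an 8-byte overwrite;
# no other offsets or buffers need to move.
struct.pack_into("<q", view, 8 * 2, 42)   # set row 2
print(struct.unpack_from("<q", view, 8 * 2)[0])   # 42

# A variable-length (string) column is an offsets buffer plus a data
# buffer.  Growing row 1's value from 3 bytes to 5 would shift every
# byte and every offset after it, which is why general in-place
# mutation is ruled out.
offsets = [0, 3, 6]           # two rows: b"foo", b"bar"
data = bytearray(b"foobar")
row1 = bytes(data[offsets[1]:offsets[2]])
print(row1)                   # b'bar'
```

Appending past the current end of both buffers, by contrast, disturbs nothing already written, which is the loophole John's proposal leans on.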
[Discuss] Streaming: Differentiate between length of RecordBatch and utilized portion-- common use-case?
During my time building financial analytics and trading systems (23
years!), both the "batch processing" and "stream processing" paradigms
have been extensively used by myself and by colleagues.

Unfortunately, the tools used in these paradigms have not successfully
overlapped. For example, an analyst might use a Python notebook with
pandas to do some batch analysis. Then, for acceptable latency and
throughput, a C++ programmer must implement the same schemas and
processing logic in order to analyze real-time data for real-time
decision support. (Time horizons are often sub-second or even
sub-millisecond for an acceptable reaction to an event. The most
aggressive software-based systems, leaving custom hardware aside other
than things like kernel-bypass NICs, target tens of microseconds for a
full round trip from data ingestion to decision.)

As a result, TCO is more than doubled. A doubling can be accounted for by
two implementations that share little or nothing in the way of
architecture. Then additional effort is required to ensure that these
implementations continue to behave the same way and are upgraded in
lock-step.

Arrow purports to be a "bridge" technology that eases one of the pain
points of working in different ecosystems by providing a common event
stream data structure. (Discussion of common processing techniques is
beyond the scope of this discussion. Suffice it to say that a streaming
algo can always be run in batch, but not vice versa.)

Arrow seems to be growing up primarily in the batch processing world. One
publication notes that "the missing piece is streaming, where the
velocity of incoming data poses a special challenge. There are some early
experiments to populate Arrow nodes in microbatches..." [1] Part of our
discussion could be a response to this observation. In what ways is it
true or false? What are the plans to remedy this shortcoming, if it
exists? What steps can be taken now to ease the transition to low-latency
streaming support in the future?

In my own experience, a successful strategy for stream processing where
context (i.e. recent past events) must be considered by calculations is
to pre-allocate memory for event collection, to organize this memory in a
columnar layout, and to run incremental calculations at each event
ingress into the partially populated memory. [Fig 1] When the
pre-allocated memory has been exhausted, allocate a new batch of
column-wise memory and continue. When a batch is no longer pertinent to
the calculation look-back window, free the memory back to the heap or
pool.

Here we run into the first philosophical barrier with Arrow, where "Arrow
data is immutable." [2] There is currently little or no consideration for
reading a partially constructed RecordBatch, e.g. one with only some of
the rows containing event data at the present moment in time.

Proposal 1: Shift the Arrow "immutability" doctrine to apply to populated
records of a RecordBatch instead of to all records?

As an alternative approach, a RecordBatch can be used as a single record
(batch length of one). [Fig 2] In this approach the benefit of the
columnar layout is lost for look-back window processing. Another
alternative is to collect an entire RecordBatch before stepping through
it with the stream processing calculation. [Fig 3] With this approach
some columnar processing benefit can be recovered; however, artificial
latency is introduced. As tolerance for delays in decision support
dwindles, this model will be of increasingly limited value. It is already
unworkable in many areas of finance.

When considering the Arrow format and variable-length values such as
strings, the pre-allocation approach (and subsequent processing of a
partially populated batch) encounters a hiccup. How do we know the amount
of buffer space to pre-allocate? If we allocate too much buffer for
variable-length data, some of it will be unused. If we allocate too
little buffer for variable-length data, some row entities will be
unusable. (Additional "rows" remain, but when populating string fields
there is no longer string storage space to point them to.)

As with many space/time tradeoff problems, the solution seems to be to
guess. Pre-allocation sets aside variable-length buffer storage based on
the typical "expected size" of the variable-length data. This can result
in some unused rows, as discussed above. [Fig 4] In fact, it will
necessarily result in one unused row unless the last of each
variable-length field in the last row exactly fits into the remaining
space in the variable-length data buffer.

Consider the case where there is more variable-length buffer space than
data. Given variable-length field x, last row index y, variable-length
buffer v, and beginning offset o into v:

- x[y] begins at o
- x[y] ends at the offset of the next record; there is no next record, so
  x[y] ends after the total remaining area in the variable-length
  buffer... however, this
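John's offset argument can be made concrete. In Arrow's variable-length binary layout, a column of n values carries n+1 offsets, and value i spans data[offsets[i]:offsets[i+1]], so the final offset is what separates utilized string storage from pre-allocated slack. A stdlib-only sketch with toy values:

```python
# Arrow-style variable-length layout: offsets buffer (n+1 entries) plus
# one contiguous data buffer, pre-allocated larger than needed.
data = bytearray(32)                 # 32 bytes reserved, mostly slack
payload = b"foobarbaz"
data[:len(payload)] = payload
offsets = [0, 3, 6, 9]               # 3 values: b"foo", b"bar", b"baz"

def value(i):
    # Value i ends where value i+1 begins: offsets[i + 1].
    return bytes(data[offsets[i]:offsets[i + 1]])

print([value(i) for i in range(3)])      # [b'foo', b'bar', b'baz']

# Bytes data[9:32] are allocated but unreferenced: a reader that honors
# the offsets never sees them.  That slack is John's "unused buffer"
# case; without a trailing offset, the last value would appear to run to
# the end of the allocation.
print(len(data) - offsets[-1])           # 23
```

This is why the last row's end offset matters: the offsets buffer, not the data buffer's size, defines where each value (including the final one) ends.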