Comments inline below.
________________________________
From: Steve Lawrence <slawre...@apache.org>
Sent: Thursday, November 16, 2017 10:04 AM
To: Mike Beckerle; dev@daffodil.apache.org
Subject: Re: streaming API design note for review

On 11/09/2017 03:23 PM, Mike Beckerle wrote:
>
> The low-level changes to support this API I believe require us to
> implement the bucket algorithm for the DataInputStream layer, so that
> the stream isn't being read into an ever-growing buffer.
>
> The key need/idea here is that the state, including our
> DaffodilInputStream state and its current bitPos0b on the input stream,
> is preserved from one parse call to the next, so that the next parse
> picks up with the bitPos0b exactly where the prior parse left it. That
> eliminates the java InputStream/Channel issue - the caller provides
> that once, upon which it disappears into the state.
>
> The API I proposed thus far provides no guarantees about how the java
> InputStream is positioned after a parse call returns, except that only
> a finite section of it will have been consumed, meaning no seek to the
> end will have been performed. The amount consumed may, as you observed,
> be more than what was converted into the infoset, whether for pre-fetch
> reasons or try-and-backtrack reasons.
>
> If one calls parse(), and a parse error prevents it from parsing
> anything, one might want the InputStream positioned where it started. I
> think it may be possible to provide this, but only if the API requires
> a BufferedInputStream on input. But I think we should leave this out
> and require users to implement this behavior themselves - something
> they can always do through their own use of BufferedInputStream's mark
> and reset calls. Repositioning back to where the parse started may not
> be the desired behavior; one may instead want the state left such that
> yet another parse call can be done.
> I'd like to avoid exposing something as rich as DataInputStream to the
> API, but I think making the State something that is passed to the
> dataProcessor parse/unparse methods is probably essential.
>
> If we allow the state object to be visible at the API, then in theory
> one could pass it to different data processors that will then try to
> parse different things from that stream/state. E.g., suppose you want
> to parse first a pcap header, then you want to call parse() to fetch
> and parse each packet. You cannot do that with the API as proposed.
>
> This sounds like a real flaw in the design. There are lots of
> header-body*-trailer data formats in the world where one does NOT want
> to create a single document, but rather one wants to parse the header,
> then in a loop parse each body record, and when that fails, parse the
> trailer record.
>
> That allows one to process gigantic header-body*-trailer files in
> finite space.
>
> So that's two things to reconsider:
>
> 1) precise InputStream behavior - providing some guarantees about where
> the InputStream is positioned.
>
> 2) enabling different data processors to parse from the same
> InputStream if called in sequence.
>

Regarding issue #1, I'm not sure we can really make any guarantees about
where the InputStream ends up. And this seems pretty standard for Java
APIs. For example, the Scanner class accepts an InputStream, but under
the hood it essentially just reads the InputStream into a large buffer
and then acts on that buffer. If the user ends the scanning before
reaching the end of the buffer, that extra data is lost.

Ok. So I am on board with just not having this requirement.

However, if we really wanted to support this, I think it's possible. If
an InputStream supports mark(), we could take a mark at the beginning of
parse(), and at the end of parse() reset() back to that mark and then
skip() the number of parsed bytes.
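As a sanity check of that mark/reset/skip sequence, here is a small self-contained sketch using only java.io. The parse is simulated (a fixed prefetch count and consumed count stand in for real parser behavior); only the stream repositioning is real:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetSketch {

    // Simulates a parse that prefetches more bytes than it actually
    // consumes, then repositions the stream so the caller sees exactly
    // the unconsumed bytes: mark() before parsing, reset() afterward,
    // then skip() forward by the number of bytes actually parsed.
    static String remainingAfterParse(byte[] data, int prefetched, long consumed)
            throws IOException {
        InputStream is = new BufferedInputStream(new ByteArrayInputStream(data));
        is.mark(data.length);              // take a mark at the start of "parse()"
        is.read(new byte[prefetched]);     // the parser prefetches ahead
        is.reset();                        // rewind to the mark...
        is.skip(consumed);                 // ...then skip only what was parsed
        ByteArrayOutputStream rest = new ByteArrayOutputStream();
        int b;
        while ((b = is.read()) != -1) rest.write(b);
        return rest.toString("US-ASCII");
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "headerbodybody".getBytes("US-ASCII");
        // 10 bytes prefetched, but only the 6-byte "header" was consumed
        System.out.println(remainingAfterParse(data, 10, 6)); // prints bodybody
    }
}
```

Note that the readlimit argument to mark() is exactly what bounds how much the BufferedInputStream must retain, which is the buffering cost being weighed here.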
The drawback to this is that the InputStream will now buffer the data,
that buffer size is limited (though it could be controlled by a
tunable), and we're already buffering the data for our own backtrack
purposes in our DaffodilInputStream, so we're wasting space. This just
seems to add extra complication to support guarantees that most APIs
don't even make as is.

So I'm not sure #1 is necessary for generic java.io.InputStreams. If
someone passes an InputStream to parse(), we will make no guarantees
about where the InputStream is positioned at the end.

Which means that if we want to support #2, we need some wrapper around
an InputStream that can buffer the data (ideally in buckets, but that is
an implementation detail), and that buffered data could be shared among
different calls to parse() of potentially different data processors. So
how do we share that data?

== Method #1:

Currently, the parse method looks something like this:

  def parse(is: InputStream, ...) = {
    val dis = new DaffodilInputStream(is)
    val pstate = new PState(dis, ...)
    val res = doParse(pstate)
    res
  }

So we create our buffering wrapper around the InputStream, store that in
the pstate, do the parse, and return the parse result. Since the
ParseResult contains a pointer to the PState, which contains a pointer
to the DaffodilInputStream, we could potentially do something like this:

  val is = // get InputStream
  val pr1 = dp1.parse(is, ...)
  val pr2 = dp2.parse(pr1, ...)

And then we could have this overloaded parse method:

  def parse(pr: ParseResult, ...) = {
    val dis = pr.pstate.dis
    val pstate = new PState(dis, ...)
    val res = doParse(pstate)
    res
  }

So the parse call, potentially from a completely different
DataProcessor, just extracts the DaffodilInputStream, creates a new
PState with it, and then parses. Parsing an InputStream in sequence just
means passing the previous ParseResult to the next parse() call.
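To make the chaining concrete, here is a runnable sketch with stand-in classes (FakeDIS, FakeParseResult, and FakeDP are invented for illustration and are not Daffodil types). The ParseResult carries the stream wrapper, so each parse picks up at the position where the previous one stopped:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChainSketch {

    // Stand-in for DaffodilInputStream: a positioned stream wrapper.
    static class FakeDIS {
        final InputStream is;
        FakeDIS(InputStream is) { this.is = is; }
        String readBytes(int n) throws IOException {
            byte[] buf = new byte[n];
            int got = is.read(buf);
            return new String(buf, 0, Math.max(got, 0), "US-ASCII");
        }
    }

    // Stand-in ParseResult: keeps a pointer to the stream state.
    static class FakeParseResult {
        final FakeDIS dis;
        final String value;
        FakeParseResult(FakeDIS dis, String value) { this.dis = dis; this.value = value; }
    }

    // Stand-in DataProcessor that "parses" a fixed number of bytes.
    static class FakeDP {
        final int recordLen;
        FakeDP(int recordLen) { this.recordLen = recordLen; }

        // First form: wraps a raw InputStream.
        FakeParseResult parse(InputStream is) throws IOException {
            return parse(new FakeDIS(is));
        }
        // Overloaded form: continues from a previous ParseResult.
        FakeParseResult parse(FakeParseResult prev) throws IOException {
            return parse(prev.dis);
        }
        private FakeParseResult parse(FakeDIS dis) throws IOException {
            return new FakeParseResult(dis, dis.readBytes(recordLen));
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream is = new ByteArrayInputStream("HDRrec1rec2".getBytes("US-ASCII"));
        FakeDP headerDp = new FakeDP(3);  // parses a 3-byte header
        FakeDP recordDp = new FakeDP(4);  // parses 4-byte records
        FakeParseResult pr1 = headerDp.parse(is);   // "HDR"
        FakeParseResult pr2 = recordDp.parse(pr1);  // "rec1", continues after pr1
        FakeParseResult pr3 = recordDp.parse(pr2);  // "rec2"
        System.out.println(pr1.value + " " + pr2.value + " " + pr3.value);
    }
}
```

This is only the shape of the API under discussion, not how the real DataProcessor is implemented.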
This has a nice benefit in that it completely hides the internals of our
DaffodilInputStream and how we buffer the InputStream. It also logically
makes sense--let's start a parse from the result of a previous parse.

It does have some downsides though. For example, this almost implies you
could do something like this:

  val pr1 = dp1.parse(is, ...)
  val pr2 = dp2.parse(pr1, ...)
  val pr3 = dp3.parse(pr1, ...)

I think a user should have to use some explicit device to make a split
duplicated stream. I don't expect much call for it.

Note that we're passing pr1 to both the dp2 and dp3 parses. The idea
here is that we first parse with dp1, then we want to start two
different parses from the exact same spot where dp1 finished. This
clearly wouldn't work, since the DaffodilInputStream is mutable. Now, we
could potentially make it so parse(ParseResult) creates a copy of the
DaffodilInputStream, but there are extra complications once two DISes
share a single InputStream, so extra care is needed to make that work.
I'd rather just not support this use case, as it adds a lot of extra
complexity. Though, maybe this just needs more thought?

Might be too terrible?

Another minor issue is that one parse call now depends on the result of
another, which means the first ParseResult must stick around. It can't
be garbage collected until the second parse finishes (maybe not a big
deal). It also means streaming logic becomes a bit awkward, since the
first parse needs an InputStream and the following parses need a
ParseResult:

  var prevPR: ParseResult = null

  def items: Stream[Node] = {
    xmlOut.reset()
    val pr =
      if (prevPR == null) dp.parse(is, xmlOut)
      else dp.parse(prevPR, xmlOut)
    prevPR = pr
    val item = if (pr.isError) Nil else xmlOut.getResult()
    item #:: items
  }

Not awful, but definitely a bit awkward, and it doesn't seem like the
correct functional way to do it.

== Method #2:

An alternative to the above is to just make DaffodilInputStream part of
the public API.
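For comparison, here is a self-contained sketch of the shape this implies (all class names are stand-ins, not real Daffodil API): the caller constructs one stream wrapper with a deliberately tiny interface and hands that same instance to every parse call, so each parse continues where the previous one stopped.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class SharedStreamSketch {

    // Stand-in public wrapper: exposes almost nothing to the user; the
    // parse machinery reads through a package-private accessor.
    static class FakeDaffodilInputStream {
        private final InputStream is;
        FakeDaffodilInputStream(InputStream is) { this.is = is; }
        InputStream underlying() { return is; }
    }

    // Stand-in processor: "parses" one newline-terminated record, or
    // returns null at end of stream.
    static class FakeDP {
        String parse(FakeDaffodilInputStream dis) throws IOException {
            StringBuilder sb = new StringBuilder();
            int b;
            while ((b = dis.underlying().read()) != -1 && b != '\n') {
                sb.append((char) b);
            }
            return (sb.length() == 0 && b == -1) ? null : sb.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream is = new ByteArrayInputStream("one\ntwo\nthree\n".getBytes("US-ASCII"));
        FakeDaffodilInputStream dis = new FakeDaffodilInputStream(is);
        FakeDP dp = new FakeDP();
        List<String> items = new ArrayList<>();
        String item;
        while ((item = dp.parse(dis)) != null) {  // same dis, call after call
            items.add(item);
        }
        System.out.println(items);  // prints [one, two, three]
    }
}
```

The point of the sketch is only the calling convention: the wrapper is created once and threaded through successive parse calls, which could just as well come from different processors.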
This does make a bit more of the API visible, which isn't ideal, but I
think that can be limited to very little pretty easily. Then the API
just becomes something like this:

  val is = // get InputStream
  val dis = DaffodilInputStream(is)
  val pr1 = dp1.parse(dis, ...)
  val pr2 = dp2.parse(dis, ...)

I like this design best. I wonder if we should make a complete PState
object, including the input stream, the infosetOutputter, and the
optional lengthLimitInBits, and then just pass that one object to parse
over and over. This would minimize object allocation. I don't feel
terribly strongly about it either way though.

Also, the DaffodilInputStream(is) call could return an object that has a
very small interface, so that we're not exposing the API of the internal
DataInputStream class/trait. Just constructing it, and little else.

This method might want to be called parseOne(PState) to make it clear
that one item is being parsed off the front of the input stream.

The DaffodilInputStream will be very similar to an InputStream in that
few guarantees are made. The only real guarantee is that after a call to
parse() is made, the same DaffodilInputStream can be passed to another
call to parse() to continue where the previous one left off. For
anything else (e.g. multiple threads acting on the same stream), all
bets are off.

With this, the streaming example is also very simple:

  val is = // get InputStream
  val dis = DaffodilInputStream(is)

  def items: Stream[Node] = {
    xmlOut.reset()
    val pr = dp.parse(dis, xmlOut)
    val item = if (pr.isError) Nil else xmlOut.getResult()
    item #:: items
  }

== Method #3:

We have a new DataProcessor that accepts an InputStream:

  val sdp = StreamingDataProcessor(is)

And we can successively call parse on this, but parse accepts the
DataProcessor to use, e.g.

  val pr1 = sdp.parse(dp1)
  val pr2 = sdp.parse(dp2)
  val pr3 = sdp.parse(dp3)

Essentially the StreamingDataProcessor wraps the InputStream in a
DaffodilInputStream, stores that, and then just passes it to the parse
call of each dp.
So the code looks something like:

  class StreamingDataProcessor(is: InputStream) {
    val dis = new DaffodilInputStream(is)

    def parse(dp: DataProcessor) = {
      dp.parse(dis)
    }
  }

Note that this is very similar to Method #2. The DataProcessor class
still must be modified to accept a DaffodilInputStream in the parse
method. All this does is hide the creation of the DaffodilInputStream,
which I'm not sure gains very much.

Definitely open to other thoughts and ideas. There are also other things
to think about (e.g. how bucketing affects this, and how to support
layers, large data blobs, and efficient regular expressions), but I
think those are mostly implementation details of the DaffodilInputStream
that aren't affected too much by the actual API usage. I plan to
investigate these soon and begin some discussions on how that might all
work.

- Steve