Comments inline below.

________________________________
From: Steve Lawrence <slawre...@apache.org>
Sent: Thursday, November 16, 2017 10:04 AM
To: Mike Beckerle; dev@daffodil.apache.org
Subject: Re: streaming API design note for review

On 11/09/2017 03:23 PM, Mike Beckerle wrote:
>
> The low-level changes to support this API I believe require us to implement
> the bucket algorithm for the DataInputStream layer, so that the stream isn't
> being read into an ever-growing buffer.
>
>
> The key need/idea here is that the state, including our DaffodilInputStream
> state and its current bitPos0b on the input stream, is preserved from one
> parse call to the next, so that the next parse picks up with the bitPos0b
> exactly where the prior parse left it. That eliminates the java
> InputStream/Channel issue - the caller provides that once, upon which it
> disappears into the state.
>
>
> The API I proposed thus far provides no guarantees about how the java
> InputStream is positioned after a parse call returns, except that only a
> finite section of it will have been consumed, meaning no seek to the end
> will have been performed. The amount consumed may, as you observed, be more
> than what was converted into the infoset, whether for pre-fetch reasons, or
> try-and-backtrack reasons.
>
>
> If one calls parse(), and a parse error prevents it from parsing anything,
> one might want the InputStream positioned where it started. I think it may
> be possible to provide this, but only if the API requires a
> BufferedInputStream on input. But I think we should leave this out, and
> require users to implement this behavior themselves - something they can
> always do, by their own use of BufferedInputStream's mark and reset calls.
> Repositioning back to where it started may not even be the desired behavior;
> leaving the state alone means yet another parse call can be done to continue.
>
>
> I'd like to avoid exposing something as rich as DataInputStream to the API,
> but I think making the State something that is passed to the dataProcessor
> parse/unparse methods is probably essential.
>
>
> If we allow the state object to be visible at the API, then in theory one
> could pass it to different data processors that will then try to parse
> different things from that stream/state. E.g., suppose you want to parse
> first a pcap header, then you want to call parse() to fetch and parse each
> packet. You cannot do that with the API as proposed.
>
>
> This sounds like a real flaw in the design. There are lots of
> header-body*-trailer data formats in the world where one does NOT want to
> create a single document, but rather one wants to parse the header, then in
> a loop parse each body record, and when that fails, parse the trailer
> record.
>
>
> That allows one to process gigantic header-body*-trailer files in finite
> space.
>
>
> So that's two things to reconsider:
>
>
> 1) precise InputStream behavior - providing some guarantees about where the
> InputStream is positioned.
>
> 2) enabling different data processors to parse from the same InputStream if
> called in sequence.
>


Regarding issue #1, I'm not sure we can really make any guarantees about
where the InputStream ends up. And this seems pretty standard for Java
APIs. For example, the Scanner class accepts an InputStream, but under
the hood, it essentially just reads the InputStream into a large buffer
and then acts on that buffer. If the user ends the scanning before
reaching the end of the buffer, that extra data is lost.

Ok. So I am on board with just not having this requirement.

However, if we really wanted to support this, I think it's possible. If
an InputStream supports mark(), we could just take a mark at the
beginning of parse(), and at the end of parse() we reset() back to that
mark and then skip() the number of parsed bytes. The drawbacks are that
the InputStream will now buffer the data, that the buffer size is
limited (though it could be controlled by a tunable), and that we're
already buffering the data for our own backtrack purposes in our
DaffodilInputStream, so we're wasting space. This just seems to add
extra complication to support guarantees that most APIs don't even make
as it is. So I'm not sure #1 is necessary for generic
java.io.InputStreams. If someone passes an InputStream to parse(), we
will make no guarantees about where the InputStream is positioned at
the end.
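
In sketch form, that would be something like this (doParse, the
bitPos0b-to-bytes bookkeeping, and the maxBufferSize parameter are
hypothetical stand-ins here, not real API):

  // Minimal sketch of the mark/reset/skip idea; not proposed API.
  def parseRepositioning(is: java.io.InputStream, maxBufferSize: Int): ParseResult = {
    is.mark(maxBufferSize)     // requires is.markSupported(); readlimit is the tunable
    val res = doParse(is)      // may read ahead past what actually parses
    is.reset()                 // rewind to where we started
    is.skip(res.bitPos0b / 8)  // then advance past only what was consumed
    res
  }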

Which means if we want to support #2, we need to have some wrapper
around an InputStream that can buffer the data (ideally in buckets, but
that is an implementation detail), and that buffered data could be
shared among different calls to parse of potentially different data
processors. So how to share that data?

== Method #1:

Currently, the parse method looks something like this:

  def parse(is: InputStream, ...) = {
    val dis = new DaffodilInputStream(is)
    val pstate = new PState(dis, ...)
    val res = doParse(pstate)
    res
  }

So we create our buffering wrapper around the InputStream, store that in
the pstate, do the parse, and return the parse result. Since the
ParseResult contains a pointer to the PState, which contains a pointer
to the DaffodilInputStream, we could potentially do something like this:

  val is = // get InputStream
  val pr1 = dp1.parse(is, ...)
  val pr2 = dp2.parse(pr1, ...)

And then we could have this overloaded parse method:

  def parse(pr: ParseResult, ...) = {
    val dis = pr.pstate.dis
    val pstate = new PState(dis, ...)
    val res = doParse(pstate)
    res
  }

So the parse call, potentially from a completely different
DataProcessor, just extracts the DaffodilInputStream, creates a new
PState with it, and then parses. So parsing an InputStream in sequence
just means passing the previous ParseResult to the next parse() call.
This has a nice benefit in that it completely hides the internals of our
DaffodilInputStream or how we buffer the InputStream. It also logically
makes sense--let's start a parse from the result of a previous parse. It
does have some downsides though. For example, this almost implies you
could do something like this:

  val pr1 = dp1.parse(is, ...)
  val pr2 = dp2.parse(pr1, ...)
  val pr3 = dp3.parse(pr1, ...)

I think a user should have to use some explicit device to make a split,
duplicated stream. I don't expect much call for it.

Note that we're passing pr1 to both the dp2 and dp3 parses. The idea here
is that we first parse with dp1, then we want to start two different
parses from the exact same spot where dp1 finished. This clearly wouldn't
work since the DaffodilInputStream is mutable. Now, we could potentially
make it so parse(ParseResult) creates a copy of the DaffodilInputStream,
but there are extra complications in that two DIS's would then share a
single InputStream, so extra care is needed to make that work. I'd rather
just not support this use case, as it adds a lot of extra complexity.
Though maybe this just needs more thought? Or it might just be too
terrible.
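
If we ever did want to support it, the "explicit device" Mike mentions
above might look something like this (forkInputState is purely
hypothetical, not proposed API):

  val pr1      = dp1.parse(is, ...)
  val prForked = pr1.forkInputState()     // hypothetical: copies the DIS state
  val pr2      = dp2.parse(pr1, ...)      // continues the original stream
  val pr3      = dp3.parse(prForked, ...) // continues the copy independently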

Another minor issue is that one parse call now depends on the result of
another, which means that the ParseResult must stick around. This means
it can't be garbage collected until the second parse finishes (maybe not
a big deal). It also means streaming logic becomes a bit awkward, since
the first parse needs an InputStream, and the following parses need a
ParseResult:

  var prevPR: ParseResult = null
  def items : Stream[Node] = {
    xmlOut.reset()
    val pr =
      if (prevPR == null) {
        dp.parse(is, xmlOut)
      } else {
        dp.parse(prevPR, xmlOut)
      }
    prevPR = pr
    val item = if (pr.isError) Nil else xmlOut.getResult()
    item #:: items
  }

Not awful, but definitely a bit awkward, and doesn't seem like the
correct functional way to do it.

== Method #2:

An alternative to the above is to just make DaffodilInputStream part of
the public API. This does make a bit more of the API visible, which
isn't ideal, but I think that can be limited to very little pretty
easily. Then the API just becomes something like this:

  val is = // get InputStream
  val dis = DaffodilInputStream(is)
  val pr1 = dp1.parse(dis, ...)
  val pr2 = dp2.parse(dis, ...)

I like this design best.  I wonder if we should make a complete PState object, 
including the input stream, the infosetOutputter, and the optional 
lengthLimitInBits, and then just pass that one object to parse over and over. 
This would minimize object allocation.

I don't feel terribly strongly about it either way though.

Also, the DaffodilInputStream(is) could return an object that has a very small
interface so that we're not exposing the API of the internal DataInputStream
class/trait.  Just constructing it, and little else.

This method might want to be called parseOne(PState) to make it clear that one 
item is being parsed off the front of the input stream.
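
A rough sketch of how those two suggestions might combine (the PState
constructor shape and parseOne are hypothetical at this point):

  val state = PState(
    DaffodilInputStream(is), // small-interface wrapper around the raw InputStream
    xmlOut,                  // the InfosetOutputter
    None)                    // the optional lengthLimitInBits
  val hdr  = headerDP.parseOne(state) // parses one item off the front
  val body = bodyDP.parseOne(state)   // picks up exactly where hdr finished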

The DaffodilInputStream will be very similar to an InputStream in that
few guarantees are made. The only real guarantee is that after a call
to parse() is made, the same DaffodilInputStream can be passed to another
call to parse() to continue where the other left off. For anything else
(e.g. multiple threads acting on the same stream), all bets are off. With
this, the streaming example is also very simple:

  val is = // get InputStream
  val dis = DaffodilInputStream(is)

  def items : Stream[Node] = {
    xmlOut.reset()
    val pr = dp.parse(dis, xmlOut)
    val item = if (pr.isError) Nil else xmlOut.getResult()
    item #:: items
  }

== Method #3:

We have a new DataProcessor that accepts an InputStream:

  val sdp = StreamingDataProcessor(is)

And we can successively call parse on this, but parse accepts a
DataProcessor to use, e.g.

  val pr1 = sdp.parse(dp1)
  val pr2 = sdp.parse(dp2)
  val pr3 = sdp.parse(dp3)

Essentially the StreamingDataProcessor wraps the InputStream in a
DaffodilInputStream, stores that, and then just passes it to the parse
call of each dp. So the code looks something like:

  class StreamingDataProcessor(is: InputStream) {
    val dis = new DaffodilInputStream(is)

    def parse(dp: DataProcessor) = {
      dp.parse(dis)
    }
  }

Note that this is very similar to Method #2. The DataProcessor class
still must be modified to accept a DaffodilInputStream in the parse
method. All this does is hide creation of the DaffodilInputStream, which
I'm not sure gains very much.


Definitely open to other thoughts and ideas. There are also other things
to think about (e.g. how bucketing affects this, how to support layers,
large data blobs, and efficient regular expressions), but I think those
are mostly implementation details of the DaffodilInputStream that aren't
affected too much by the actual API usage. I plan to investigate these
soon and begin some discussions on how that might all work.

- Steve
