I suggest keep it simple. As you suggested the reset operation can slide the 
array buffer contents over, and adjust indexes referring into it.



________________________________
From: Steve Lawrence <slawre...@apache.org>
Sent: Wednesday, December 6, 2017 12:58:14 PM
To: dev@daffodil.apache.org; Mike Beckerle
Subject: Re: Removal of ByteBuffer backing in the I/O layer

Kindof. I was thinking that part of the resetForContinuedParse() method
could delete all buckets from the front of the ArrayBuffer of the
previous parse and move the last remaining buckets for the next parse to
the front of the ArrayBuffer. So the ArrayBuffer will only grow to the
maximum number of buckets needed for the largest single parse.

During a parse, I imagined we'd remove buckets by just setting their
ArrayBuffer index to null and let them be garbage collected. So this
would mean if lots of Buckets were freed, we'd still have a lot of empty
space being taken up for that parse at the front of the ArrayBuffer.
Ideally we'd use something like a mutable.Queue, but I don't think they
allow constant time indexing which is pretty important for finding
buckets quickly. Without creating our own IndexedQueue or something
similar, ArrayBuffer is probably the quickest way to get going.

But with the ArrayBuffer if the stream contained mostly small messages
and one really large one, that could mean a good amount of wasted space
at the end. So we could add a tunable that if the ArrayBuffer grows to
some size and has some amount empty for some number of parses, then we
could start a new ArrayBuffer so the memory could be freed up. It's not
clear what those numbers should be, or if this even matters too much in
practice, so this could maybe use some extra thought. This could be
useful for other things like Pools which will grow to use but never
shrink again.

On 12/06/2017 12:32 PM, Mike Beckerle wrote:
> The ref counting sounds like a good way to manage this.
>
>
> Small issue, but will your arraybuffer grow without bound here? I.e., size N 
> where N is total number of buckets used? This is still going to be massively 
> less than current input buffering, but it is a slow leak.
>
> ________________________________
> From: Steve Lawrence <slawre...@apache.org>
> Sent: Wednesday, December 6, 2017 11:54:40 AM
> To: dev@daffodil.apache.org; Mike Beckerle
> Subject: Re: Removal of ByteBuffer backing in the I/O layer
>
> Would commit() be called whenever the oldest mark is returned? So the
> stack you mention is essentially the stack of given out marks? Doesn't
> seem unreasonable. Maybe adds a some complexity to manage that stack?
>
> Another option, which I think might avoid the stack would be to just
> take advantage of the existing mark capability + some refcounting to
> determine when to clean things up. When a mark is taken, whatever bucket
> that mark is in increments a refcount. And when the mark is returned
> (either via a reset or discard), the refcount is decremented. When a
> refcount is zero, we can remove the bucket and any others with a zero
> refcount. So something like this:
>
> class Bucket {
>   val data = new ArrayBuffer[Array[Byte]](bucketSize)
>   var refCount = 0
> }
>
> trait BucketingDataInputStream { self: DataInputStream
>
>    private val buckets = new ArrayBuffer[Bucket]
>
>    override def mark(requestorID: String): DataInputStream.Mark = {
>      val mark = super.mark(requestorID)
>      val markedBucket = // figure out bucket used by mark
>      buckets(markedBucket).refCount += 1
>      mark
>    }
>
>    override def discard(mark: DataInputStream.Mark) = {
>      val markedBucket = // figured out bucket used by mark
>      buckets(markedBucket).refCount -= 1
>      while (...) {
>        // remove all buckets from the front of the array until we reach
>        // one with a refCount > 0 or has the current bytePosition in it
>      }
>
>      super.discard(mark)
>    }
>
>    ...
> }
>
> Makes the logic a bit more complex since it needs to know about
> mark/discard/reset/etc., but means things are a little less complex
> regarding the stack method and managing it and knowing when to commit.
> Not sure which one is better without putting more thought into it. But I
> agree that this capability is needed, and I don't think it would require
> a tone of extra effort.
>
> On 12/06/2017 10:36 AM, Mike Beckerle wrote:
>> Additional operation is to support backtracking. The current single buffer 
>> design makes this easy - you just save/restore the position and limit, and 
>> restore after. With a bucketing algorithm the backtracking is trickier, 
>> because it is resolution of a point of uncertainty (PoU) that allows one to 
>> drop a mark and thereby free up storage.
>>
>>
>> Unlike a java BufferedInputStream we need a quasi-stack of marks. I say 
>> Quasi because we also have a commit operation which is used when a PoU is 
>> resolved.
>>
>>
>> So I think there is a mark/reset and this "commit" (no more going back to 
>> this mark point) operation as well.
>>
>> ________________________________
>> From: Steve Lawrence <slawre...@apache.org>
>> Sent: Wednesday, December 6, 2017 8:49:55 AM
>> To: dev@daffodil.apache.org
>> Subject: Removal of ByteBuffer backing in the I/O layer
>>
>> Currently, our I/O layer only supports parsing data in the form of a
>> ByteBuffer. Note that although our API does support other inputs (e.g.
>> Channels, InputStreams, Files), internally these are all
>> converted to a ByteBuffer. This has a few problems. Due to ByteBuffer's
>> use of integer positions, it can only support a maximum size of 2GB.
>> This also prevents streaming data as well as discarding old data that
>> we can determine won't ever be needed again during a parse to reduce
>> memory usage. So coming up with changes to our I/O layer to not rely on
>> ByteBuffer is important if we want to support larger files and improve
>> performance.
>>
>> The features we currently use on the ByteBuffer are:
>>
>>   1. Set and get byte position
>>   2. Set and get byte limit
>>   3. Set and get byte order
>>   4. Convert buffer to a FloatBuffer to get a float
>>   5. Convert buffer to a DoubleBuffer to get a double
>>   6. Decode into a char buffer
>>   7. Get a single byte starting at current position
>>   8. Get byte array of length starting at current position
>>
>> So we need to replace these features. The ByteBufferDataInputStream
>> already understands byte/bit position and byte/bitLimit (features 1 and
>> 2), so we do not need to do much here aside from perhaps add some
>> additional checks if we rely on the ByteBuffer to tell us about
>> when the limit was reached, for example. Byte order (feature 3) is
>> essentially just flipping bytes around, so that's something we can
>> easily handle ourselves. Converting to a float, double, or char
>> (features 4, 5, 6) is straightforward enough once we have the right
>> bytes. So all that really needs to be modified is how to get a single
>> byte or byte array (features 7 and  8). To support this, I propose that
>> the DataInputStream trait needs new abstract methods that concrete
>> classes must implement for how best to get bytes from the input data,
>> they are:
>>
>> trait DataInputStream {
>>
>>   ...
>>
>>   /**
>>    * Return a single byte at the current byte position
>>    */
>>   def getByte(): Int
>>
>>   /**
>>    * Read a given length of bytes starting at the current byte position,
>>    * filling in a pre-allocated array starting at an offset. Returns the
>>    * number of bytes read.
>>    */
>>   def getBytes(b: Array[Byte], off: Int, len: Int): Int
>>
>>   ...
>> }
>>
>> These are based off of the InputStream read() methods. So our
>> ByteBufferDataInpustream (renamed to DataInputStreamImpl since it no
>> longer depends on a ByteBuffer?) only needs to implement these two
>> functions, and make some modifications to use these when getting
>> floats/double/chars/etc.
>>
>> But we still need to get rid of the ByteBuffer as the input data for
>> reasons mentioned above. The data could come from something simple like
>> an Array[Byte] (implementing these functions would be fairly
>> straightforward) or something more complex like an InputStream or
>> Channel. In such cases, we need to implement some sort of caching
>> mechanism to allow for backtracking the streams. A bucketing algorithm
>> allows this, as well as allows files greater than 2GB and optimizations
>> to remove old data that are no longer needed. To support this, I propose
>> the following:
>>
>> A new BucketingDataInputStream trait is created which implements a
>> bucketing algorithm used for streaming inputs. This includes the
>> getByte/getBytes methods to read from the stream and cache into buckets
>> and return the appropriate values. So I imagine something like this:
>>
>> trait BucketingDataInputStream { self: DataInputStream
>>
>>   private val buckets = new ArrayBuffer[Array[Byte]]
>>
>>   abstract def getBytesForBucket(b: Array[Byte], len: Int): Int
>>
>>   final override def getByte(): Int = {
>>     val currentPos = self.bytePosition0b
>>     if (!byteAtPositionIsInBuckets(currentPos))  {
>>       // get more bytes for a new bucket
>>       val newBucket = new Array(bucketSize)
>>       val read = getBytesForBucket(newBucket, bucketSize)
>>       buckets += newBucket
>>     }
>>
>>     // return the cached byte out of the bucket
>>   }
>>
>>   final override def getBytes(...) = ...
>> }
>>
>> Then we can create various implementations based on the input type, e.g.
>>
>> class DaffodilDataInputStream(is: InputStream)
>>   extends DataInputStreamImpl
>>   with BucketingDataInputStream {
>>
>>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>>     is.read(arr, 0, len)
>>   }
>> }
>>
>> class DaffodilDataInputChannel(ch: ReadableByteChannel)
>>   extends DaffodilDataInputImpl
>>   with BucketingDaffodilDataInput {
>>
>>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>>     var bb = ByteBuffer.wrap(b, 0, len)
>>     ch.read(bb)
>>   }
>> }
>>
>> Adding new input types just requires implementing a function to read X
>> amount of bytes into an array and the BucketingDataInputStream handles
>> the bucketing algorithm and the DataInputStreamImpl handles converting
>> bytes to the appropriate type, so new input types should not require too
>> much effort.
>>
>> Finally, we can have an object to create the various types of
>> input streams, e.g.:
>>
>> object DaffodilInputData {
>>   def apply(ba: Array[Byte]) = new DaffodilInputDataArray(ba)
>>   def apply(is: InputStream) = new DaffodilInputDataStream(is)
>>   def apply(ch: ReadableByteChannel) = new DaffodilInputDataChannel(ch)
>>   ...
>> }
>>
>> This part could be all that is visible to the Java and Scala APIs and
>> used to support a streaming API mentioned in other discussions. Some
>> additional changes may be needed to support starting a new parse from
>> where a previous DaffodilInputData ended, but I think that mostly just
>> requires a new reset function to reset things like bitPosition and
>> bitLimit to zero, and clean up old bucket state. A reset() method
>> already exists for resettting back to a mark, so a different name is
>> needed, maybe something like the following is added to the
>> DataInputStream trait:
>>
>>   /**
>>    * Reset internal state of the DataInputStream so that a call to
>>    * parse() could continue parsing at the current byte position.
>>    */
>>   def resetForContinuedParse()
>>
>> I don't really like that name, but I think the concept is needed. This
>> function would be called at the beginning of each parse in the doParse()
>> method.
>>
>
>

Reply via email to