The ref counting sounds like a good way to manage this.

Small issue, but will your ArrayBuffer grow without bound here? I.e., to size N,
where N is the total number of buckets used? This is still going to be massively
less than the current input buffering, but it is a slow leak.
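
To put a rough number on that, here is a back-of-the-envelope sketch. It
assumes one index slot is kept for every bucket ever used and is never
removed. The 8 KiB bucket size, 1 TiB input, and 8 bytes per reference are
hypothetical figures, and IndexGrowth is not an existing Daffodil object.

```scala
// Hypothetical figures, chosen only to illustrate the scale of the concern.
object IndexGrowth {
  // Number of index slots needed if one slot is kept per bucket ever used.
  def slots(totalBytes: Long, bucketSize: Long): Long =
    (totalBytes + bucketSize - 1) / bucketSize

  // Bytes held by the index itself, given an assumed size per reference.
  def indexBytes(totalBytes: Long, bucketSize: Long, bytesPerRef: Long): Long =
    slots(totalBytes, bucketSize) * bytesPerRef
}
```

Under those assumptions a 1 TiB input needs 2^27 slots, which is about 1 GiB
of references: far below the input size, but not bounded.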

________________________________
From: Steve Lawrence <slawre...@apache.org>
Sent: Wednesday, December 6, 2017 11:54:40 AM
To: dev@daffodil.apache.org; Mike Beckerle
Subject: Re: Removal of ByteBuffer backing in the I/O layer

Would commit() be called whenever the oldest mark is returned? So the
stack you mention is essentially the stack of given out marks? Doesn't
seem unreasonable. Maybe that adds some complexity to manage the stack?

Another option, which I think might avoid the stack, would be to just
take advantage of the existing mark capability + some refcounting to
determine when to clean things up. When a mark is taken, whatever bucket
that mark is in increments a refcount. And when the mark is returned
(either via a reset or discard), the refcount is decremented. When a
refcount is zero, we can remove the bucket and any others with a zero
refcount. So something like this:

class Bucket {
  val data = new Array[Byte](bucketSize)
  var refCount = 0
}

trait BucketingDataInputStream { self: DataInputStream =>

   private val buckets = new ArrayBuffer[Bucket]

   override def mark(requestorID: String): DataInputStream.Mark = {
     val mark = super.mark(requestorID)
     val markedBucket: Int = ??? // figure out bucket used by mark
     buckets(markedBucket).refCount += 1
     mark
   }

   override def discard(mark: DataInputStream.Mark) = {
     val markedBucket: Int = ??? // figure out bucket used by mark
     buckets(markedBucket).refCount -= 1
     while (...) {
       // remove all buckets from the front of the array until we reach
       // one with a refCount > 0 or one containing the current bytePosition
     }

     super.discard(mark)
   }

   ...
}

This makes the logic a bit more complex, since it needs to know about
mark/discard/reset/etc., but it avoids the complexity of managing a
stack and knowing when to commit. I'm not sure which one is better
without putting more thought into it. But I agree that this capability
is needed, and I don't think it would require a ton of extra effort.
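
To make the refcounting idea above concrete, here is a minimal,
self-contained sketch. It is not Daffodil code: RefCountedBuckets, bucketOf,
advanceTo, and liveBuckets are hypothetical names, marks are reduced to plain
byte positions, and no bytes are actually read. It only tracks which buckets
are still needed, keeping a firstBucket base index so that absolute bucket
numbers stay valid after buckets are dropped from the front.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a cached bucket; not an existing Daffodil class.
final class Bucket(size: Int) {
  val data = new Array[Byte](size)
  var refCount = 0
}

// Sketch only: tracks which buckets are still needed by outstanding marks.
final class RefCountedBuckets(bucketSize: Int) {
  private val buckets = new ArrayBuffer[Bucket]
  private var firstBucket = 0L  // absolute index of buckets(0)
  private var bytePosition = 0L // current read position, in bytes

  private def bucketOf(pos: Long): Long = pos / bucketSize

  // Make sure a bucket exists for every position up to and including pos.
  private def ensure(pos: Long): Unit =
    while (firstBucket + buckets.length <= bucketOf(pos))
      buckets += new Bucket(bucketSize)

  private def at(pos: Long): Bucket =
    buckets((bucketOf(pos) - firstBucket).toInt)

  // Move the read position forward, then free whatever is no longer needed.
  def advanceTo(pos: Long): Unit = { ensure(pos); bytePosition = pos; release() }

  // A mark is just the marked byte position in this sketch.
  def mark(): Long = {
    ensure(bytePosition)
    at(bytePosition).refCount += 1
    bytePosition
  }

  def discard(mark: Long): Unit = { at(mark).refCount -= 1; release() }

  def reset(mark: Long): Unit = { bytePosition = mark; discard(mark) }

  // Drop buckets from the front until one is marked or holds the position.
  private def release(): Unit =
    while (buckets.nonEmpty && buckets.head.refCount == 0 &&
           firstBucket < bucketOf(bytePosition)) {
      buckets.remove(0)
      firstBucket += 1
    }

  def liveBuckets: Int = buckets.length
}
```

With a bucket size of 4, taking a mark at position 0 and advancing to
position 10 keeps three buckets alive; discarding the mark leaves only the
bucket holding the current position. Removing from the front of an
ArrayBuffer shifts the remaining entries, so a deque may be a better fit if
many buckets are live at once.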

On 12/06/2017 10:36 AM, Mike Beckerle wrote:
> An additional operation is needed to support backtracking. The current single
> buffer design makes this easy: you just save the position and limit, and
> restore them afterwards. With a bucketing algorithm backtracking is trickier,
> because it is the resolution of a point of uncertainty (PoU) that allows one to
> drop a mark and thereby free up storage.
>
>
> Unlike a Java BufferedInputStream, we need a quasi-stack of marks. I say quasi
> because we also have a commit operation, which is used when a PoU is resolved.
>
>
> So I think there is a mark/reset and this "commit" (no more going back to 
> this mark point) operation as well.
>
> ________________________________
> From: Steve Lawrence <slawre...@apache.org>
> Sent: Wednesday, December 6, 2017 8:49:55 AM
> To: dev@daffodil.apache.org
> Subject: Removal of ByteBuffer backing in the I/O layer
>
> Currently, our I/O layer only supports parsing data in the form of a
> ByteBuffer. Note that although our API does support other inputs (e.g.
> Channels, InputStreams, Files), internally these are all
> converted to a ByteBuffer. This has a few problems. Due to ByteBuffer's
> use of integer positions, it can only support a maximum size of 2GB.
> This also prevents streaming data as well as discarding old data that
> we can determine won't ever be needed again during a parse to reduce
> memory usage. So coming up with changes to our I/O layer to not rely on
> ByteBuffer is important if we want to support larger files and improve
> performance.
>
> The features we currently use on the ByteBuffer are:
>
>   1. Set and get byte position
>   2. Set and get byte limit
>   3. Set and get byte order
>   4. Convert buffer to a FloatBuffer to get a float
>   5. Convert buffer to a DoubleBuffer to get a double
>   6. Decode into a char buffer
>   7. Get a single byte starting at current position
>   8. Get byte array of length starting at current position
>
> So we need to replace these features. The ByteBufferDataInputStream
> already understands byte/bit position and byte/bitLimit (features 1 and
> 2), so we do not need to do much here aside from perhaps adding some
> additional checks if we rely on the ByteBuffer to tell us
> when the limit was reached, for example. Byte order (feature 3) is
> essentially just flipping bytes around, so that's something we can
> easily handle ourselves. Converting to a float, double, or char
> (features 4, 5, 6) is straightforward enough once we have the right
> bytes. So all that really needs to be modified is how to get a single
> byte or byte array (features 7 and 8). To support this, I propose that
> the DataInputStream trait gets new abstract methods that concrete
> classes must implement for how best to get bytes from the input data.
> They are:
>
> trait DataInputStream {
>
>   ...
>
>   /**
>    * Return a single byte at the current byte position
>    */
>   def getByte(): Int
>
>   /**
>    * Read a given length of bytes starting at the current byte position,
>    * filling in a pre-allocated array starting at an offset. Returns the
>    * number of bytes read.
>    */
>   def getBytes(b: Array[Byte], off: Int, len: Int): Int
>
>   ...
> }
>
> These are based off of the InputStream read() methods. So our
> ByteBufferDataInputStream (renamed to DataInputStreamImpl since it no
> longer depends on a ByteBuffer?) only needs to implement these two
> functions, and make some modifications to use these when getting
> floats/doubles/chars/etc.
>
> But we still need to get rid of the ByteBuffer as the input data for
> the reasons mentioned above. The data could come from something simple
> like an Array[Byte] (implementing these functions would be fairly
> straightforward) or something more complex like an InputStream or
> Channel. In the latter cases, we need to implement some sort of caching
> mechanism to allow for backtracking in the streams. A bucketing
> algorithm allows this, and also allows files greater than 2GB and
> optimizations to remove old data that is no longer needed. To support
> this, I propose the following:
>
> A new BucketingDataInputStream trait is created which implements a
> bucketing algorithm used for streaming inputs. This includes the
> getByte/getBytes methods to read from the stream and cache into buckets
> and return the appropriate values. So I imagine something like this:
>
> trait BucketingDataInputStream { self: DataInputStream =>
>
>   private val buckets = new ArrayBuffer[Array[Byte]]
>
>   // abstract: implemented per input type to fill a new bucket
>   def getBytesForBucket(b: Array[Byte], len: Int): Int
>
>   final override def getByte(): Int = {
>     val currentPos = self.bytePosition0b
>     if (!byteAtPositionIsInBuckets(currentPos)) {
>       // get more bytes for a new bucket
>       val newBucket = new Array[Byte](bucketSize)
>       val read = getBytesForBucket(newBucket, bucketSize)
>       buckets += newBucket
>     }
>
>     // return the cached byte out of the bucket
>   }
>
>   final override def getBytes(...) = ...
> }
>
> Then we can create various implementations based on the input type, e.g.
>
> class DaffodilDataInputStream(is: InputStream)
>   extends DataInputStreamImpl
>   with BucketingDataInputStream {
>
>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>     is.read(b, 0, len)
>   }
> }
>
> class DaffodilDataInputChannel(ch: ReadableByteChannel)
>   extends DataInputStreamImpl
>   with BucketingDataInputStream {
>
>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>     val bb = ByteBuffer.wrap(b, 0, len)
>     ch.read(bb)
>   }
> }
>
> Adding a new input type just requires implementing a function to read
> X bytes into an array. The BucketingDataInputStream handles
> the bucketing algorithm and the DataInputStreamImpl handles converting
> bytes to the appropriate type, so new input types should not require too
> much effort.
>
> Finally, we can have an object to create the various types of
> input streams, e.g.:
>
> object DaffodilInputData {
>   def apply(ba: Array[Byte]) = new DaffodilInputDataArray(ba)
>   def apply(is: InputStream) = new DaffodilInputDataStream(is)
>   def apply(ch: ReadableByteChannel) = new DaffodilInputDataChannel(ch)
>   ...
> }
>
> This part could be all that is visible to the Java and Scala APIs and
> used to support a streaming API mentioned in other discussions. Some
> additional changes may be needed to support starting a new parse from
> where a previous DaffodilInputData ended, but I think that mostly just
> requires a new reset function to reset things like bitPosition and
> bitLimit to zero, and clean up old bucket state. A reset() method
> already exists for resetting back to a mark, so a different name is
> needed. Maybe something like the following could be added to the
> DataInputStream trait:
>
>   /**
>    * Reset internal state of the DataInputStream so that a call to
>    * parse() could continue parsing at the current byte position.
>    */
>   def resetForContinuedParse(): Unit
>
> I don't really like that name, but I think the concept is needed. This
> function would be called at the beginning of each parse in the doParse()
> method.
>
