The ref counting sounds like a good way to manage this.
Small issue, but will your ArrayBuffer grow without bound here? I.e.,
end up at size N, where N is the total number of buckets ever used?
That is still going to be massively less than the current input
buffering, but it is a slow leak.

________________________________
From: Steve Lawrence <slawre...@apache.org>
Sent: Wednesday, December 6, 2017 11:54:40 AM
To: dev@daffodil.apache.org; Mike Beckerle
Subject: Re: Removal of ByteBuffer backing in the I/O layer

Would commit() be called whenever the oldest mark is returned? So the
stack you mention is essentially the stack of given-out marks? Doesn't
seem unreasonable. Maybe it adds some complexity to manage that stack?

Another option, which I think might avoid the stack, would be to just
take advantage of the existing mark capability plus some refcounting
to determine when to clean things up. When a mark is taken, whatever
bucket that mark is in increments a refcount. When the mark is
returned (either via a reset or a discard), the refcount is
decremented. When a refcount reaches zero, we can remove that bucket
and any others with a zero refcount. So something like this:

class Bucket {
  val data = new Array[Byte](bucketSize)
  var refCount = 0
}

trait BucketingDataInputStream { self: DataInputStream =>

  private val buckets = new ArrayBuffer[Bucket]

  override def mark(requestorID: String): DataInputStream.Mark = {
    val mark = super.mark(requestorID)
    val markedBucket = ... // figure out which bucket the mark is in
    buckets(markedBucket).refCount += 1
    mark
  }

  override def discard(mark: DataInputStream.Mark) = {
    val markedBucket = ... // figure out which bucket the mark is in
    buckets(markedBucket).refCount -= 1
    while (...) {
      // remove buckets from the front of the array until we reach one
      // with a refCount > 0 or one containing the current bytePosition
    }
    super.discard(mark)
  }

  ...
}

This makes the logic a bit more complex, since it needs to know about
mark/discard/reset/etc., but it avoids the complexity of the stack
approach: managing the stack and knowing when to commit. Not sure
which one is better without putting more thought into it. But I agree
that this capability is needed, and I don't think it would require a
ton of extra effort.
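To make the bucket bookkeeping concrete, here is a minimal,
self-contained sketch of the refcounted eviction described above. The
names (BucketRefCountSketch, bucketSize, firstBucketIndex, onMark,
onDiscard) are illustrative only, not existing Daffodil API. Tracking
the absolute index of the first retained bucket keeps index lookups
valid after buckets are dropped from the front, and the eager eviction
is what keeps the ArrayBuffer from growing without bound:

import scala.collection.mutable.ArrayBuffer

object BucketRefCountSketch {
  val bucketSize = 8192 // assumed bucket size

  class Bucket {
    val data = new Array[Byte](bucketSize)
    var refCount = 0
  }

  private val buckets = new ArrayBuffer[Bucket]
  // Absolute bucket index of buckets(0); incremented as buckets are
  // evicted from the front so that absolute indices stay stable.
  private var firstBucketIndex = 0L

  private def bucketFor(bytePos0b: Long): Bucket =
    buckets((bytePos0b / bucketSize - firstBucketIndex).toInt)

  // Called when a mark is taken at the given byte position.
  def onMark(markPos0b: Long): Unit =
    bucketFor(markPos0b).refCount += 1

  // Called when a mark is discarded or reset. Decrements the refcount,
  // then evicts unmarked buckets from the front, stopping at the first
  // bucket that is still marked or that holds the current position.
  def onDiscard(markPos0b: Long, currentPos0b: Long): Unit = {
    bucketFor(markPos0b).refCount -= 1
    while (buckets.nonEmpty &&
           buckets(0).refCount == 0 &&
           firstBucketIndex < currentPos0b / bucketSize) {
      buckets.remove(0)
      firstBucketIndex += 1
    }
  }
}

In this sketch the buffer only ever holds the buckets between the
oldest outstanding mark (or the current position) and the newest data
read, rather than one entry per bucket ever used.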
On 12/06/2017 10:36 AM, Mike Beckerle wrote:
> An additional operation is needed to support backtracking. The
> current single-buffer design makes this easy - you just save the
> position and limit, and restore them afterwards. With a bucketing
> algorithm the backtracking is trickier, because it is the resolution
> of a point of uncertainty (PoU) that allows one to drop a mark and
> thereby free up storage.
>
> Unlike a Java BufferedInputStream, we need a quasi-stack of marks. I
> say quasi because we also have a commit operation, which is used when
> a PoU is resolved.
>
> So I think there is mark/reset and this "commit" (no more going back
> to this mark point) operation as well.
>
> ________________________________
> From: Steve Lawrence <slawre...@apache.org>
> Sent: Wednesday, December 6, 2017 8:49:55 AM
> To: dev@daffodil.apache.org
> Subject: Removal of ByteBuffer backing in the I/O layer
>
> Currently, our I/O layer only supports parsing data in the form of a
> ByteBuffer. Note that although our API does support other inputs
> (e.g. Channels, InputStreams, Files), internally these are all
> converted to a ByteBuffer. This has a few problems. Due to
> ByteBuffer's use of integer positions, it can only support a maximum
> size of 2GB. It also prevents streaming data, as well as discarding
> old data that we can determine won't ever be needed again during a
> parse to reduce memory usage. So coming up with changes to our I/O
> layer to not rely on ByteBuffer is important if we want to support
> larger files and improve performance.
>
> The features we currently use on the ByteBuffer are:
>
> 1. Set and get byte position
> 2. Set and get byte limit
> 3. Set and get byte order
> 4. Convert buffer to a FloatBuffer to get a float
> 5. Convert buffer to a DoubleBuffer to get a double
> 6. Decode into a char buffer
> 7. Get a single byte starting at the current position
> 8. Get a byte array of a given length starting at the current position
>
> So we need to replace these features. The ByteBufferDataInputStream
> already understands byte/bit position and byte/bit limit (features 1
> and 2), so we do not need to do much here, aside from perhaps adding
> some additional checks where we currently rely on the ByteBuffer to
> tell us when the limit is reached. Byte order (feature 3) is
> essentially just flipping bytes around, so that's something we can
> easily handle ourselves. Converting to a float, double, or char
> (features 4, 5, and 6) is straightforward enough once we have the
> right bytes. So all that really needs to be modified is how we get a
> single byte or a byte array (features 7 and 8). To support this, I
> propose that the DataInputStream trait gets new abstract methods that
> concrete classes must implement for how best to get bytes from the
> input data:
>
> trait DataInputStream {
>
>   ...
>
>   /**
>    * Return a single byte at the current byte position
>    */
>   def getByte(): Int
>
>   /**
>    * Read a given length of bytes starting at the current byte
>    * position, filling in a pre-allocated array starting at an
>    * offset. Returns the number of bytes read.
>    */
>   def getBytes(b: Array[Byte], off: Int, len: Int): Int
>
>   ...
> }
>
> These are based on the InputStream read() methods. So our
> ByteBufferDataInputStream (renamed to DataInputStreamImpl, since it
> no longer depends on a ByteBuffer?) only needs to implement these two
> functions, plus some modifications to use them when getting
> floats/doubles/chars/etc.
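A minimal sketch of what "flipping bytes ourselves" could look like on
top of the proposed getBytes, e.g. when reading a float inside
DataInputStreamImpl. The getFloat name and the byteOrder field are
assumptions for illustration, not existing API:

import java.nio.ByteOrder

// Assumes getBytes(b, off, len) and a byteOrder setting as described
// above. Assembles the four bytes into an Int in the right order,
// then reinterprets those bits as a Float.
def getFloat(): Float = {
  val b = new Array[Byte](4)
  getBytes(b, 0, 4)
  var bits = 0
  if (byteOrder == ByteOrder.BIG_ENDIAN) {
    for (i <- 0 until 4) bits = (bits << 8) | (b(i) & 0xFF)
  } else {
    for (i <- 3 to 0 by -1) bits = (bits << 8) | (b(i) & 0xFF)
  }
  java.lang.Float.intBitsToFloat(bits)
}

A double would work the same way with eight bytes, a Long accumulator,
and java.lang.Double.longBitsToDouble.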
> But we still need to get rid of the ByteBuffer as the input data,
> for the reasons mentioned above. The data could come from something
> simple like an Array[Byte] (implementing these functions would be
> fairly straightforward) or something more complex like an
> InputStream or Channel. In the latter cases, we need to implement
> some sort of caching mechanism to allow for backtracking the
> streams. A bucketing algorithm allows this, as well as files greater
> than 2GB and optimizations that remove old data that is no longer
> needed. To support this, I propose the following:
>
> A new BucketingDataInputStream trait is created which implements a
> bucketing algorithm used for streaming inputs. This includes the
> getByte/getBytes methods to read from the stream, cache the data
> into buckets, and return the appropriate values. So I imagine
> something like this:
>
> trait BucketingDataInputStream { self: DataInputStream =>
>
>   private val buckets = new ArrayBuffer[Array[Byte]]
>
>   // abstract; implemented by each concrete input type
>   def getBytesForBucket(b: Array[Byte], len: Int): Int
>
>   final override def getByte(): Int = {
>     val currentPos = self.bytePosition0b
>     if (!byteAtPositionIsInBuckets(currentPos)) {
>       // get more bytes for a new bucket
>       val newBucket = new Array[Byte](bucketSize)
>       val read = getBytesForBucket(newBucket, bucketSize)
>       buckets += newBucket
>     }
>
>     // return the cached byte out of the bucket
>   }
>
>   final override def getBytes(...) = ...
> }
>
> Then we can create various implementations based on the input type,
> e.g.:
>
> class DaffodilDataInputStream(is: InputStream)
>   extends DataInputStreamImpl
>   with BucketingDataInputStream {
>
>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>     is.read(b, 0, len)
>   }
> }
>
> class DaffodilDataInputChannel(ch: ReadableByteChannel)
>   extends DataInputStreamImpl
>   with BucketingDataInputStream {
>
>   override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
>     val bb = ByteBuffer.wrap(b, 0, len)
>     ch.read(bb)
>   }
> }
>
> Adding a new input type just requires implementing a function to
> read some number of bytes into an array; the BucketingDataInputStream
> handles the bucketing algorithm and the DataInputStreamImpl handles
> converting bytes to the appropriate types, so new input types should
> not require too much effort.
>
> Finally, we can have an object to create the various types of input
> data, e.g.:
>
> object DaffodilInputData {
>   def apply(ba: Array[Byte]) = new DaffodilDataInputArray(ba)
>   def apply(is: InputStream) = new DaffodilDataInputStream(is)
>   def apply(ch: ReadableByteChannel) = new DaffodilDataInputChannel(ch)
>   ...
> }
>
> This part could be all that is visible to the Java and Scala APIs,
> and could be used to support the streaming API mentioned in other
> discussions. Some additional changes may be needed to support
> starting a new parse from where a previous DaffodilInputData ended,
> but I think that mostly just requires a new reset function to reset
> things like bitPosition and bitLimit to zero and to clean up old
> bucket state. A reset() method already exists for resetting back to
> a mark, so a different name is needed; maybe something like the
> following is added to the DataInputStream trait:
>
> /**
>  * Reset internal state of the DataInputStream so that a call to
>  * parse() could continue parsing at the current byte position.
>  */
> def resetForContinuedParse()
>
> I don't really like that name, but I think the concept is needed.
> This function would be called at the beginning of each parse in the
> doParse() method.
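For the simple Array[Byte] case mentioned in the quoted proposal, a
minimal sketch of the straightforward implementation might look like
the following. The class name DaffodilDataInputArray follows the
factory above; bytePosition0b is the byte-position accessor referenced
there, and returning -1 on end of data mirrors InputStream.read():

class DaffodilDataInputArray(ba: Array[Byte])
  extends DataInputStreamImpl {

  // The whole input is already in memory, so no bucketing is needed:
  // getByte/getBytes are just bounds-checked array accesses.
  override def getByte(): Int = {
    val pos = bytePosition0b
    if (pos >= ba.length) -1
    else ba(pos.toInt) & 0xFF
  }

  override def getBytes(b: Array[Byte], off: Int, len: Int): Int = {
    val pos = bytePosition0b.toInt
    val n = math.min(len, ba.length - pos)
    if (n <= 0) -1
    else {
      System.arraycopy(ba, pos, b, off, n)
      n
    }
  }
}

With the factory above, DaffodilInputData(bytes) would then select
this implementation, and callers would not need to care which backing
is in use.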