Currently, our I/O layer only supports parsing data in the form of a ByteBuffer. Note that although our API does accept other inputs (e.g. Channels, InputStreams, Files), internally these are all converted to a ByteBuffer. This has a few problems. Because ByteBuffer uses integer positions, it can address at most 2GB of data. It also prevents streaming input, as well as discarding old data that we can determine will never be needed again during a parse, which would reduce memory usage. So if we want to support larger files and improve performance, we need to change our I/O layer so that it no longer relies on ByteBuffer.
The features we currently use on the ByteBuffer are:

1. Set and get byte position
2. Set and get byte limit
3. Set and get byte order
4. Convert the buffer to a FloatBuffer to get a float
5. Convert the buffer to a DoubleBuffer to get a double
6. Decode into a char buffer
7. Get a single byte starting at the current position
8. Get a byte array of a given length starting at the current position

So we need to replace these features. The ByteBufferDataInputStream already understands byte/bit position and byte/bit limit (features 1 and 2), so we do not need to do much here, aside from perhaps adding some additional checks if we currently rely on the ByteBuffer to tell us when the limit has been reached, for example. Byte order (feature 3) is essentially just flipping bytes around, so that is something we can easily handle ourselves. Converting to a float, double, or char (features 4, 5, and 6) is straightforward enough once we have the right bytes. So all that really needs to be modified is how we get a single byte or a byte array (features 7 and 8). To support this, I propose that the DataInputStream trait gain new abstract methods that concrete classes must implement for how best to get bytes from the input data:

```scala
trait DataInputStream {
  ...

  /**
   * Return a single byte at the current byte position.
   */
  def getByte(): Int

  /**
   * Read a given length of bytes starting at the current byte position,
   * filling in a pre-allocated array starting at an offset. Returns the
   * number of bytes read.
   */
  def getBytes(b: Array[Byte], off: Int, len: Int): Int

  ...
}
```

These are based on the InputStream read() methods. So our ByteBufferDataInputStream (renamed to DataInputStreamImpl since it no longer depends on a ByteBuffer?) only needs to implement these two functions, plus some modifications to use them when getting floats/doubles/chars/etc. But we still need to get rid of the ByteBuffer as the input data, for the reasons mentioned above.
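To illustrate why byte order and float conversion (features 3 and 4) are easy to handle ourselves once we have the raw bytes: we can assemble the bytes into the raw bits and use the standard java.lang conversion. A minimal sketch, assuming a helper of our own devising (`bytesToFloat` is an illustrative name, not an actual Daffodil method):

```scala
import java.nio.ByteOrder

// Assemble 4 raw bytes into a Float, honoring byte order ourselves
// instead of delegating to ByteBuffer.asFloatBuffer.
def bytesToFloat(bytes: Array[Byte], order: ByteOrder): Float = {
  // Normalize to big-endian order (most significant byte first)
  val ordered =
    if (order == ByteOrder.BIG_ENDIAN) bytes
    else bytes.reverse
  // Shift each byte into the accumulated 32-bit pattern
  val bits = ordered.foldLeft(0) { (acc, b) => (acc << 8) | (b & 0xFF) }
  java.lang.Float.intBitsToFloat(bits)
}
```

The same pattern extends to doubles (8 bytes into a Long, then `java.lang.Double.longBitsToDouble`), so byte-order handling reduces to an optional reverse of the bytes we already fetched.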
The data could come from something simple like an Array[Byte] (implementing these functions would be fairly straightforward) or something more complex like an InputStream or Channel. In the latter cases, we need to implement some sort of caching mechanism to allow backtracking in the streams. A bucketing algorithm allows this, and it also allows files greater than 2GB and optimizations to remove old data that is no longer needed. To support this, I propose the following: a new BucketingDataInputStream trait is created which implements a bucketing algorithm used for streaming inputs. This includes the getByte/getBytes methods, which read from the stream, cache the data into buckets, and return the appropriate values. So I imagine something like this:

```scala
import scala.collection.mutable.ArrayBuffer

trait BucketingDataInputStream { self: DataInputStream =>

  private val buckets = new ArrayBuffer[Array[Byte]]

  def getBytesForBucket(b: Array[Byte], len: Int): Int

  final override def getByte(): Int = {
    val currentPos = self.bytePosition0b
    if (!byteAtPositionIsInBuckets(currentPos)) {
      // get more bytes for a new bucket
      val newBucket = new Array[Byte](bucketSize)
      val read = getBytesForBucket(newBucket, bucketSize)
      buckets += newBucket
    }
    // return the cached byte out of the bucket
  }

  final override def getBytes(...) = ...
}
```

Then we can create various implementations based on the input type, e.g.:
```scala
class DaffodilDataInputStream(is: InputStream)
  extends DataInputStreamImpl with BucketingDataInputStream {

  override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
    is.read(b, 0, len)
  }
}

class DaffodilDataInputChannel(ch: ReadableByteChannel)
  extends DataInputStreamImpl with BucketingDataInputStream {

  override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
    val bb = ByteBuffer.wrap(b, 0, len)
    ch.read(bb)
  }
}
```

Adding a new input type then just requires implementing a function to read some number of bytes into an array; the BucketingDataInputStream handles the bucketing algorithm and the DataInputStreamImpl handles converting bytes to the appropriate type, so new input types should not require too much effort. Finally, we can have an object to create the various types of input streams, e.g.:

```scala
object DaffodilInputData {
  def apply(ba: Array[Byte]) = new DaffodilInputDataArray(ba)
  def apply(is: InputStream) = new DaffodilInputDataStream(is)
  def apply(ch: ReadableByteChannel) = new DaffodilInputDataChannel(ch)
  ...
}
```

This part could be all that is visible to the Java and Scala APIs, and could be used to support the streaming API mentioned in other discussions. Some additional changes may be needed to support starting a new parse from where a previous DaffodilInputData ended, but I think that mostly just requires a new reset function to reset things like bitPosition and bitLimit to zero and to clean up old bucket state. A reset() method already exists for resetting back to a mark, so a different name is needed; maybe something like the following is added to the DataInputStream trait:

```scala
/**
 * Reset internal state of the DataInputStream so that a call to
 * parse() could continue parsing at the current byte position.
 */
def resetForContinuedParse(): Unit
```

I don't really like that name, but I think the concept is needed. This function would be called at the beginning of each parse in the doParse() method.
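To make the bucketing idea above concrete, here is a self-contained, runnable sketch of the core algorithm. It is a simplification, not the proposed Daffodil API: `BucketingReader` is a hypothetical name, getByte takes an explicit position rather than using internal position state, and bit-level positions and short reads near end-of-stream are ignored.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import scala.collection.mutable.ArrayBuffer

// Simplified illustration of the bucketing algorithm: bytes are pulled
// from an InputStream on demand and cached in fixed-size buckets, so
// earlier positions can be revisited (backtracking) without the stream
// itself supporting seeks, and without one giant 2GB-bounded buffer.
class BucketingReader(is: InputStream, val bucketSize: Int = 8) {
  private val buckets = new ArrayBuffer[Array[Byte]]

  // Corresponds to the proposed abstract getBytesForBucket: read up to
  // len bytes from the underlying input into the array b.
  private def getBytesForBucket(b: Array[Byte], len: Int): Int =
    is.read(b, 0, len)

  /** Return the byte at the given position, filling buckets as needed. */
  def getByte(pos: Long): Int = {
    val bucketIndex = (pos / bucketSize).toInt
    while (buckets.length <= bucketIndex) {
      // position not yet cached: get more bytes for a new bucket
      val newBucket = new Array[Byte](bucketSize)
      getBytesForBucket(newBucket, bucketSize)
      buckets += newBucket
    }
    buckets(bucketIndex)((pos % bucketSize).toInt) & 0xFF
  }
}
```

Reading position 10 first fills two buckets from the stream; a later read of position 0 is then served entirely from the cache, which is what makes backtracking over a non-seekable stream possible. Dropping buckets below the earliest outstanding mark would give the memory-reduction optimization mentioned earlier.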
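The continued-parse reset described at the end could look something like the following sketch. The `StreamState` class and its field names are assumptions for illustration only; in the proposal this state would live inside the DataInputStream implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal state holder, standing in for the internal
// state of a DataInputStream implementation.
class StreamState {
  var bitPosition0b: Long = 0
  var bitLimit0b: Option[Long] = None
  val buckets = new ArrayBuffer[Array[Byte]]

  /**
   * Reset internal state so that a call to parse() could continue
   * parsing at the current byte position: zero the bit position and
   * bit limit, and discard cached bucket data from the prior parse.
   */
  def resetForContinuedParse(): Unit = {
    bitPosition0b = 0
    bitLimit0b = None
    buckets.clear()
  }
}
```

Calling this at the start of doParse() would let a second parse pick up where a previous DaffodilInputData ended without carrying over stale limits or cached data.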