Currently, our I/O layer only supports parsing data in the form of a
ByteBuffer. Note that although our API does support other inputs (e.g.
Channels, InputStreams, Files), internally these are all
converted to a ByteBuffer. This has a few problems. Due to ByteBuffer's
use of integer positions, it can only support a maximum size of 2GB.
This also prevents streaming data, as well as discarding old data that
we can determine will never be needed again during a parse, which would
reduce memory usage. So changing our I/O layer to not rely on a
ByteBuffer is important if we want to support larger files and improve
performance.

The features we currently use on the ByteBuffer are:

  1. Set and get byte position
  2. Set and get byte limit
  3. Set and get byte order
  4. Convert buffer to a FloatBuffer to get a float
  5. Convert buffer to a DoubleBuffer to get a double
  6. Decode into a char buffer
  7. Get a single byte starting at current position
  8. Get byte array of length starting at current position
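For reference, the sketch below shows roughly how these features map onto
ByteBuffer calls today. The names and values are only illustrative, not
code copied from our implementation:

import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.charset.StandardCharsets

val someBytes = new Array[Byte](2048)
val bb = ByteBuffer.wrap(someBytes)
bb.order(ByteOrder.BIG_ENDIAN)       // 3. set byte order
bb.position(16)                      // 1. set byte position
bb.limit(1024)                       // 2. set byte limit
val b = bb.get()                     // 7. get a single byte at the current position
val arr = new Array[Byte](8)
bb.get(arr)                          // 8. get a byte array starting at the current position
val f = bb.asFloatBuffer().get()     // 4. convert to a FloatBuffer to get a float
val d = bb.asDoubleBuffer().get()    // 5. convert to a DoubleBuffer to get a double
val cb = StandardCharsets.UTF_8.newDecoder().decode(bb) // 6. decode into a char buffer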

So we need to replace these features. The ByteBufferDataInputStream
already understands byte/bit position and byte/bit limit (features 1 and
2), so we do not need to do much here, aside from perhaps adding some
additional checks if we currently rely on the ByteBuffer to tell us when
the limit was reached, for example. Byte order (feature 3) is essentially
just flipping bytes around, so that is something we can easily handle
ourselves. Converting to a float, double, or char (features 4, 5, 6) is
straightforward enough once we have the right bytes. So all that really
needs to be modified is how we get a single byte or byte array (features
7 and 8). To support this, I propose adding new abstract methods to the
DataInputStream trait that concrete classes must implement, each in
whatever way is best for getting bytes from its input data:

trait DataInputStream {

  ...

  /**
   * Return a single byte at the current byte position
   */
  def getByte(): Int

  /**
   * Read a given length of bytes starting at the current byte position,
   * filling in a pre-allocated array starting at an offset. Returns the
   * number of bytes read.
   */
  def getBytes(b: Array[Byte], off: Int, len: Int): Int

  ...
}

These are based on the InputStream read() methods. So our
ByteBufferDataInputStream (renamed to DataInputStreamImpl since it no
longer depends on a ByteBuffer?) only needs to implement these two
functions and make some modifications to use them when getting
floats/doubles/chars/etc.
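As an example of what "handling it ourselves" could look like, here is a
rough sketch of how DataInputStreamImpl might build a float out of bytes
obtained via getBytes(), taking care of byte order without a ByteBuffer
(features 3 and 4). This is not existing code, just an illustration of the
conversion; a double would work the same way with 8 bytes and
Double.longBitsToDouble, and chars by feeding the bytes to a CharsetDecoder:

  // hypothetical sketch, assumed to live inside DataInputStreamImpl
  def getFloat(order: java.nio.ByteOrder): Float = {
    val raw = new Array[Byte](4)
    val read = getBytes(raw, 0, 4)
    if (read != 4) throw new java.io.EOFException("not enough data for a float")

    // byte order is just "flipping bytes around": pick which end is most significant
    val ordered = if (order == java.nio.ByteOrder.BIG_ENDIAN) raw else raw.reverse

    // pack the 4 bytes into an Int, then reinterpret the bits as a Float
    val bits =
      ((ordered(0) & 0xFF) << 24) |
      ((ordered(1) & 0xFF) << 16) |
      ((ordered(2) & 0xFF) << 8)  |
      (ordered(3) & 0xFF)
    java.lang.Float.intBitsToFloat(bits)
  }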

But we still need to get rid of the ByteBuffer as the input data for the
reasons mentioned above. The data could come from something simple like
an Array[Byte] (implementing these functions would be fairly
straightforward) or something more complex like an InputStream or
Channel. In the latter cases, we need to implement some sort of caching
mechanism to allow backtracking within the stream. A bucketing algorithm
allows this, and it also allows files greater than 2GB and optimizations
that discard old data that is no longer needed. To support this, I
propose the following:

A new BucketingDataInputStream trait is created which implements a
bucketing algorithm used for streaming inputs. This includes the
getByte/getBytes methods, which read from the stream, cache the data into
buckets, and return the appropriate values. So I imagine something like
this:

import scala.collection.mutable.ArrayBuffer

trait BucketingDataInputStream { self: DataInputStream =>

  // size of each bucket; the actual value would need tuning
  private val bucketSize = 8192

  private val buckets = new ArrayBuffer[Array[Byte]]

  def getBytesForBucket(b: Array[Byte], len: Int): Int

  final override def getByte(): Int = {
    val currentPos = self.bytePosition0b
    if (!byteAtPositionIsInBuckets(currentPos)) {
      // get more bytes for a new bucket
      val newBucket = new Array[Byte](bucketSize)
      val read = getBytesForBucket(newBucket, bucketSize) // may be < bucketSize near end of data
      buckets += newBucket
    }

    // return the cached byte out of the bucket
  }

  final override def getBytes(...) = ...
}
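The pieces the sketch above leaves undefined are mostly just integer
division on the position, and since positions can be Longs this is also
what lifts the 2GB limit. Below is one possible (hypothetical) way to fill
in those pieces inside the trait, including the optimization of dropping
buckets that can no longer be needed; names like droppedBuckets,
cachedByteAt, and dropBucketsBefore are made up here for illustration:

  // number of buckets already discarded from the front of the stream
  private var droppedBuckets: Long = 0

  private def byteAtPositionIsInBuckets(pos: Long): Boolean = {
    val bucketIndex = pos / bucketSize
    bucketIndex - droppedBuckets < buckets.length
  }

  // what "return the cached byte out of the bucket" could look like
  private def cachedByteAt(pos: Long): Int = {
    val bucket = buckets(((pos / bucketSize) - droppedBuckets).toInt)
    bucket((pos % bucketSize).toInt) & 0xFF // unsigned, like InputStream.read
  }

  // once a parse can never backtrack before some position (e.g. no marks exist
  // before it), the buckets holding that data can be freed to reduce memory usage
  private def dropBucketsBefore(pos: Long): Unit = {
    val firstNeeded = pos / bucketSize
    while (droppedBuckets < firstNeeded && buckets.nonEmpty) {
      buckets.remove(0)
      droppedBuckets += 1
    }
  }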

Then we can create various implementations based on the input type, e.g.

class DaffodilDataInputStream(is: InputStream)
  extends DataInputStreamImpl
  with BucketingDataInputStream {

  override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
    is.read(b, 0, len)
  }
}

class DaffodilDataInputChannel(ch: ReadableByteChannel)
  extends DataInputStreamImpl
  with BucketingDataInputStream {

  override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
    val bb = ByteBuffer.wrap(b, 0, len)
    ch.read(bb)
  }
}

Adding a new input type just requires implementing a function that reads
some number of bytes into an array; the BucketingDataInputStream handles
the bucketing algorithm and the DataInputStreamImpl handles converting
bytes to the appropriate types, so new input types should not require
much effort.
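For instance, the Array[Byte] case mentioned earlier might look something
like the following sketch (hypothetical; a real implementation might skip
bucketing entirely since the array is already fully in memory):

class DaffodilDataInputArray(ba: Array[Byte])
  extends DataInputStreamImpl
  with BucketingDataInputStream {

  // how much of the array has been handed out to buckets so far
  private var consumed = 0

  override def getBytesForBucket(b: Array[Byte], len: Int): Int = {
    val toCopy = math.min(len, ba.length - consumed)
    System.arraycopy(ba, consumed, b, 0, toCopy)
    consumed += toCopy
    toCopy // 0 once the array is exhausted; EOF handling is glossed over here
  }
}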

Finally, we can have an object to create the various types of
input streams, e.g.:

object DaffodilInputData {
  def apply(ba: Array[Byte]) = new DaffodilDataInputArray(ba)
  def apply(is: InputStream) = new DaffodilDataInputStream(is)
  def apply(ch: ReadableByteChannel) = new DaffodilDataInputChannel(ch)
  ...
}
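Usage would then look roughly the same no matter where the data comes
from, e.g. (hypothetical file names, assuming the factory methods above):

import java.io.FileInputStream
import java.nio.channels.FileChannel
import java.nio.file.{ Files, Paths }

val fromArray   = DaffodilInputData(Files.readAllBytes(Paths.get("small.bin")))
val fromStream  = DaffodilInputData(new FileInputStream("data.bin"))
val fromChannel = DaffodilInputData(FileChannel.open(Paths.get("large.bin")))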

This part could be all that is visible to the Java and Scala APIs and
used to support a streaming API mentioned in other discussions. Some
additional changes may be needed to support starting a new parse from
where a previous DaffodilInputData ended, but I think that mostly just
requires a new reset function to reset things like bitPosition and
bitLimit to zero, and clean up old bucket state. A reset() method
already exists for resetting back to a mark, so a different name is
needed; maybe something like the following could be added to the
DataInputStream trait:

  /**
   * Reset internal state of the DataInputStream so that a call to
   * parse() could continue parsing at the current byte position.
   */
  def resetForContinuedParse(): Unit

I don't really like that name, but I think the concept is needed. This
function would be called at the beginning of each parse in the doParse()
method.
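For the bucketing case, a rough sketch of what that reset might do,
building on the hypothetical helpers from the earlier sketch (this is just
a guess at what state needs clearing, not a worked-out design):

  final override def resetForContinuedParse(): Unit = {
    // buckets from before the current position can never be needed again
    // once a new parse begins, so free them
    dropBucketsBefore(bytePosition0b)
    // ... also reset bitLimit and any per-parse position bookkeeping here ...
  }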
