This is an automated email from the ASF dual-hosted git repository.

olabusayo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-daffodil.git


The following commit(s) were added to refs/heads/master by this push:
     new c38a551  Add SAX Unparse Event Batching
c38a551 is described below

commit c38a551f353aed53479c60d38c4929743f82daf7
Author: olabusayoT <[email protected]>
AuthorDate: Fri Nov 6 17:04:55 2020 -0500

    Add SAX Unparse Event Batching
    
    - add saxUnparseEventBatchSize tunable with default 100
    - update coroutine to not expect a generic type wrapped in a Try, but any 
generic event; the implementation can then pass in whatever they wish
    - Add tests for tunables and batching tests
    
    DAFFODIL-2383
---
 .../processor/TestSAXParseUnparseAPI.scala         |   7 +-
 .../daffodil/processor/TestSAXUnparseAPI.scala     |  24 +++
 .../org/apache/daffodil/util/Coroutines.scala      |  26 ++--
 .../resources/org/apache/daffodil/xsd/dafext.xsd   |  12 ++
 .../apache/daffodil/api/DFDLParserUnparser.scala   |  17 ++-
 .../daffodil/infoset/SAXInfosetInputter.scala      | 125 ++++++++-------
 .../processors/DaffodilUnparseContentHandler.scala | 170 ++++++++++++++-------
 .../section00/general/testUnparserSAX.tdml         | 157 +++++++++++++++++++
 .../section00/general/TestUnparserSAX.scala        |  39 +++++
 9 files changed, 449 insertions(+), 128 deletions(-)

diff --git 
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
 
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
index 4126cf2..388fc5f 100644
--- 
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
+++ 
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
@@ -53,10 +53,11 @@ object TestSAXParseUnparseAPI {
   val testInfosetString: String = testInfoset.toString()
   val testData = "910"
 
-  lazy val dp: DataProcessor = testDataprocessor(testSchema)
+  lazy val dp: DataProcessor = testDataProcessor(testSchema)
+
+  def testDataProcessor(testSchema: scala.xml.Elem, tunablesArg: Map[String, 
String] = Map.empty): DataProcessor = {
+    val schemaCompiler = Compiler().withTunables(tunablesArg)
 
-  def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
-    val schemaCompiler = Compiler()
     val pf = schemaCompiler.compileNode(testSchema)
     if (pf.isError) {
       val msgs = pf.getDiagnostics.map { _.getMessage() }.mkString("\n")
diff --git 
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
 
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
index 198fd51..de6f263 100644
--- 
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
+++ 
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
@@ -20,7 +20,10 @@ package org.apache.daffodil.processor
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 
+import scala.xml.SAXException
+
 import javax.xml.parsers.SAXParserFactory
+import org.apache.daffodil.Implicits.intercept
 import org.apache.daffodil.xml.XMLUtils
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertTrue
@@ -46,6 +49,27 @@ class TestSAXUnparseAPI {
     assertEquals(testData, bao.toString)
   }
 
+  /**
+   * Test the case when a user supplies 0 as the batch size. Minimum batchsize 
must be 1
+   */
+  @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): 
Unit = {
+    val dpT = testDataProcessor(testSchema, Map("saxUnparseEventBatchSize" -> 
"0"))
+    val xmlReader: XMLReader = 
SAXParserFactory.newInstance.newSAXParser.getXMLReader
+    val bao = new ByteArrayOutputStream()
+    val wbc = java.nio.channels.Channels.newChannel(bao)
+    val unparseContentHandler = dpT.newContentHandlerInstance(wbc)
+    xmlReader.setContentHandler(unparseContentHandler)
+    xmlReader.setFeature(XMLUtils.SAX_NAMESPACES_FEATURE, true)
+    xmlReader.setFeature(XMLUtils.SAX_NAMESPACE_PREFIXES_FEATURE, true)
+    val bai = new ByteArrayInputStream(testInfosetString.getBytes)
+    val e = intercept[SAXException] {
+      xmlReader.parse(new InputSource(bai))
+    }
+    val eMsg = e.getMessage
+    assertTrue(eMsg.contains("invalid saxUnparseEventBatchSize"))
+    assertTrue(eMsg.contains("minimum value is 1"))
+  }
+
   @Test def testUnparseContentHandler_unparse_namespace_feature(): Unit = {
     val xmlReader: XMLReader = 
SAXParserFactory.newInstance.newSAXParser.getXMLReader
     val bao = new ByteArrayOutputStream()
diff --git 
a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala 
b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
index b652915..4b06b8f 100644
--- a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
+++ b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
@@ -34,11 +34,16 @@
   *
   * Definition of Coroutine - separate stacks, but NO CONCURRENCY. Only one
   * of a set of coroutines is running at any given time.
+ *
+  * The queueCapacity being set to 1 ensures the NO CONCURRENCY property of 
this trait, so reducing
+  * the context-switching overhead is implementation specific, and can be done 
by passing in a
+  * larger data structure containing multiple events for the Coroutine generic 
Type, rather than
+  * enlarging the queue size
   */
  trait Coroutine[T] {
 
    private val queueCapacity: Int = 1
-   private val inboundQueue = new ArrayBlockingQueue[Try[T]](queueCapacity)
+   private val inboundQueue = new ArrayBlockingQueue[T](queueCapacity)
 
    private val self = this
 
@@ -69,25 +74,25 @@
     * and then terminates. The coroutine calling this must return from the 
run()
     * method immediately after calling this.
     */
-   final def resumeFinal(coroutine: Coroutine[T], in: Try[T]): Unit = {
+   final def resumeFinal(coroutine: Coroutine[T], in: T): Unit = {
      coroutine.init()
-     coroutine.inboundQueue.put(in) // allows other to run  final
+     coroutine.inboundQueue.put(in) // allows other to run final
    }
 
    /**
-    * Call when one co-routine wants to resume another, tranmitting a
+    * Call when one co-routine wants to resume another, transmitting a
     * argument value to it.
     *
     * The current co-routine will be suspended until it is resumed later.
     */
-   final def resume(coroutine: Coroutine[T], in: Try[T]): Try[T] = {
+   final def resume(coroutine: Coroutine[T], in: T): T = {
      resumeFinal(coroutine, in)
      val res = waitForResume() // blocks until it is resumed
      res
    }
 
-   final def waitForResume(): Try[T] = {
-     inboundQueue.take
+   final def waitForResume(): T = {
+     inboundQueue.take()
    }
 
    protected def run(): Unit
@@ -113,7 +118,7 @@
   * 
https://scalaenthusiast.wordpress.com/2013/06/12/transform-a-callback-function-to-an-iteratorlist-in-scala/
   */
 
- final class InvertControl[S](body: => Unit) extends Iterator[S] with 
Coroutine[S] {
+ final class InvertControl[S](body: => Unit) extends Iterator[S] with 
Coroutine[Try[S]] {
 
    private object EndMarker extends Throwable
    private val EndOfData = Failure(EndMarker)
@@ -127,7 +132,7 @@
     * After the last value is produced, the consumer is resumed with EndOfData
     * and the producer terminates.
     */
-   class Producer(val consumer: Coroutine[S]) extends Coroutine[S] {
+   class Producer(val consumer: Coroutine[Try[S]]) extends Coroutine[Try[S]] {
      override final def run(): Unit = {
        try {
          waitForResume()
@@ -171,8 +176,7 @@
    private lazy val iterator = gen.toIterator
 
    override def hasNext: Boolean = {
-     if (failed) false
-     else iterator.hasNext
+     !failed && iterator.hasNext
    }
    override def next(): S = {
      if (failed) throw new IllegalStateException()
diff --git 
a/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd 
b/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
index f7540c7..6a7548c 100644
--- a/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
+++ b/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
@@ -390,6 +390,18 @@
             </xs:documentation>
           </xs:annotation>
         </xs:element>
+        <xs:element name="saxUnparseEventBatchSize" type="xs:int" 
default="100" minOccurs="0">
+          <xs:annotation>
+            <xs:documentation>
+              Daffodil's SAX Unparse API allows events to be batched in memory 
to minimize the
+              frequency of context switching between the SAXInfosetInputter 
thread that processes
+              the events, and the DaffodilUnparseContentHandler thread that 
generates the events.
+              Setting this value to a low number will increase the frequency 
of context switching,
+              but will reduce the memory footprint. Setting it to a high 
number will decrease the
+              frequency of context switching, but increase the memory 
footprint.
+            </xs:documentation>
+          </xs:annotation>
+        </xs:element>
         <xs:element name="suppressSchemaDefinitionWarnings" 
type="daf:TunableSuppressSchemaDefinitionWarnings" 
default="emptyElementParsePolicyError" minOccurs="0">
           <xs:annotation>
             <xs:documentation>
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
index a2b03a3..b0f27ca 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
@@ -269,14 +269,27 @@ object DFDL {
     }
   }
 
-  trait ProducerCoroutine extends Coroutine[SAXInfosetEvent] {
+  object SAXInfosetEvent {
+    def copyEvent(source: DFDL.SAXInfosetEvent, dest: DFDL.SAXInfosetEvent): 
Unit = {
+      if (source == null) dest.clear()
+      else {
+        dest.eventType = source.eventType
+        dest.namespaceURI = source.namespaceURI
+        dest.localName = source.localName
+        dest.nilValue = source.nilValue
+        dest.simpleText = source.simpleText
+      }
+    }
+  }
+
+  trait ProducerCoroutine extends Coroutine[Array[SAXInfosetEvent]] {
     override def isMain = true
     override protected def run(): Unit = {
       throw new Error("Main thread co-routine run method should not be 
called.")
     }
   }
 
-  trait ConsumerCoroutine extends Coroutine[SAXInfosetEvent]
+  trait ConsumerCoroutine extends Coroutine[Array[SAXInfosetEvent]]
 
   trait ParseResult extends Result with WithDiagnostics {
     def resultState: State
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
index 433e474..da3f45d 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
@@ -20,11 +20,10 @@ package org.apache.daffodil.infoset
 import java.net.URI
 import java.net.URISyntaxException
 
-import scala.util.Try
-
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
+import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.dpath.NodeInfo
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
@@ -35,25 +34,30 @@ import org.apache.daffodil.xml.XMLUtils
 
 /**
  * The SAXInfosetInputter consumes SAXInfosetEvent objects from the 
DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. 
This class contains two
- * SAXInfosetEvent objects, the current event the unparse method is processing 
and the next event
- * to be processed later.
+ * class and converts them to events that the DataProcessor unparse can use. 
This class contains an
+ * array of batched SAXInfosetEvent objects that it receives from the 
contentHandler and the index
+ * of the current element being processed.
  *
- * This class together with the DaffodilUnparseContentHandler use coroutines 
to ensure that only one event,
- * at a time, is passed between the two classes. The following is the general 
process:
+ * This class, together with the DaffodilUnparseContentHandler, uses coroutines to ensure 
that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
contentHandler to this class.
+ * The following is the general process:
  *
- * - the run method is called, with a StartDocument event already loaded on 
the inputter's queue.
- * This is collected and stored in the currentEvent member
+ * - the run method is called, with the first batch of events, starting with 
the StartDocument event,
+ * already loaded on the inputter's queue.
+ * This is collected and stored in the batchedInfosetEvents member, and the 
currentIndex is set to 0
  * - The dp.unparse method is called, and it calls hasNext to make sure an 
event exists to be
- * processed and then queries the currentEvent. The hasNext call also queues 
the nextEvent by
- * transferring control to the contentHandler so it can load the next event.
- * - After it is done with the currentEvent, it calls inputter.next to get the 
next event, and that
- * copies the queued nextEvent into the currentEvent
- * - This process continues until the currentEvent contains an EndDocument 
event, at which point, the
- * nextEvent is cleared, endDocumentReceived is set to true and hasNext will 
return false
- * - This ends the unparse process, and the unparseResult and/or any Errors 
are set on the event. We
- * call resumeFinal passing along that element, terminating this thread and 
resuming the
- * contentHandler for the last time.
+ * processed and then queries the event at currentIndex. The hasNext call also 
checks that there is
+ * a next event to be processed (currentIndex+1), and if not, queues the next 
batch of events by
+ * transferring control to the contentHandler so it can load them.
+ * - After it is done with the current event, it calls inputter.next to get 
the next event, and that
+ * increments the currentIndex and cleans out the event at the previous index
+ * - This process continues until the event at currentIndex either contains an 
EndDocument event or
+ * the currentIndex is the last in the batch. If it is the former, the 
endDocumentReceived flag is
+ * set to true and hasNext will return false. If it is the latter, the next 
batch of events will be
+ * queued by transferring control to the contentHandler so it can load them.
+ * - This ends the unparse process, and the unparseResult and/or any Errors 
are set on a single element
+ * array containing response events. We call resumeFinal passing along that 
array, terminating this
+ * thread and resuming the contentHandler for the last time.
  *
  * @param unparseContentHandler producer coroutine that sends the 
SAXInfosetEvent to this class
  * @param dp dataprocessor that we use to kickstart the unparse process and 
that consumes the
@@ -76,18 +80,19 @@ class SAXInfosetInputter(
   private var resolveRelativeInfosetBlobURIs: Boolean = false
 
   private var endDocumentReceived = false
-  private lazy val currentEvent: DFDL.SAXInfosetEvent = new 
DFDL.SAXInfosetEvent
-  private lazy val nextEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
+  private var currentIndex: Int = 0
+  private var batchedInfosetEvents: Array[SAXInfosetEvent] = _
+  private lazy val returnedInfosetEvent: Array[SAXInfosetEvent] = new 
Array[SAXInfosetEvent](1)
 
-  override def getEventType(): InfosetInputterEventType = 
currentEvent.eventType.orNull
+  override def getEventType(): InfosetInputterEventType = 
batchedInfosetEvents(currentIndex).eventType.orNull
 
-  override def getLocalName(): String = currentEvent.localName.orNull
+  override def getLocalName(): String = 
batchedInfosetEvents(currentIndex).localName.orNull
 
-  override def getNamespaceURI(): String = currentEvent.namespaceURI.orNull
+  override def getNamespaceURI(): String = 
batchedInfosetEvents(currentIndex).namespaceURI.orNull
 
   override def getSimpleText(primType: NodeInfo.Kind): String = {
-    val res = if (currentEvent.simpleText.isDefined) {
-      currentEvent.simpleText.get
+    val res = if (batchedInfosetEvents(currentIndex).simpleText.isDefined) {
+      batchedInfosetEvents(currentIndex).simpleText.get
     } else {
       throw new NonTextFoundInSimpleContentException(getLocalName())
     }
@@ -104,8 +109,8 @@ class SAXInfosetInputter(
   }
 
   override def isNilled(): MaybeBoolean = {
-    val _isNilled = if (currentEvent.nilValue.isDefined) {
-      val nilValue = currentEvent.nilValue.get
+    val _isNilled = if (batchedInfosetEvents(currentIndex).nilValue.isDefined) 
{
+      val nilValue = batchedInfosetEvents(currentIndex).nilValue.get
       if (nilValue == "true" || nilValue == "1") {
         MaybeBoolean(true)
       } else if (nilValue == "false" || nilValue == "0") {
@@ -121,38 +126,45 @@ class SAXInfosetInputter(
   }
 
   override def hasNext(): Boolean = {
-    if (endDocumentReceived) false
-    else if (!nextEvent.isEmpty) true
-    else {
-      val event = this.resume(unparseContentHandler, Try(currentEvent))
-      copyEvent(source = event, dest = nextEvent)
+    val nextIndex = currentIndex + 1
+    if (endDocumentReceived) {
+      // if the current Element is EndDocument, then there is no valid next
+      false
+    } else if (batchedInfosetEvents != null && nextIndex < 
batchedInfosetEvents.length) {
+      // if we have not yet reached the end of the array and endDocument has 
not yet been received
+      true
+    }  else  {
+      // there is no nextEvent or it was empty, but we still have no 
EndDocument. So load the next
+      // batch from the contentHandler
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      batchedInfosetEvents = this.resume(unparseContentHandler, 
returnedInfosetEvent)
+      // we reset the index to 0 to guarantee that the last element we were 
looking at when hasNext
+      // was called is still the event we'll be looking at, when we leave this 
function. This is
+      // guaranteed because the DaffodilUnparseContentHandler moves the last 
element into the first
+      // index when it resumed.
+      currentIndex = 0
       true
     }
   }
 
   override def next(): Unit = {
     if (hasNext()) {
-      copyEvent(source = Try(nextEvent), dest = currentEvent)
-      nextEvent.clear()
-      if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived = 
true
+      // clear element at current index as we're done with it, except in the 
case we just loaded the
+      // new elements, then do nothing
+      batchedInfosetEvents(currentIndex).clear()
+
+      // increment current index to the next index
+      currentIndex += 1
+
+      // check if new current index is EndDocument
+      if (batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+        endDocumentReceived = true
+      }
     } else {
-      // we should never call next() if hasNext() is false
       Assert.abort()
     }
   }
 
-  def copyEvent(source: Try[DFDL.SAXInfosetEvent], dest: 
DFDL.SAXInfosetEvent): Unit= {
-    if (source.isFailure) dest.clear()
-    else {
-      val src = source.get
-      dest.eventType = src.eventType
-      dest.namespaceURI = src.namespaceURI
-      dest.localName = src.localName
-      dest.nilValue = src.nilValue
-      dest.simpleText = src.simpleText
-    }
-  }
-
   def enableResolutionOfRelativeInfosetBlobURIs(): Unit = 
resolveRelativeInfosetBlobURIs = true
 
   /**
@@ -183,22 +195,23 @@ class SAXInfosetInputter(
 
   override protected def run(): Unit = {
     try {
-      // startDocument kicks off this entire process, so it should be on the 
queue so the
-      // waitForResume call can grab it. That is set to our current event, so 
when hasNext is called
-      // the nextEvent after the StartDocument can be queued
-      copyEvent(source = this.waitForResume(), dest = currentEvent)
+      // startDocument kicks off this entire process, so the first batch of 
events, of which
+      // startDocument is first, should be on the queue so the waitForResume 
call can grab it.
+      // This populates the inputter.batchedInfosetEvents var for use by the 
rest of the Inputter
+      batchedInfosetEvents = this.waitForResume()
       val unparseResult = dp.unparse(this, output)
-      currentEvent.unparseResult = One(unparseResult)
+      batchedInfosetEvents(currentIndex).unparseResult = One(unparseResult)
       if (unparseResult.isError) {
         // unparseError is contained within unparseResult
-        currentEvent.causeError = One(new 
DaffodilUnparseErrorSAXException(unparseResult))
+        batchedInfosetEvents(currentIndex).causeError = One(new 
DaffodilUnparseErrorSAXException(unparseResult))
       }
     } catch {
       case e: Exception => {
-        currentEvent.causeError = One(new 
DaffodilUnhandledSAXException(e.getMessage, e))
+        batchedInfosetEvents(currentIndex).causeError = One(new 
DaffodilUnhandledSAXException(e.getMessage, e))
       }
     } finally {
-      this.resumeFinal(unparseContentHandler, Try(currentEvent))
+      returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+      this.resumeFinal(unparseContentHandler, returnedInfosetEvent)
     }
   }
 }
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
index 88b3554..b0257d3 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
@@ -17,13 +17,14 @@
 
 package org.apache.daffodil.processors
 
-import scala.util.Try
 import scala.xml.NamespaceBinding
 
 import javax.xml.XMLConstants
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
+import org.apache.daffodil.api.DFDL.SAXInfosetEvent
+import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
@@ -39,26 +40,29 @@ import org.xml.sax.Locator
 
 /**
  * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to an event that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
+ * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
  * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
  * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
  * support for XML Namespaces
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that only one event,
- * at a time, is passed between the two classes. The following is the general 
process:
+ * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
+ * The following is the general process:
  *
  * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that is sent to
- * the SAXInfosetInputter. That event is put on the inputter's queue, this 
thread is paused, and
- * that inputter's thread is run
- * - when the SAXInfosetInputter is done processing an event and is ready for 
a new event, it
- * sends the completed event via the coroutine system, and loads it on the 
contentHandler's
- * queue, which restarts this thread and pauses that one. In the expected 
case, the events will
- * contain no new information, until the unparse is completed, otherwise it 
will contain error
- * information
- * -  this process continues until the EndDocument method is called. Once that 
SAXInfosetEvent is
- * sent to the inputter, it signals the end of events coming from the 
contentHandler. This
- * ends the unparseProcess and returns the event with the unparseResult and/or 
any error
+ * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
+ * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
+ * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
+ * this thread is paused, and that inputter's thread is run
+ * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
+ * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
+ * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
+ * the single element array will contain no new information until the unparse 
completes. In the case of
+ * an unexpected error though, it will contain error information
+ * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
+ * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
+ * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
+ * the unparseResult and/or any error
  * information
  *
  * @param dp dataprocessor object that will be used to call the parse
@@ -70,10 +74,48 @@ class DaffodilUnparseContentHandler(
   extends DFDL.DaffodilUnparseContentHandler {
   private lazy val inputter = new SAXInfosetInputter(this, dp, output)
   private var unparseResult: DFDL.UnparseResult = _
-  private lazy val infosetEvent: DFDL.SAXInfosetEvent = new 
DFDL.SAXInfosetEvent
   private lazy val characterData = new StringBuilder
   private var prefixMapping: NamespaceBinding = _
   private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
+   * element, we need to know if it has a viable next, if it doesn't, it will 
trigger the context
+   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
+   * batchSize, under the hood we'll batch [event1, event2].
+   *
+   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
+   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
+   * it will return true since event2 exists.
+   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
+   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
+   * of the current event ..., and hasNext()
+   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
+   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
+   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
+   * more events
+   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
+   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
+   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
+   * begins again with a call to next
+   *
+   * Without us having the extra buffer, things would happen like this:
+   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
+   *
+   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
+   * hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
+   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
+   * which, would overwrite event1 before we get to process it.
+   */
+  private lazy val actualBatchSize = tunablesBatchSize + 1
+  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
+    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; 
minimum value is 1")
+    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  }
+  private var currentIndex: Int = 0
+
   /**
    * This is a flag that is set to true when startPrefixMapping is called. 
When true, we make
    * the assumption that we don't need to use the Attributes parameter from 
startElement to get the
@@ -94,13 +136,13 @@ class DaffodilUnparseContentHandler(
   }
 
   override def startDocument(): Unit = {
-    infosetEvent.eventType = One(StartDocument)
-    sendToInputter()
+    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
+    maybeSendToInputter()
   }
 
   override def endDocument(): Unit = {
-    infosetEvent.eventType = One(EndDocument)
-    sendToInputter()
+    batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
+    maybeSendToInputter()
   }
 
   override def startPrefixMapping(prefix: String, uri: String): Unit = {
@@ -164,14 +206,14 @@ class DaffodilUnparseContentHandler(
       mapPrefixMappingFromAttributesImpl(atts)
     }
 
-    if (!infosetEvent.isEmpty && infosetEvent.localName.isDefined) {
+    if (!batchedInfosetEvents(currentIndex).isEmpty && 
batchedInfosetEvents(currentIndex).localName.isDefined) {
       // we started another element while we were in the process of building a 
startElement
       // this means the first element was complex and we are ready for the 
inputter queue
-      sendToInputter()
+      maybeSendToInputter()
     }
     // use Attributes to determine xsi:nil value
     val nilIn = atts.getIndex(XMLConstants.W3C_XML_SCHEMA_INSTANCE_NS_URI, 
"nil")
-    infosetEvent.nilValue = if (nilIn >= 0) {
+    batchedInfosetEvents(currentIndex).nilValue = if (nilIn >= 0) {
       val nilValue = atts.getValue(nilIn)
       One(nilValue)
     } else {
@@ -181,62 +223,78 @@ class DaffodilUnparseContentHandler(
     // set localName and namespaceURI
     setLocalNameAndNamespaceUri(uri, localName, qName)
 
-    infosetEvent.eventType = One(StartElement)
+    batchedInfosetEvents(currentIndex).eventType = One(StartElement)
   }
 
   override def endElement(uri: String, localName: String, qName: String): Unit 
= {
     // if infosetEvent is a startElement, send that first
-    if (infosetEvent.eventType.contains(StartElement)) {
+    if (batchedInfosetEvents(currentIndex).eventType.contains(StartElement)) {
       // any characterData that exists at this point is valid data as padding 
data has been
       // taken care of in startElement
       val maybeNewStr = One(characterData.toString)
-      infosetEvent.simpleText = maybeNewStr
+      batchedInfosetEvents(currentIndex).simpleText = maybeNewStr
       characterData.setLength(0)
-      sendToInputter()
+      maybeSendToInputter()
     }
 
+    batchedInfosetEvents(currentIndex).eventType = One(EndElement)
+
     // set localName and namespaceURI
     setLocalNameAndNamespaceUri(uri, localName, qName)
 
-    infosetEvent.eventType = One(EndElement)
-
     if (!contentHandlerPrefixMappingUsed) {
       // always pops
       prefixMapping = prefixMappingTrackingStack.pop
     }
-    sendToInputter()
+    maybeSendToInputter()
   }
 
   override def characters(ch: Array[Char], start: Int, length: Int): Unit = {
     characterData.appendAll(ch, start, length)
   }
 
-  private def sendToInputter(): Unit = {
-    val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
-    infosetEvent.clear()
-    // if event is wrapped in a Try failure, we will not have an 
unparseResult, so we only set
-    // unparseResults for events wrapped in Try Success, including those 
events that have expected
-    // errors
-    if (infosetEventWithResponse.isSuccess && 
infosetEventWithResponse.get.unparseResult.isDefined) {
-      unparseResult = infosetEventWithResponse.get.unparseResult.get
-    }
-    // the exception from events wrapped in Try failures and events wrapped in 
Try Successes
-    // (with an unparse error state i.e unparseResult.isError) are collected 
and thrown to stop
-    // the execution of the xmlReader
-    if (infosetEventWithResponse.isFailure || 
infosetEventWithResponse.get.isError) {
-      val causeError = if(infosetEventWithResponse.isFailure) {
-        infosetEventWithResponse.failed.get
-      } else {
-        infosetEventWithResponse.get.causeError.get
+  /**
+   * we only context switch to the InfosetInputter if batchedInfosetEvents is 
full or we hit an
+   * EndDocument event
+   */
+  private def maybeSendToInputter(): Unit = {
+    val nextIndex = currentIndex + 1
+    if (nextIndex < actualBatchSize &&
+      !batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+      // if we have room left on the batchedInfosetEvents array and the 
current element != EndDocument
+      currentIndex += 1
+      // at this point where we're loading the contents of the array, it 
should have been cleared
+      // mostly by the InfosetInputter, and the last element by us.
+      Assert.invariant(batchedInfosetEvents(currentIndex).isEmpty)
+    } else {
+      // ready to send it off
+      val infosetEventWithResponse = this.resume(inputter, 
batchedInfosetEvents).head
+      // we only ever return a one element array
+
+      // it is possible for unparseResult to be null, in the case of a 
DaffodilUnhandledSAXException
+      if (infosetEventWithResponse.unparseResult.isDefined) {
+        unparseResult = infosetEventWithResponse.unparseResult.get
       }
-      causeError match {
-        case unparseError: DaffodilUnparseErrorSAXException =>
-          // although this is an expected error, we need to throw it so we can 
stop the xmlReader
-          // parse and this thread
-          throw unparseError
-        case unhandled: DaffodilUnhandledSAXException => throw unhandled
-        case unknown => throw new DaffodilUnhandledSAXException("Unknown 
exception: ", new Exception(unknown))
+      // any exception is collected and thrown to stop the execution of the 
xmlReader
+      if (infosetEventWithResponse.isError) {
+        val causeError = infosetEventWithResponse.causeError.get
+        causeError match {
+          case unparseError: DaffodilUnparseErrorSAXException =>
+            // although this is an expected error, we need to throw it so we 
can stop the xmlReader
+            // parse and this thread
+            throw unparseError
+          case unhandled: DaffodilUnhandledSAXException => throw unhandled
+          case unknown => throw new DaffodilUnhandledSAXException("Unknown 
exception: ",
+            new Exception(unknown))
+        }
       }
+      // copy the last element into the first for use by inputter because that 
last element was
+      // its current element when we did the context switch. When done clear 
the last element,
+      // since the infosetinputter clears all elements except the last one, 
then set the index to
+      // 1 so we can start to load elements starting at the second element
+      SAXInfosetEvent.copyEvent(batchedInfosetEvents(currentIndex), 
batchedInfosetEvents(0))
+      batchedInfosetEvents(currentIndex).clear()
+      currentIndex = 1
     }
   }
 
@@ -276,8 +334,8 @@ class DaffodilUnparseContentHandler(
         Nope
       }
 
-    infosetEvent.localName = maybelocalName
-    infosetEvent.namespaceURI = maybeNamespaceURI
+    batchedInfosetEvents(currentIndex).localName = maybelocalName
+    batchedInfosetEvents(currentIndex).namespaceURI = maybeNamespaceURI
   }
 
   override def ignorableWhitespace(ch: Array[Char], start: Int, length: Int): 
Unit = {
diff --git 
a/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
 
b/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
new file mode 100644
index 0000000..05b481c
--- /dev/null
+++ 
b/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<tdml:testSuite xmlns:tdml="http://www.ibm.com/xmlns/dfdl/testData"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:dfdl="http://www.ogf.org/dfdl/dfdl-1.0/"
+  xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:ex="http://example.com"
+  xmlns:daf="urn:ogf:dfdl:2013:imp:daffodil.apache.org:2018:ext"
+  suiteName="saxUnparserTests">
+
+  <tdml:defineSchema name="saxUnparseSchema.embedded.dfdl.xsd">
+    <xs:include 
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd" />
+    <dfdl:format ref="ex:GeneralFormat" />
+
+    <xs:element name="record" type="ex:itemType"/>
+    <xs:complexType name="itemType">
+      <xs:sequence>
+        <xs:element name="item" type="xs:string" dfdl:lengthKind="explicit" 
dfdl:length="1" maxOccurs="unbounded"/>
+      </xs:sequence>
+    </xs:complexType>
+
+  </tdml:defineSchema>
+
+  <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_1">
+    <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+      xmlns:xs="http://www.w3.org/2001/XMLSchema">
+      <daf:saxUnparseEventBatchSize>1</daf:saxUnparseEventBatchSize>
+    </daf:tunables>
+  </tdml:defineConfig>
+
+  <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_5">
+    <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+      xmlns:xs="http://www.w3.org/2001/XMLSchema">
+      <daf:saxUnparseEventBatchSize>5</daf:saxUnparseEventBatchSize>
+    </daf:tunables>
+  </tdml:defineConfig>
+
+  <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_1000">
+    <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+      xmlns:xs="http://www.w3.org/2001/XMLSchema">
+      <daf:saxUnparseEventBatchSize>1000</daf:saxUnparseEventBatchSize>
+    </daf:tunables>
+  </tdml:defineConfig>
+
+  <tdml:unparserTestCase name="test_saxUnparseBatchSize_1" root="record"
+    model="saxUnparseSchema.embedded.dfdl.xsd"
+    roundTrip="true"
+    config="cfg_saxUnparseEventBatchSize_1">
+
+    <tdml:infoset>
+      <tdml:dfdlInfoset>
+        <record xmlns="http://example.com">
+          <item>H</item>
+          <item>e</item>
+          <item>l</item>
+          <item>l</item>
+          <item>o</item>
+          <item>!</item>
+          <item>-</item>
+          <item>W</item>
+          <item>o</item>
+          <item>r</item>
+          <item>l</item>
+          <item>d</item>
+          <item>.</item>
+          <item>1</item>
+          <item>2</item>
+          <item>3</item>
+        </record>
+      </tdml:dfdlInfoset>
+    </tdml:infoset>
+
+
+    <tdml:document>Hello!-World.123</tdml:document>
+
+  </tdml:unparserTestCase>
+
+  <tdml:unparserTestCase name="test_saxUnparseBatchSize_5" root="record"
+    model="saxUnparseSchema.embedded.dfdl.xsd"
+    roundTrip="true"
+    config="cfg_saxUnparseEventBatchSize_5">
+
+    <tdml:infoset>
+      <tdml:dfdlInfoset>
+        <record xmlns="http://example.com">
+          <item>H</item>
+          <item>e</item>
+          <item>l</item>
+          <item>l</item>
+          <item>o</item>
+          <item>!</item>
+          <item>-</item>
+          <item>W</item>
+          <item>o</item>
+          <item>r</item>
+          <item>l</item>
+          <item>d</item>
+          <item>.</item>
+          <item>1</item>
+          <item>2</item>
+          <item>3</item>
+        </record>
+      </tdml:dfdlInfoset>
+    </tdml:infoset>
+
+
+    <tdml:document>Hello!-World.123</tdml:document>
+
+  </tdml:unparserTestCase>
+
+  <tdml:unparserTestCase name="test_saxUnparseBatchSize_1000" root="record"
+    model="saxUnparseSchema.embedded.dfdl.xsd"
+    roundTrip="true"
+    config="cfg_saxUnparseEventBatchSize_1000">
+
+    <tdml:infoset>
+      <tdml:dfdlInfoset>
+        <record xmlns="http://example.com">
+          <item>H</item>
+          <item>e</item>
+          <item>l</item>
+          <item>l</item>
+          <item>o</item>
+          <item>!</item>
+          <item>-</item>
+          <item>W</item>
+          <item>o</item>
+          <item>r</item>
+          <item>l</item>
+          <item>d</item>
+          <item>.</item>
+          <item>1</item>
+          <item>2</item>
+          <item>3</item>
+        </record>
+      </tdml:dfdlInfoset>
+    </tdml:infoset>
+
+    <tdml:document>Hello!-World.123</tdml:document>
+
+  </tdml:unparserTestCase>
+</tdml:testSuite>
+
diff --git 
a/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
 
b/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
new file mode 100644
index 0000000..0934ccd
--- /dev/null
+++ 
b/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.section00.general
+
+import org.apache.daffodil.tdml.Runner
+import org.junit.AfterClass
+import org.junit.Test
+
+object TestUnparserSAX {
+  val testDir = "/org/apache/daffodil/section00/general/"
+  val runner2 = Runner(testDir, "testUnparserSAX.tdml")
+
+  @AfterClass def shutDown: Unit = {
+    runner2.reset
+  }
+}
+
+class TestUnparserSAX {
+  import TestUnparserSAX._
+
+  @Test def test_saxUnparseBatchSize_1() = { 
runner2.runOneTest("test_saxUnparseBatchSize_1") }
+  @Test def test_saxUnparseBatchSize_5() = { 
runner2.runOneTest("test_saxUnparseBatchSize_5") }
+  @Test def test_saxUnparseBatchSize_1000() = { 
runner2.runOneTest("test_saxUnparseBatchSize_1000") }
+}

Reply via email to