stevedlawrence closed pull request #82: Support the --stream option with the 
unparse subcommand
URL: https://github.com/apache/incubator-daffodil/pull/82
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt 
b/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt
new file mode 100644
index 000000000..7b62ed6e9
Binary files /dev/null and 
b/daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt differ
diff --git 
a/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
 
b/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
index 3c12f9d90..a3328b5e5 100644
--- 
a/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
+++ 
b/daffodil-cli/src/it/scala/org/apache/daffodil/unparsing/TestCLIUnparsing.scala
@@ -410,4 +410,21 @@ class TestCLIunparsing {
     }
   }
 
+  @Test def test_XXX_CLI_Unparsing_Stream_01() {
+    val schemaFile = 
Util.daffodilPath("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/cli_schema_02.dfdl.xsd")
+    val inputFile = 
Util.daffodilPath("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt")
+    val (testSchemaFile, testInputFile) = if (Util.isWindows) 
(Util.cmdConvert(schemaFile), Util.cmdConvert(inputFile)) else (schemaFile, 
inputFile)
+
+    val shell = Util.start("")
+
+    try {
+      val cmd = String.format("%s unparse --stream -s %s %s", Util.binPath, 
testSchemaFile, testInputFile)
+      shell.sendLine(cmd)
+      shell.expect(contains("123"))
+      shell.send("exit\n")
+    } finally {
+      shell.close()
+    }
+  }
+
 }
diff --git a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala 
b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
index f12476e28..451cdc100 100644
--- a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
@@ -29,6 +29,8 @@ import java.io.ByteArrayInputStream
 import java.io.File
 import java.nio.channels.Channels
 import java.net.URI
+import java.util.Scanner
+
 import scala.xml.SAXParseException
 import org.rogach.scallop
 import org.apache.daffodil.debugger.{ InteractiveDebugger, 
TraceDebuggerRunner, CLIDebuggerRunner }
@@ -341,7 +343,7 @@ class CLIConf(arguments: Array[String]) extends 
scallop.ScallopConf(arguments)
     val tunables = props[String]('T', keyName = "tunable", valueName = 
"value", descr = "daffodil tunable to be used when parsing.")
     val config = opt[String](short = 'c', argName = "file", descr = "path to 
file containing configuration items.")
     val infosetType = opt[String](short = 'I', argName = "infoset_type", descr 
= "infoset type to output. Must be one of 'xml', 'scala-xml', 'json', 'jdom', 
'w3cdom', or 'null'.", default = Some("xml")).map { _.toLowerCase }
-    val stream = toggle(noshort = true, default = Some(false), descrYes = 
"when left over data exists, parse again with remaining data", descrNo = "stop 
after the first parse, even if left over data exists")
+    val stream = toggle(noshort = true, default = Some(false), descrYes = 
"when left over data exists, parse again with remaining data, separating 
infosets by a NUL character", descrNo = "stop after the first parse, even if 
left over data exists")
     val infile = trailArg[String](required = false, descr = "input file to 
parse. If not specified, or a value of -, reads from stdin.")
 
     validateOpt(debug, infile) {
@@ -434,6 +436,7 @@ class CLIConf(arguments: Array[String]) extends 
scallop.ScallopConf(arguments)
               |                        [--validate [mode]]
               |                        [-D[{namespace}]<variable>=<value>...] 
[-c <file>]
               |                        [-I <infoset_type>]
+              |                        [--stream]
               |                        [-o <output>] [infile]
               |
               |Unparse an infoset file, using either a DFDL schema or a saved 
parser
@@ -453,6 +456,7 @@ class CLIConf(arguments: Array[String]) extends 
scallop.ScallopConf(arguments)
     val tunables = props[String]('T', keyName = "tunable", valueName = 
"value", descr = "daffodil tunable to be used when parsing.")
     val config = opt[String](short = 'c', argName = "file", descr = "path to 
file containing configuration items.")
     val infosetType = opt[String](short = 'I', argName = "infoset_type", descr 
= "infoset type to unparse. Must be one of 'xml', 'scala-xml', 'json', 'jdom', 
or 'w3cdom'.", default = Some("xml")).map { _.toLowerCase }
+    val stream = toggle(noshort = true, default = Some(false), descrYes = 
"split the input data on the NUL character, and unparse each chunk separately", 
descrNo = "treat the entire input data as one infoset")
     val infile = trailArg[String](required = false, descr = "input file to 
unparse. If not specified, or a value of -, reads from stdin.")
 
     validateOpt(debug, infile) {
@@ -906,6 +910,7 @@ object Main extends Logging {
                       lastParseBitPosition = loc.bitPos0b
                       keepParsing = true
                       error = false
+                      writer.write("\u0000")
                     }
                   } else {
                     // not streaming, show left over data warning
@@ -1100,15 +1105,49 @@ object Main extends Logging {
           case None => 1
           case Some(processor) => {
             setupDebugOrTrace(processor.asInstanceOf[DataProcessor], conf)
-            val data = IOUtils.toByteArray(is)
-            val inputterData = 
infosetDataToInputterData(unparseOpts.infosetType.toOption.get, data)
-            val inputter = 
getInfosetInputter(unparseOpts.infosetType.toOption.get, inputterData)
-            val unparseResult = Timer.getResult("unparsing", 
processor.unparse(inputter, outChannel))
-            output.close()
-            displayDiagnostics(unparseResult)
-            if (unparseResult.isError) 1 else 0
+
+            val maybeScanner =
+              if (unparseOpts.stream.toOption.get) {
+                val scnr = new Scanner(is)
+                scnr.useDelimiter("\u0000")
+                Some(scnr)
+              } else {
+                None
+              }
+
+            var keepUnparsing = maybeScanner.isEmpty || 
maybeScanner.get.hasNext
+            var error = false
+
+            while (keepUnparsing) {
+
+              val data =
+                if (maybeScanner.isDefined) {
+                  maybeScanner.get.next().getBytes()
+                } else {
+                  IOUtils.toByteArray(is)
+                }
+
+              val inputterData = 
infosetDataToInputterData(unparseOpts.infosetType.toOption.get, data)
+              val inputter = 
getInfosetInputter(unparseOpts.infosetType.toOption.get, inputterData)
+              val unparseResult = Timer.getResult("unparsing", 
processor.unparse(inputter, outChannel))
+              displayDiagnostics(unparseResult)
+
+              if (unparseResult.isError) {
+                keepUnparsing = false
+                error = true
+              } else {
+                keepUnparsing = maybeScanner.isDefined && 
maybeScanner.get.hasNext
+                error = false
+              }
+            }
+
+            if (error) 1 else 0
           }
         }
+
+        is.close()
+        outChannel.close()
+
         rc
       }
 
diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
index b07c0effa..077db6bb8 100644
--- 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
+++ 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
@@ -81,8 +81,15 @@ private[io] class ByteArrayOutputStreamWithGetBuf() extends 
java.io.ByteArrayOut
  * Has two modes of operation, buffering or direct. When buffering, all output 
goes into a
  * buffer. When direct, all output goes into a "real" DataOutputStream.
  *
+ * The isLayer parameter defines whether or not this instance originated from
+ * a layer. This is important to specify because this class is responsible
+ * for closing the associated Java OutputStream, ultimately being written to
+ * the underlying DataOutputStream. However, if the DataOutputStream
+ * is not related to a layer, that means the associated Java OutputStream came
+ * from the user and it is the user's responsibility to close it. The isLayer
+ * parameter provides the flag to know which streams should be closed or not.
-final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: 
DirectOrBufferedDataOutputStream)
+final class DirectOrBufferedDataOutputStream private[io] (var splitFrom: 
DirectOrBufferedDataOutputStream, val isLayer: Boolean = false)
   extends DataOutputStreamImplMixin {
   type ThisType = DirectOrBufferedDataOutputStream
 
@@ -212,7 +219,7 @@ final class DirectOrBufferedDataOutputStream private[io] 
(var splitFrom: DirectO
    */
   def addBuffered: DirectOrBufferedDataOutputStream = {
     Assert.usage(_following.isEmpty)
-    val newBufStr = new DirectOrBufferedDataOutputStream(this)
+    val newBufStr = new DirectOrBufferedDataOutputStream(this, isLayer)
     _following = One(newBufStr)
     //
     // TODO: PERFORMANCE: This is very pessimistic. It's making a complete 
clone of the state
@@ -324,9 +331,14 @@ final class DirectOrBufferedDataOutputStream private[io] 
(var splitFrom: DirectO
             // zero out so we don't end up thinking it is still there
             directStream.cst.setFragmentLastByte(0, 0)
           }
-          // now flush/close the whole data output stream
-          // propagate the closed-ness by closing the underlying java stream.
-          directStream.getJavaOutputStream().close()
+          // Now flush the whole data output stream. Note that we only want to
+          // close the java output stream if it was one we created for
+          // layering. If it was not from a layer, then it is the underlying
+          // OutputStream from a user and they are responsible for closing it.
+          directStream.getJavaOutputStream().flush()
+          if (directStream.isLayer) {
+            directStream.getJavaOutputStream().close()
+          }
           directStream.setDOSState(Uninitialized) // not just finished. We're 
dead now.
         } else {
           // the last stream we merged forward into was not finished.
@@ -805,8 +817,8 @@ object DirectOrBufferedDataOutputStream {
    * Factory for creating new ones/
    * Passing creator as null indicates no other stream created this one.
    */
-  def apply(jos: java.io.OutputStream, creator: 
DirectOrBufferedDataOutputStream) = {
-    val dbdos = new DirectOrBufferedDataOutputStream(creator)
+  def apply(jos: java.io.OutputStream, creator: 
DirectOrBufferedDataOutputStream, isLayer: Boolean = false) = {
+    val dbdos = new DirectOrBufferedDataOutputStream(creator, isLayer)
     dbdos.setJavaOutputStream(jos)
 
     if (creator eq null) {
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
index 7cc6afd6a..3687b5afc 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
@@ -153,7 +153,7 @@ abstract class LayerTransformer()
     val jos = wrapJavaOutputStream(s, state)
     val limitedJOS = wrapLimitingStream(jos, state)
     val encodedOutputStream = wrapLayerEncoder(limitedJOS)
-    val newDOS = DirectOrBufferedDataOutputStream(encodedOutputStream, null)
+    val newDOS = DirectOrBufferedDataOutputStream(encodedOutputStream, null, 
isLayer = true)
     newDOS.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
     newDOS.setAbsStartingBitPos0b(ULong(0L))
     newDOS.setDebugging(s.areDebugging)
diff --git a/project/Rat.scala b/project/Rat.scala
index 888517ecc..1ec9df48c 100644
--- a/project/Rat.scala
+++ b/project/Rat.scala
@@ -62,6 +62,7 @@ object Rat {
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input15.txt"),
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input16.txt"),
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input18.txt"),
+    
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input19.txt"),
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input1.txt"),
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input2.txt"),
     
file("daffodil-cli/src/it/resources/org/apache/daffodil/CLI/input/input3.txt"),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to