Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1658#discussion_r18613097
  
    --- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
    @@ -224,6 +226,128 @@ class FileSuite extends FunSuite with 
LocalSparkContext {
         assert(output.map(_.toString).collect().toList === List("(1,a)", 
"(2,aa)", "(3,aaa)"))
       }
     
    +  test("binary file input as byte array") {
    +    sc = new SparkContext("local", "test")
    +    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    +    val outFileName = outFile.getAbsolutePath()
    +
    +    // create file
    +    val testOutput = Array[Byte](1,2,3,4,5,6)
    +    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
    +    // write data to file
    +    val file = new java.io.FileOutputStream(outFile)
    +    val channel = file.getChannel
    +    channel.write(bbuf)
    +    channel.close()
    +    file.close()
    +
    +    val inRdd = sc.binaryFiles(outFileName)
    +    val (infile: String, indata: PortableDataStream) = inRdd.first
    +
    +    // Make sure the name and array match
    +    assert(infile.contains(outFileName)) // a prefix may get added
    +    assert(indata.toArray === testOutput)
    +  }
    +
    +  test("portabledatastream caching tests") {
    +    sc = new SparkContext("local", "test")
    +    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    +    val outFileName = outFile.getAbsolutePath()
    +
    +    // create file
    +    val testOutput = Array[Byte](1,2,3,4,5,6)
    +    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
    +    // write data to file
    +    val file = new java.io.FileOutputStream(outFile)
    +    val channel = file.getChannel
    +    channel.write(bbuf)
    +    channel.close()
    +    file.close()
    +
    +    val inRdd = sc.binaryFiles(outFileName).cache()
    +    inRdd.foreach{
    +      curData: (String, PortableDataStream) =>
    +       curData._2.toArray() // force the file to read
    +    }
    +    val mappedRdd = inRdd.map{
    +      curData: (String, PortableDataStream) =>
    +        (curData._2.getPath(),curData._2)
    +    }
    +    val (infile: String, indata: PortableDataStream) = mappedRdd.first
    +
    +    // Try reading the output back as an object file
    +
    +    assert(indata.toArray === testOutput)
    +  }
    +
    +  test("portabledatastream flatmap tests") {
    +    sc = new SparkContext("local", "test")
    +    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    +    val outFileName = outFile.getAbsolutePath()
    +
    +    // create file
    +    val testOutput = Array[Byte](1,2,3,4,5,6)
    +    val numOfCopies = 3
    +    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
    +    // write data to file
    +    val file = new java.io.FileOutputStream(outFile)
    +    val channel = file.getChannel
    +    channel.write(bbuf)
    +    channel.close()
    +    file.close()
    +
    +    val inRdd = sc.binaryFiles(outFileName)
    +    val mappedRdd = inRdd.map{
    +      curData: (String, PortableDataStream) =>
    +        (curData._2.getPath(),curData._2)
    +    }
    +    val copyRdd = mappedRdd.flatMap{
    +      curData: (String, PortableDataStream) =>
    +        for(i <- 1 to numOfCopies) yield (i,curData._2)
    +    }
    +
    +    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
    +
    +    // Try reading the output back as an object file
    +    assert(copyArr.length == numOfCopies)
    +    copyArr.foreach{
    +      cEntry: (Int, PortableDataStream) =>
    +        assert(cEntry._2.toArray === testOutput)
    +    }
    +
    +  }
    +
    +  test("fixed record length binary file as byte array") {
    +    // a fixed length of 6 bytes
    +
    +    sc = new SparkContext("local", "test")
    +
    +    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    +    val outFileName = outFile.getAbsolutePath()
    +
    +    // create file
    +    val testOutput = Array[Byte](1,2,3,4,5,6)
    +    val testOutputCopies = 10
    +
    +    // write data to file
    +    val file = new java.io.FileOutputStream(outFile)
    +    val channel = file.getChannel
    +    for(i <- 1 to testOutputCopies) {
    +      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
    +      channel.write(bbuf)
    +    }
    +    channel.close()
    +    file.close()
    +
    +    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
    +    // make sure there are enough elements
    +    assert(inRdd.count == testOutputCopies)
    +
    +    // now just compare the first one
    +    val indata: Array[Byte] = inRdd.first
    +    assert(indata === testOutput)
    +  }
    --- End diff --
    
    Please also add a test that tries to read records with a zero or negative 
size; doing so should raise an exception in the driver program.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to