Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9143#discussion_r44244797
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -190,283 +281,150 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
         }
         reader.close()
       }
    +}
     
    -  test("FileBasedWriteAheadLog - write rotating logs") {
    -    // Write data with rotation using WriteAheadLog class
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    +abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String)
    +  extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) {
     
    -    // Read data manually to verify the written data
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    -    assert(writtenData === dataToWrite)
    -  }
    -
    -  test("FileBasedWriteAheadLog - close after write flag") {
    +  import WriteAheadLogSuite._
    +  test(testPrefix + "close after write flag") {
         // Write data with rotation using WriteAheadLog class
         val numFiles = 3
         val dataToWrite = Seq.tabulate(numFiles)(_.toString)
         // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
         writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
    -      closeFileAfterWrite = true)
    +      closeFileAfterWrite = true, allowBatching = allowBatching)
     
         // Read data manually to verify the written data
         val logFiles = getLogFilesInDirectory(testDir)
         assert(logFiles.size === numFiles)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    +    val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching)
         assert(writtenData === dataToWrite)
       }
    +}
     
    -  test("FileBasedWriteAheadLog - read rotating logs") {
    -    // Write data manually for testing reading through WriteAheadLog
    -    val writtenData = (1 to 10).map { i =>
    -      val data = generateRandomData()
    -      val file = testDir + s"/log-$i-$i"
    -      writeDataManually(data, file)
    -      data
    -    }.flatten
    -
    -    val logDirectoryPath = new Path(testDir)
    -    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
    -    assert(fileSystem.exists(logDirectoryPath) === true)
    +class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
    +  extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog")
     
    -    // Read data using manager and verify
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === writtenData)
    -  }
    +class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
    +    allowBatching = true,
    +    closeFileAfterWrite = false,
    +    "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with 
PrivateMethodTester {
     
    -  test("FileBasedWriteAheadLog - recover past logs when creating new 
manager") {
    -    // Write data with manager, recover with new manager and verify
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(dataToWrite === readData)
    -  }
    +  import BatchedWriteAheadLog._
    +  import WriteAheadLogSuite._
     
    -  test("FileBasedWriteAheadLog - clean old logs") {
    -    logCleanUpTest(waitForCompletion = false)
    -  }
    +  private var fileBasedWAL: FileBasedWriteAheadLog = _
    +  private var walHandle: FileBasedWriteAheadLogSegment = _
    +  private var walBatchingThreadPool: ExecutionContextExecutorService = _
    +  private val sparkConf = new SparkConf()
     
    -  test("FileBasedWriteAheadLog - clean old logs synchronously") {
    -    logCleanUpTest(waitForCompletion = true)
    +  override def beforeEach(): Unit = {
    +    fileBasedWAL = mock[FileBasedWriteAheadLog]
    +    walHandle = mock[FileBasedWriteAheadLogSegment]
    +    walBatchingThreadPool = ExecutionContext.fromExecutorService(
    +      ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool"))
       }
     
    -  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
    -    // Write data with manager, recover with new manager and verify
    -    val manualClock = new ManualClock
    -    val dataToWrite = generateRandomData()
    -    writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -
    -    writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
    -
    -    if (waitForCompletion) {
    -      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -    } else {
    -      eventually(timeout(1 second), interval(10 milliseconds)) {
    -        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -      }
    +  override def afterEach(): Unit = {
    +    if (walBatchingThreadPool != null) {
    +      walBatchingThreadPool.shutdownNow()
         }
       }
     
    -  test("FileBasedWriteAheadLog - handling file errors while reading 
rotating logs") {
    -    // Generate a set of log files
    -    val manualClock = new ManualClock
    -    val dataToWrite1 = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
    -    val logFiles1 = getLogFilesInDirectory(testDir)
    -    assert(logFiles1.size > 1)
    -
    +  test("BatchedWriteAheadLog - serializing and deserializing batched 
records") {
    +    val events = Seq(
    +      BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)),
    +      BatchAllocationEvent(null, null),
    +      BatchCleanupEvent(Nil)
    +    )
     
    -    // Recover old files and generate a second set of log files
    -    val dataToWrite2 = generateRandomData()
    -    manualClock.advance(100000)
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
    -    val logFiles2 = getLogFilesInDirectory(testDir)
    -    assert(logFiles2.size > logFiles1.size)
    +    val buffers = events.map(e => Record(ByteBuffer.wrap(Utils.serialize(e)), 0L, null))
    +    val batched = BatchedWriteAheadLog.aggregate(buffers)
    +    val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer =>
    +      Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array()))
     
    -    // Read the files and verify that all the written data can be read
    -    val readData1 = readDataUsingWriteAheadLog(testDir)
    -    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
    -
    -    // Corrupt the first set of files so that they are basically unreadable
    -    logFiles1.foreach { f =>
    -      val raf = new FileOutputStream(f, true).getChannel()
    -      raf.truncate(1)
    -      raf.close()
    -    }
    -
    -    // Verify that the corrupted files do not prevent reading of the second set of data
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === dataToWrite2)
    -  }
    -
    -  test("FileBasedWriteAheadLog - do not create directories or files unless 
write") {
    -    val nonexistentTempPath = File.createTempFile("test", "")
    -    nonexistentTempPath.delete()
    -    assert(!nonexistentTempPath.exists())
    -
    -    val writtenSegment = writeDataManually(generateRandomData(), testFile)
    -    val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
    -      new Configuration(), 1, 1, closeFileAfterWrite = false)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
    -    wal.read(writtenSegment.head)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
    +    assert(deaggregate.toSeq === events)
       }
    -}
     
    -object WriteAheadLogSuite {
    +  test("BatchedWriteAheadLog - failures in wrappedLog get bubbled up") {
    +    when(fileBasedWAL.write(any[ByteBuffer], anyLong)).thenThrow(new RuntimeException("Hello!"))
    +    // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes
    +    val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf)
     
    -  class MockWriteAheadLog0() extends WriteAheadLog {
    -    override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
    -    override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
    -    override def readAll(): util.Iterator[ByteBuffer] = { null }
    -    override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
    -    override def close(): Unit = { }
    +    intercept[RuntimeException] {
    +      val buffer = mock[ByteBuffer]
    +      wal.write(buffer, 2L)
    +    }
       }
     
    -  class MockWriteAheadLog1(val conf: SparkConf) extends MockWriteAheadLog0()
    -
    -  class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends MockWriteAheadLog0()
    -
    -
    -  private val hadoopConf = new Configuration()
    -
    -  /** Write data to a file directly and return an array of the file segments written. */
    -  def writeDataManually(data: Seq[String], file: String): Seq[FileBasedWriteAheadLogSegment] = {
    -    val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]()
    -    val writer = HdfsUtils.getOutputStream(file, hadoopConf)
    -    data.foreach { item =>
    -      val offset = writer.getPos
    -      val bytes = Utils.serialize(item)
    -      writer.writeInt(bytes.size)
    -      writer.write(bytes)
    -      segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size)
    -    }
    -    writer.close()
    -    segments
    +  // we make the write requests in separate threads so that we don't block the test thread
    +  private def eventFuture(
    +      wal: WriteAheadLog,
    +      event: String,
    +      time: Long,
    +      numSuccess: AtomicInteger = null,
    +      numFail: AtomicInteger = null): Unit = {
    +    val f = Future(wal.write(event, time))(walBatchingThreadPool)
    +    f.onComplete {
    +      case Success(v) =>
    +        assert(v === walHandle) // return our mock handle after the write
    +        if (numSuccess != null) numSuccess.incrementAndGet()
    +      case Failure(v) => if (numFail != null) numFail.incrementAndGet()
    +    }(walBatchingThreadPool)
       }
     
       /**
    -   * Write data to a file using the writer class and return an array of the file segments written.
    +   * In order to block the writes on the writer thread, we mock the write method, and block it
    +   * for some time with a promise.
        */
    -  def writeDataUsingWriter(
    -      filePath: String,
    -      data: Seq[String]
    -    ): Seq[FileBasedWriteAheadLogSegment] = {
    -    val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
    -    val segments = data.map {
    -      item => writer.write(item)
    -    }
    -    writer.close()
    -    segments
    -  }
    -
    -  /** Write data to rotating files in log directory using the WriteAheadLog class. */
    -  def writeDataUsingWriteAheadLog(
    -      logDirectory: String,
    -      data: Seq[String],
    -      manualClock: ManualClock = new ManualClock,
    -      closeLog: Boolean = true,
    -      clockAdvanceTime: Int = 500,
    -      closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
    -    if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
    -    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
    -      closeFileAfterWrite)
    -
    -    // Ensure that 500 does not get sorted after 2000, so put a high base value.
    -    data.foreach { item =>
    -      manualClock.advance(clockAdvanceTime)
    -      wal.write(item, manualClock.getTimeMillis())
    -    }
    -    if (closeLog) wal.close()
    -    wal
    -  }
    -
    -  /** Read data from a segments of a log file directly and return the list of byte buffers. */
    -  def readDataManually(segments: Seq[FileBasedWriteAheadLogSegment]): Seq[String] = {
    -    segments.map { segment =>
    -      val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
    -      try {
    -        reader.seek(segment.offset)
    -        val bytes = new Array[Byte](segment.length)
    -        reader.readInt()
    -        reader.readFully(bytes)
    -        val data = Utils.deserialize[String](bytes)
    -        reader.close()
    -        data
    -      } finally {
    -        reader.close()
    +  private def writeBlockingPromise(wal: WriteAheadLog): Promise[Any] = {
    +    // we would like to block the write so that we can queue requests
    +    val promise = Promise[Any]()
    +    when(wal.write(any[ByteBuffer], any[Long])).thenAnswer(
    +      new Answer[FileBasedWriteAheadLogSegment] {
    +        override def answer(invocation: InvocationOnMock): FileBasedWriteAheadLogSegment = {
    +          Await.ready(promise.future, 4.seconds)
    +          walHandle
    +        }
           }
    -    }
    +    )
    +    promise
       }
     
    -  /** Read all the data from a log file directly and return the list of byte buffers. */
    -  def readDataManually(file: String): Seq[String] = {
    -    val reader = HdfsUtils.getInputStream(file, hadoopConf)
    -    val buffer = new ArrayBuffer[String]
    -    try {
    -      while (true) {
    -        // Read till EOF is thrown
    -        val length = reader.readInt()
    -        val bytes = new Array[Byte](length)
    -        reader.read(bytes)
    -        buffer += Utils.deserialize[String](bytes)
    -      }
    -    } catch {
    -      case ex: EOFException =>
    -    } finally {
    -      reader.close()
    +  test("BatchedWriteAheadLog - name log with aggregated entries with the 
timestamp of last entry") {
    +    val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf)
    +    // block the write so that we can batch some records
    +    val promise = writeBlockingPromise(fileBasedWAL)
    +
    +    val event1 = "hello"
    +    val event2 = "world"
    +    val event3 = "this"
    +    val event4 = "is"
    +    val event5 = "doge"
    +
    +    eventFuture(wal, event1, 3L) // 3 will automatically be flushed for the first write
    --- End diff --
    
    Aaah, right. That was totally not intuitive. Can you add that in the comments of the unit test?
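    
    For reference, a minimal sketch of the kind of clarifying comment that would help (not the actual change; the second call and its timestamp are made up for illustration), assuming the reason is that the batching thread drains an empty queue right away, so only the writes issued while the mocked wrappedLog.write is blocked end up batched together:
    
        // Sketch only: the very first record is picked up by the batching thread as soon
        // as it arrives, so the write at time 3L is flushed on its own. The mocked
        // wrappedLog.write then blocks on the promise, so every later record queues up
        // and is aggregated into a single batch once the promise is completed.
        eventFuture(wal, event1, 3L) // flushed alone: nothing was queued yet
        eventFuture(wal, event2, 5L) // hypothetical time; queued behind the blocked write, batched later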

