Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9143#discussion_r44244198
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -190,149 +284,165 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
         }
         reader.close()
       }
    +}
     
    -  test("FileBasedWriteAheadLog - write rotating logs") {
    -    // Write data with rotation using WriteAheadLog class
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    -
    -    // Read data manually to verify the written data
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    -    assert(writtenData === dataToWrite)
    -  }
    +abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String)
    +  extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) {
     
    -  test("FileBasedWriteAheadLog - close after write flag") {
    +  import WriteAheadLogSuite._
    +  test(testPrefix + "close after write flag") {
         // Write data with rotation using WriteAheadLog class
         val numFiles = 3
         val dataToWrite = Seq.tabulate(numFiles)(_.toString)
         // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
         writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
    -      closeFileAfterWrite = true)
    +      closeFileAfterWrite = true, allowBatching = allowBatching)
     
         // Read data manually to verify the written data
         val logFiles = getLogFilesInDirectory(testDir)
         assert(logFiles.size === numFiles)
    -    val writtenData = logFiles.flatMap { file => readDataManually(file)}
    +    val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching)
         assert(writtenData === dataToWrite)
       }
    +}
     
    -  test("FileBasedWriteAheadLog - read rotating logs") {
    -    // Write data manually for testing reading through WriteAheadLog
    -    val writtenData = (1 to 10).map { i =>
    -      val data = generateRandomData()
    -      val file = testDir + s"/log-$i-$i"
    -      writeDataManually(data, file)
    -      data
    -    }.flatten
    +class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
    +  extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog")
     
    -    val logDirectoryPath = new Path(testDir)
    -    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
    -    assert(fileSystem.exists(logDirectoryPath) === true)
    +class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
    +    allowBatching = true,
    +    closeFileAfterWrite = false,
    +    "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with 
Eventually {
     
    -    // Read data using manager and verify
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === writtenData)
    -  }
    +  import BatchedWriteAheadLog._
    +  import WriteAheadLogSuite._
     
    -  test("FileBasedWriteAheadLog - recover past logs when creating new 
manager") {
    -    // Write data with manager, recover with new manager and verify
    -    val dataToWrite = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(dataToWrite === readData)
    +  private var wal: WriteAheadLog = _
    +  private var walHandle: WriteAheadLogRecordHandle = _
    +  private var walBatchingThreadPool: ThreadPoolExecutor = _
    +  private var walBatchingExecutionContext: ExecutionContextExecutorService = _
    +  private val sparkConf = new SparkConf()
    +
    +  override def beforeEach(): Unit = {
    +    wal = mock[WriteAheadLog]
    +    walHandle = mock[WriteAheadLogRecordHandle]
    +    walBatchingThreadPool = ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")
    +    walBatchingExecutionContext = ExecutionContext.fromExecutorService(walBatchingThreadPool)
       }
     
    -  test("FileBasedWriteAheadLog - clean old logs") {
    -    logCleanUpTest(waitForCompletion = false)
    +  override def afterEach(): Unit = {
    +    if (walBatchingExecutionContext != null) {
    +      walBatchingExecutionContext.shutdownNow()
    +    }
       }
     
    -  test("FileBasedWriteAheadLog - clean old logs synchronously") {
    -    logCleanUpTest(waitForCompletion = true)
    -  }
    +  test("BatchedWriteAheadLog - serializing and deserializing batched 
records") {
    +    val events = Seq(
    +      BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)),
    +      BatchAllocationEvent(null, null),
    +      BatchCleanupEvent(Nil)
    +    )
     
    -  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
    -    // Write data with manager, recover with new manager and verify
    -    val manualClock = new ManualClock
    -    val dataToWrite = generateRandomData()
    -    writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
    -    val logFiles = getLogFilesInDirectory(testDir)
    -    assert(logFiles.size > 1)
    +    val buffers = events.map(e => Record(ByteBuffer.wrap(Utils.serialize(e)), 0L, null))
    +    val batched = BatchedWriteAheadLog.aggregate(buffers)
    +    val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer =>
    +      Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array()))
     
    -    writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
    +    assert(deaggregate.toSeq === events)
    +  }
     
    -    if (waitForCompletion) {
    -      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -    } else {
    -      eventually(timeout(1 second), interval(10 milliseconds)) {
    -        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
    -      }
    +  test("BatchedWriteAheadLog - failures in wrappedLog get bubbled up") {
    +    when(wal.write(any[ByteBuffer], anyLong)).thenThrow(new RuntimeException("Hello!"))
    +    // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes
    +    val batchedWal = new BatchedWriteAheadLog(wal, sparkConf)
    +
    +    intercept[RuntimeException] {
    +      val buffer = mock[ByteBuffer]
    +      batchedWal.write(buffer, 2L)
         }
       }
     
    -  test("FileBasedWriteAheadLog - handling file errors while reading 
rotating logs") {
    -    // Generate a set of log files
    -    val manualClock = new ManualClock
    -    val dataToWrite1 = generateRandomData()
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
    -    val logFiles1 = getLogFilesInDirectory(testDir)
    -    assert(logFiles1.size > 1)
    +  // we make the write requests in separate threads so that we don't block the test thread
    +  private def writeEventWithFuture(
    +      wal: WriteAheadLog,
    +      event: String,
    +      time: Long,
    +      numSuccess: AtomicInteger = null,
    +      numFail: AtomicInteger = null): Unit = {
    +    val f = Future(wal.write(event, time))(walBatchingExecutionContext)
    +    f.onComplete {
    +      case Success(v) =>
    +        assert(v === walHandle) // return our mock handle after the write
    +        if (numSuccess != null) numSuccess.incrementAndGet()
    +      case Failure(v) => if (numFail != null) numFail.incrementAndGet()
    +    }(walBatchingExecutionContext)
    +  }
     
    +  /**
    +   * In order to block the writes on the writer thread, we mock the write method, and block it
    +   * for some time with a promise.
    +   */
    +  private def writeBlockingPromise(wal: WriteAheadLog): Promise[Any] = {
    +    // we would like to block the write so that we can queue requests
    +    val promise = Promise[Any]()
    +    when(wal.write(any[ByteBuffer], any[Long])).thenAnswer(
    +      new Answer[WriteAheadLogRecordHandle] {
    +        override def answer(invocation: InvocationOnMock): WriteAheadLogRecordHandle = {
    +          Await.ready(promise.future, 4.seconds)
    +          walHandle
    +        }
    +      }
    +    )
    +    promise
    +  }
     
    -    // Recover old files and generate a second set of log files
    -    val dataToWrite2 = generateRandomData()
    -    manualClock.advance(100000)
    -    writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
    -    val logFiles2 = getLogFilesInDirectory(testDir)
    -    assert(logFiles2.size > logFiles1.size)
    +  test("BatchedWriteAheadLog - name log with aggregated entries with the 
timestamp of last entry") {
    +    val batchedWal = new BatchedWriteAheadLog(wal, sparkConf)
    +    // block the write so that we can batch some records
    +    val promise = writeBlockingPromise(wal)
    +
    +    val event1 = "hello"
    +    val event2 = "world"
    +    val event3 = "this"
    +    val event4 = "is"
    +    val event5 = "doge"
    +
    +    // the record at time 3 will automatically be flushed as the first write
    +    writeEventWithFuture(batchedWal, event1, 3L)
    +    // the rest of the records will be batched while the record at time 3 is being written
    +    writeEventWithFuture(batchedWal, event2, 5L)
    +    writeEventWithFuture(batchedWal, event3, 8L)
    +    writeEventWithFuture(batchedWal, event4, 12L)
    +    writeEventWithFuture(batchedWal, event5, 10L)
    +    eventually(timeout(1 second)) {
    +      assert(walBatchingThreadPool.getActiveCount === 5)
    +    }
    +    promise.success(true)
     
    -    // Read the files and verify that all the written data can be read
    -    val readData1 = readDataUsingWriteAheadLog(testDir)
    -    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
    +    val buffer1 = wrapArrayArrayByte(Array(event1))
    +    val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
     
    -    // Corrupt the first set of files so that they are basically unreadable
    -    logFiles1.foreach { f =>
    -      val raf = new FileOutputStream(f, true).getChannel()
    -      raf.truncate(1)
    -      raf.close()
    +    eventually(timeout(1 second)) {
    +      verify(wal, times(1)).write(meq(buffer1), meq(3L))
    +      // the file name should be the timestamp of the last record, as events should be naturally
    +      // in order of timestamp, and we need the last element.
    +      verify(wal, times(1)).write(meq(buffer2), meq(10L))
         }
    -
    -    // Verify that the corrupted files do not prevent reading of the second set of data
    -    val readData = readDataUsingWriteAheadLog(testDir)
    -    assert(readData === dataToWrite2)
       }
     
    -  test("FileBasedWriteAheadLog - do not create directories or files unless 
write") {
    -    val nonexistentTempPath = File.createTempFile("test", "")
    -    nonexistentTempPath.delete()
    -    assert(!nonexistentTempPath.exists())
    +  test("BatchedWriteAheadLog - shutdown properly") {
    +    val batchedWal = new BatchedWriteAheadLog(wal, sparkConf)
    +    batchedWal.close()
    +    verify(wal, times(1)).close()
     
    -    val writtenSegment = writeDataManually(generateRandomData(), testFile)
    -    val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
    -      new Configuration(), 1, 1, closeFileAfterWrite = false)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
    -    wal.read(writtenSegment.head)
    -    assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
    +    intercept[IllegalStateException](batchedWal.write(mock[ByteBuffer], 12L))
    --- End diff --
    
    this is good but not sufficient. Sorry if I wasn't clear earlier. I want to make sure that we test the following behavior:
    
    - batched wal active
    - write1, write2 concurrently waiting to complete
    - batched wal stopped, wrapped wal asked to stop
    - but right before the wrapped wal stops, the write of the wrapped wal completes
    - the 2 threads waiting for writes to complete should get exceptions, instead of getting success or blocking indefinitely
    
    If they get success after the system has been asked to stop, that may lead to hard-to-reason-about cases of data loss. So it's better to test that this behavior does not break.
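    
    A minimal sketch of the kind of test I have in mind, reusing this suite's writeEventWithFuture and writeBlockingPromise helpers; the exact close semantics of BatchedWriteAheadLog are assumed here, so treat it as an outline rather than the final test:
    
        test("BatchedWriteAheadLog - fail pending writes on close") {
          val batchedWal = new BatchedWriteAheadLog(wal, sparkConf)
          // block the wrapped wal's write so that both requests stay pending
          val promise = writeBlockingPromise(wal)
    
          val numSuccess = new AtomicInteger()
          val numFail = new AtomicInteger()
          writeEventWithFuture(batchedWal, "write1", 3L, numSuccess, numFail)
          writeEventWithFuture(batchedWal, "write2", 5L, numSuccess, numFail)
          eventually(timeout(1 second)) {
            assert(walBatchingThreadPool.getActiveCount === 2)
          }
    
          // stop the batched wal while both writes are still pending, then
          // unblock the wrapped wal to simulate its write completing right
          // before it stops (if close() itself blocks on pending writes,
          // it may need to run on a separate thread instead)
          batchedWal.close()
          promise.success(true)
    
          eventually(timeout(1 second)) {
            // neither caller may see success or stay blocked indefinitely
            assert(numSuccess.get() === 0)
            assert(numFail.get() === 2)
          }
        }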
    