Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/9143#discussion_r44244797 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala --- @@ -190,283 +281,150 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter { } reader.close() } +} - test("FileBasedWriteAheadLog - write rotating logs") { - // Write data with rotation using WriteAheadLog class - val dataToWrite = generateRandomData() - writeDataUsingWriteAheadLog(testDir, dataToWrite) +abstract class CloseFileAfterWriteTests(allowBatching: Boolean, testTag: String) + extends CommonWriteAheadLogTests(allowBatching, closeFileAfterWrite = true, testTag) { - // Read data manually to verify the written data - val logFiles = getLogFilesInDirectory(testDir) - assert(logFiles.size > 1) - val writtenData = logFiles.flatMap { file => readDataManually(file)} - assert(writtenData === dataToWrite) - } - - test("FileBasedWriteAheadLog - close after write flag") { + import WriteAheadLogSuite._ + test(testPrefix + "close after write flag") { // Write data with rotation using WriteAheadLog class val numFiles = 3 val dataToWrite = Seq.tabulate(numFiles)(_.toString) // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100, - closeFileAfterWrite = true) + closeFileAfterWrite = true, allowBatching = allowBatching) // Read data manually to verify the written data val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size === numFiles) - val writtenData = logFiles.flatMap { file => readDataManually(file)} + val writtenData: Seq[String] = readAndDeserializeDataManually(logFiles, allowBatching) assert(writtenData === dataToWrite) } +} - test("FileBasedWriteAheadLog - read rotating logs") { - // Write data manually for testing reading through WriteAheadLog - val writtenData = (1 to 10).map { i => - val data 
= generateRandomData() - val file = testDir + s"/log-$i-$i" - writeDataManually(data, file) - data - }.flatten - - val logDirectoryPath = new Path(testDir) - val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - assert(fileSystem.exists(logDirectoryPath) === true) +class FileBasedWriteAheadLogWithFileCloseAfterWriteSuite + extends CloseFileAfterWriteTests(allowBatching = false, "FileBasedWriteAheadLog") - // Read data using manager and verify - val readData = readDataUsingWriteAheadLog(testDir) - assert(readData === writtenData) - } +class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( + allowBatching = true, + closeFileAfterWrite = false, + "BatchedWriteAheadLog") with MockitoSugar with BeforeAndAfterEach with PrivateMethodTester { - test("FileBasedWriteAheadLog - recover past logs when creating new manager") { - // Write data with manager, recover with new manager and verify - val dataToWrite = generateRandomData() - writeDataUsingWriteAheadLog(testDir, dataToWrite) - val logFiles = getLogFilesInDirectory(testDir) - assert(logFiles.size > 1) - val readData = readDataUsingWriteAheadLog(testDir) - assert(dataToWrite === readData) - } + import BatchedWriteAheadLog._ + import WriteAheadLogSuite._ - test("FileBasedWriteAheadLog - clean old logs") { - logCleanUpTest(waitForCompletion = false) - } + private var fileBasedWAL: FileBasedWriteAheadLog = _ + private var walHandle: FileBasedWriteAheadLogSegment = _ + private var walBatchingThreadPool: ExecutionContextExecutorService = _ + private val sparkConf = new SparkConf() - test("FileBasedWriteAheadLog - clean old logs synchronously") { - logCleanUpTest(waitForCompletion = true) + override def beforeEach(): Unit = { + fileBasedWAL = mock[FileBasedWriteAheadLog] + walHandle = mock[FileBasedWriteAheadLogSegment] + walBatchingThreadPool = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonFixedThreadPool(8, "wal-test-thread-pool")) } - private def 
logCleanUpTest(waitForCompletion: Boolean): Unit = { - // Write data with manager, recover with new manager and verify - val manualClock = new ManualClock - val dataToWrite = generateRandomData() - writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false) - val logFiles = getLogFilesInDirectory(testDir) - assert(logFiles.size > 1) - - writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion) - - if (waitForCompletion) { - assert(getLogFilesInDirectory(testDir).size < logFiles.size) - } else { - eventually(timeout(1 second), interval(10 milliseconds)) { - assert(getLogFilesInDirectory(testDir).size < logFiles.size) - } + override def afterEach(): Unit = { + if (walBatchingThreadPool != null) { + walBatchingThreadPool.shutdownNow() } } - test("FileBasedWriteAheadLog - handling file errors while reading rotating logs") { - // Generate a set of log files - val manualClock = new ManualClock - val dataToWrite1 = generateRandomData() - writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock) - val logFiles1 = getLogFilesInDirectory(testDir) - assert(logFiles1.size > 1) - + test("BatchedWriteAheadLog - serializing and deserializing batched records") { + val events = Seq( + BlockAdditionEvent(ReceivedBlockInfo(0, None, None, null)), + BatchAllocationEvent(null, null), + BatchCleanupEvent(Nil) + ) - // Recover old files and generate a second set of log files - val dataToWrite2 = generateRandomData() - manualClock.advance(100000) - writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock) - val logFiles2 = getLogFilesInDirectory(testDir) - assert(logFiles2.size > logFiles1.size) + val buffers = events.map(e => Record(ByteBuffer.wrap(Utils.serialize(e)), 0L, null)) + val batched = BatchedWriteAheadLog.aggregate(buffers) + val deaggregate = BatchedWriteAheadLog.deaggregate(batched).map(buffer => + Utils.deserialize[ReceivedBlockTrackerLogEvent](buffer.array())) - // Read the files and verify that all the written 
data can be read - val readData1 = readDataUsingWriteAheadLog(testDir) - assert(readData1 === (dataToWrite1 ++ dataToWrite2)) - - // Corrupt the first set of files so that they are basically unreadable - logFiles1.foreach { f => - val raf = new FileOutputStream(f, true).getChannel() - raf.truncate(1) - raf.close() - } - - // Verify that the corrupted files do not prevent reading of the second set of data - val readData = readDataUsingWriteAheadLog(testDir) - assert(readData === dataToWrite2) - } - - test("FileBasedWriteAheadLog - do not create directories or files unless write") { - val nonexistentTempPath = File.createTempFile("test", "") - nonexistentTempPath.delete() - assert(!nonexistentTempPath.exists()) - - val writtenSegment = writeDataManually(generateRandomData(), testFile) - val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath, - new Configuration(), 1, 1, closeFileAfterWrite = false) - assert(!nonexistentTempPath.exists(), "Directory created just by creating log object") - wal.read(writtenSegment.head) - assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment") + assert(deaggregate.toSeq === events) } -} -object WriteAheadLogSuite { + test("BatchedWriteAheadLog - failures in wrappedLog get bubbled up") { + when(fileBasedWAL.write(any[ByteBuffer], anyLong)).thenThrow(new RuntimeException("Hello!")) + // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes + val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf) - class MockWriteAheadLog0() extends WriteAheadLog { - override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null } - override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null } - override def readAll(): util.Iterator[ByteBuffer] = { null } - override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { } - override def close(): Unit = { } + intercept[RuntimeException] { + val buffer = 
mock[ByteBuffer] + wal.write(buffer, 2L) + } } - class MockWriteAheadLog1(val conf: SparkConf) extends MockWriteAheadLog0() - - class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends MockWriteAheadLog0() - - - private val hadoopConf = new Configuration() - - /** Write data to a file directly and return an array of the file segments written. */ - def writeDataManually(data: Seq[String], file: String): Seq[FileBasedWriteAheadLogSegment] = { - val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]() - val writer = HdfsUtils.getOutputStream(file, hadoopConf) - data.foreach { item => - val offset = writer.getPos - val bytes = Utils.serialize(item) - writer.writeInt(bytes.size) - writer.write(bytes) - segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size) - } - writer.close() - segments + // we make the write requests in separate threads so that we don't block the test thread + private def eventFuture( + wal: WriteAheadLog, + event: String, + time: Long, + numSuccess: AtomicInteger = null, + numFail: AtomicInteger = null): Unit = { + val f = Future(wal.write(event, time))(walBatchingThreadPool) + f.onComplete { + case Success(v) => + assert(v === walHandle) // return our mock handle after the write + if (numSuccess != null) numSuccess.incrementAndGet() + case Failure(v) => if (numFail != null) numFail.incrementAndGet() + }(walBatchingThreadPool) } /** - * Write data to a file using the writer class and return an array of the file segments written. + * In order to block the writes on the writer thread, we mock the write method, and block it + * for some time with a promise. */ - def writeDataUsingWriter( - filePath: String, - data: Seq[String] - ): Seq[FileBasedWriteAheadLogSegment] = { - val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf) - val segments = data.map { - item => writer.write(item) - } - writer.close() - segments - } - - /** Write data to rotating files in log directory using the WriteAheadLog class. 
*/ - def writeDataUsingWriteAheadLog( - logDirectory: String, - data: Seq[String], - manualClock: ManualClock = new ManualClock, - closeLog: Boolean = true, - clockAdvanceTime: Int = 500, - closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = { - if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000) - val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1, - closeFileAfterWrite) - - // Ensure that 500 does not get sorted after 2000, so put a high base value. - data.foreach { item => - manualClock.advance(clockAdvanceTime) - wal.write(item, manualClock.getTimeMillis()) - } - if (closeLog) wal.close() - wal - } - - /** Read data from a segments of a log file directly and return the list of byte buffers. */ - def readDataManually(segments: Seq[FileBasedWriteAheadLogSegment]): Seq[String] = { - segments.map { segment => - val reader = HdfsUtils.getInputStream(segment.path, hadoopConf) - try { - reader.seek(segment.offset) - val bytes = new Array[Byte](segment.length) - reader.readInt() - reader.readFully(bytes) - val data = Utils.deserialize[String](bytes) - reader.close() - data - } finally { - reader.close() + private def writeBlockingPromise(wal: WriteAheadLog): Promise[Any] = { + // we would like to block the write so that we can queue requests + val promise = Promise[Any]() + when(wal.write(any[ByteBuffer], any[Long])).thenAnswer( + new Answer[FileBasedWriteAheadLogSegment] { + override def answer(invocation: InvocationOnMock): FileBasedWriteAheadLogSegment = { + Await.ready(promise.future, 4.seconds) + walHandle + } } - } + ) + promise } - /** Read all the data from a log file directly and return the list of byte buffers. 
*/ def readDataManually(file: String): Seq[String] = { val reader = HdfsUtils.getInputStream(file, hadoopConf) val buffer = new ArrayBuffer[String] try { while (true) { // Read till EOF is thrown val length = reader.readInt() val bytes = new Array[Byte](length) reader.read(bytes) buffer += Utils.deserialize[String](bytes) } } catch { case ex: EOFException => } finally { reader.close() + test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") { + val wal = new BatchedWriteAheadLog(fileBasedWAL, sparkConf) + // block the write so that we can batch some records + val promise = writeBlockingPromise(fileBasedWAL) + + val event1 = "hello" + val event2 = "world" + val event3 = "this" + val event4 = "is" + val event5 = "doge" + + eventFuture(wal, event1, 3L) // 3 will automatically be flushed for the first write --- End diff -- Aaah, right. That was totally not intuitive. Can you add that explanation as a comment in the unit test?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org