Ross M. Lodge created SPARK-26734:
-------------------------------------

             Summary: StackOverflowError on WAL serialization caused by large receivedBlockQueue
                 Key: SPARK-26734
                 URL: https://issues.apache.org/jira/browse/SPARK-26734
             Project: Spark
          Issue Type: Bug
          Components: Block Manager
    Affects Versions: 2.4.0, 2.3.2, 2.3.1
         Environment: spark 2.4.0 streaming job
java 1.8
scala 2.11.12
            Reporter: Ross M. Lodge


We encountered an intermittent StackOverflowError with a stack trace similar to:

{noformat}
Exception in thread "JobGenerator" java.lang.StackOverflowError
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
{noformat}

The thread name has been observed as either "JobGenerator" or "streaming-start", depending on when in the job's lifecycle the problem occurs. It appears to occur only in streaming jobs with checkpointing and the WAL enabled; this has prevented us from upgrading to v2.4.0.

Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:

{code:java}
  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).clone())
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        streamIds.foreach(getReceivedBlockQueue(_).clear())
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
{code}
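For context on why that clone() matters: on Scala 2.11, mutable.Queue.clone() returns another mutable.Queue, which is a linked structure (it extends MutableList), and the BatchAllocationEvent wrapping it is Java-serialized when it is written to the WAL, which is consistent with the ObjectOutputStream frames in the trace above. A minimal, Spark-free sketch of just that serialization step (untested; whether it actually overflows will depend on element count and JVM stack size) would look something like:

{code:java}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Standalone sketch, no Spark classes involved: Java-serialize a large
// mutable.Queue and see whether the per-element recursion alone blows the stack.
object QueueSerializationSketch {
  def main(args: Array[String]): Unit = {
    // Roughly the shape of a large receivedBlockQueue, minus the Spark classes.
    val queue = new mutable.Queue[String]()
    for (i <- 0 until 200000) {
      queue += s"block-$i"
    }

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      // On 2.11 the queue is a chain of linked cells, and ObjectOutputStream
      // appears to recurse through them (cf. the repeated writeSerialData /
      // defaultWriteFields frames in the trace above).
      out.writeObject(queue.clone())
      println("Serialized OK")
    } catch {
      case e: StackOverflowError => println(s"Overflowed: $e")
    }
    out.close()
  }
}
{code}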
logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") } } {code} Prior to 2.3.1, this code did {code:java} getReceivedBlockQueue(streamId).dequeueAll(x => true){code} but it was changed as part of SPARK-23991 to {code:java} getReceivedBlockQueue(streamId).clone(){code} We've not been able to reproduce this in a test of the actual above method, but we've been able to produce a test that reproduces it by putting a lot of values into the queue: {code:java} class SerializationFailureTest extends FunSpec { private val logger = LoggerFactory.getLogger(getClass) private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo] describe("Queue") { it("should be serializable") { runTest(1062) } it("should not be serializable") { runTest(1063) } it("should DEFINITELY not be serializable") { runTest(199952) } } private def runTest(mx: Int): Array[Byte] = { try { val random = new scala.util.Random() val queue = new ReceivedBlockQueue() for (_ <- 0 until mx) { queue += ReceivedBlockInfo( streamId = 0, numRecords = Some(random.nextInt(5)), metadataOption = None, blockStoreResult = WriteAheadLogBasedStoreResult( blockId = StreamBlockId(0, random.nextInt()), numRecords = Some(random.nextInt(5)), walRecordHandle = FileBasedWriteAheadLogSegment( path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""", offset = random.nextLong(), length = random.nextInt() ) ) ) } val record = BatchAllocationEvent( Time(1548320400000L), AllocatedBlocks( Map( 0 -> queue ) ) ) Utils.serialize(record) } catch { case t: Throwable => fail(t) } } } {code} In my tests it seemed like the serialization would fail if there were ~1064 elements in the queue. I'm _assuming_ that this is actually a scala bug, though I haven't tried reproducing it without the involvement of the spark objects. I expect this could be solved by transforming the cloned queue into a different type of Seq. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org