[ https://issues.apache.org/jira/browse/SPARK-26734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006211#comment-17006211 ]
Stephen commented on SPARK-26734:
---------------------------------

Is this bug fixed? I still see the same error with Spark 2.4.3.

> StackOverflowError on WAL serialization caused by large receivedBlockQueue
> --------------------------------------------------------------------------
>
>                 Key: SPARK-26734
>                 URL: https://issues.apache.org/jira/browse/SPARK-26734
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, DStreams
>    Affects Versions: 2.3.1, 2.3.2, 2.4.0
>         Environment: spark 2.4.0 streaming job
> java 1.8
> scala 2.11.12
>            Reporter: Ross M. Lodge
>            Assignee: Ross M. Lodge
>            Priority: Major
>             Fix For: 2.3.4, 2.4.1, 3.0.0
>
>
> We encountered an intermittent StackOverflowError with a stack trace similar
> to:
>
> {noformat}
> Exception in thread "JobGenerator" java.lang.StackOverflowError
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> {noformat}
> The name of the thread has been seen to be either "JobGenerator" or
> "streaming-start", depending on when in the lifecycle of the job the problem
> occurs. It appears to only occur in streaming jobs with checkpointing and
> WAL enabled; this has prevented us from upgrading to v2.4.0.
>
> Via debugging, we tracked this down to allocateBlocksToBatch in
> ReceivedBlockTracker:
> {code:java}
> /**
>  * Allocate all unallocated blocks to the given batch.
>  * This event will get written to the write ahead log (if enabled).
>  */
> def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
>   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
>     val streamIdToBlocks = streamIds.map { streamId =>
>       (streamId, getReceivedBlockQueue(streamId).clone())
>     }.toMap
>     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
>       streamIds.foreach(getReceivedBlockQueue(_).clear())
>       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
>       lastAllocatedBatchTime = batchTime
>     } else {
>       logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>     }
>   } else {
>     // This situation occurs when:
>     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
>     // possibly processed batch job or half-processed batch job need to be processed again,
>     // so the batchTime will be equal to lastAllocatedBatchTime.
>     // 2. Slow checkpointing makes recovered batch time older than WAL recovered
>     // lastAllocatedBatchTime.
>     // This situation will only occurs in recovery time.
>     logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>   }
> }
> {code}
> Prior to 2.3.1, this code did
> {code:java}
> getReceivedBlockQueue(streamId).dequeueAll(x => true){code}
> but it was changed as part of SPARK-23991 to
> {code:java}
> getReceivedBlockQueue(streamId).clone(){code}
> We've not been able to reproduce this in a test of the actual above method,
> but we've been able to produce a test that reproduces it by putting a lot of
> values into the queue:
>
> {code:java}
> class SerializationFailureTest extends FunSpec {
>   private val logger = LoggerFactory.getLogger(getClass)
>   private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
>
>   describe("Queue") {
>     it("should be serializable") {
>       runTest(1062)
>     }
>     it("should not be serializable") {
>       runTest(1063)
>     }
>     it("should DEFINITELY not be serializable") {
>       runTest(199952)
>     }
>   }
>
>   private def runTest(mx: Int): Array[Byte] = {
>     try {
>       val random = new scala.util.Random()
>       val queue = new ReceivedBlockQueue()
>       for (_ <- 0 until mx) {
>         queue += ReceivedBlockInfo(
>           streamId = 0,
>           numRecords = Some(random.nextInt(5)),
>           metadataOption = None,
>           blockStoreResult = WriteAheadLogBasedStoreResult(
>             blockId = StreamBlockId(0, random.nextInt()),
>             numRecords = Some(random.nextInt(5)),
>             walRecordHandle = FileBasedWriteAheadLogSegment(
>               path =
>                 s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
>               offset = random.nextLong(),
>               length = random.nextInt()
>             )
>           )
>         )
>       }
>       val record = BatchAllocationEvent(
>         Time(1548320400000L), AllocatedBlocks(
>           Map(
>             0 -> queue
>           )
>         )
>       )
>       Utils.serialize(record)
>     } catch {
>       case t: Throwable =>
>         fail(t)
>     }
>   }
> }
> {code}
> In my tests it seemed like the serialization would fail if there were ~1064
> elements in the queue.
> I'm _assuming_ that this is actually a scala bug,
> though I haven't tried reproducing it without the involvement of the spark
> objects.
> I expect this could be solved by transforming the cloned queue into a
> different type of Seq.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
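
The reporter's two open points (reproducing the failure without the Spark objects, and copying the cloned queue into a different type of Seq) can be sketched as follows. This is a minimal illustration under stated assumptions, not the change that was merged for this issue: `QueueSerializationSketch`, `serialize`, and `attempt` are hypothetical names, plain Java serialization stands in for `Utils.serialize`, and strings stand in for `ReceivedBlockInfo`. Whether the first attempt overflows is likely to depend on the Scala version and the thread stack size, so no expected output is given.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueSerializationSketch {

  // Plain Java serialization; a stand-in for Spark's Utils.serialize (assumption).
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Reports the outcome instead of letting a StackOverflowError escape.
  def attempt(label: String, value: AnyRef): Unit = {
    val outcome =
      try s"ok (${serialize(value).length} bytes)"
      catch { case _: StackOverflowError => "StackOverflowError" }
    println(s"$label: $outcome")
  }

  def main(args: Array[String]): Unit = {
    // Strings stand in for ReceivedBlockInfo (assumption);
    // 199952 is the largest queue size used in the quoted test.
    val queue = mutable.Queue.empty[String]
    (0 until 199952).foreach(i => queue += s"block-$i")

    // Serialize the queue as-is, then a copy held in a different Seq type.
    attempt("mutable.Queue", queue)
    attempt("Vector copy", queue.toVector)
  }
}
```

Running `main` on the same Scala version as the affected job (2.11.12 in the report) would show whether the queue alone is enough to trigger the error, and whether the `Vector` copy avoids it.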