Ross M. Lodge created SPARK-26734:
-------------------------------------

             Summary: StackOverflowError on WAL serialization caused by large receivedBlockQueue
                 Key: SPARK-26734
                 URL: https://issues.apache.org/jira/browse/SPARK-26734
             Project: Spark
          Issue Type: Bug
          Components: Block Manager
    Affects Versions: 2.4.0, 2.3.2, 2.3.1
         Environment: spark 2.4.0 streaming job
                      java 1.8
                      scala 2.11.12
            Reporter: Ross M. Lodge


We encountered an intermittent StackOverflowError with a stack trace similar to:

 
{noformat}
Exception in thread "JobGenerator" java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509){noformat}
The name of the thread has been seen to be either "JobGenerator" or
"streaming-start", depending on when in the lifecycle of the job the problem
occurs.  It appears to occur only in streaming jobs with both checkpointing and
the receiver WAL enabled (roughly the shape of job sketched below); this has
prevented us from upgrading to v2.4.0.
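
For context, the jobs that hit this look roughly like the following; the app
name, host, and paths are placeholders rather than our actual job:
{code:java}
// Placeholder sketch of the kind of job that exhibits the problem: a
// receiver-based stream with checkpointing and the receiver WAL enabled.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalStreamingJobSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-stackoverflow-repro")
      // Receiver write ahead log; we only see the error with this enabled.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///spark/streaming/checkpoints")  // checkpointing on

    // Any receiver-based input stream will do; its blocks accumulate in the
    // tracker's receivedBlockQueue between batch allocations.
    val lines = ssc.socketTextStream("host.example.com", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}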

 

Via debugging, we tracked this down to allocateBlocksToBatch in 
ReceivedBlockTracker:
{code:java}
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).clone())
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      streamIds.foreach(getReceivedBlockQueue(_).clear())
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again 
in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again 
in WAL recovery")
  }
}
{code}
Prior to 2.3.1, this code did
{code:java}
getReceivedBlockQueue(streamId).dequeueAll(x => true){code}
but it was changed as part of SPARK-23991 to
{code:java}
getReceivedBlockQueue(streamId).clone(){code}
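
As far as I can tell, the important difference is the type of collection that
ends up inside AllocatedBlocks, and hence inside the BatchAllocationEvent that
writeToLog Java-serializes (via Utils.serialize) before writing it to the WAL:
dequeueAll copied the blocks out into a flat, array-backed sequence, whereas
clone() returns another mutable.Queue, whose linked internal representation is
serialized with one nested writeObject call per element.  A tiny, untested
illustration of the difference (not Spark code; Scala 2.11/2.12 collections
assumed):
{code:java}
// Not Spark code; just illustrating the collection types involved.
import scala.collection.mutable

object QueueTypeIllustration {
  def main(args: Array[String]): Unit = {
    // Pre-2.3.1 path: dequeueAll drains the queue and returns its contents as
    // a separate flat sequence (an ArrayBuffer in the 2.11 sources, as far as
    // I can tell), whose backing array serializes iteratively.
    val drained = mutable.Queue(1, 2, 3).dequeueAll(_ => true)

    // 2.3.1+ path: clone() returns another mutable.Queue, i.e. a chain of
    // linked cells; default Java serialization descends one nested writeObject
    // call per cell, so serialization depth grows with the queue size.
    val cloned = mutable.Queue(1, 2, 3).clone()

    println(s"dequeueAll returned: ${drained.getClass.getName}")
    println(s"clone returned:      ${cloned.getClass.getName}")
  }
}
{code}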
We've not been able to reproduce this in a test that exercises the actual
method above, but we have been able to reproduce it with a test that puts a
large number of values into the queue:

 
{code:java}
package org.apache.spark.streaming

// The test has to live under org.apache.spark.streaming: the scheduler/receiver
// classes it builds (ReceivedBlockInfo, AllocatedBlocks, BatchAllocationEvent,
// WriteAheadLogBasedStoreResult, FileBasedWriteAheadLogSegment) are
// private[streaming], and Utils is private[spark].
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.WriteAheadLogBasedStoreResult
import org.apache.spark.streaming.scheduler.{AllocatedBlocks, BatchAllocationEvent, ReceivedBlockInfo}
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment
import org.apache.spark.util.Utils
import org.scalatest.FunSpec
import org.slf4j.LoggerFactory

import scala.collection.mutable

class SerializationFailureTest extends FunSpec {

  private val logger = LoggerFactory.getLogger(getClass)

  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  describe("Queue") {
    it("should be serializable") {
      runTest(1062)
    }
    it("should not be serializable") {
      runTest(1063)
    }
    it("should DEFINITELY not be serializable") {
      runTest(199952)
    }
  }

  private def runTest(mx: Int): Array[Byte] = {
    try {
      val random = new scala.util.Random()
      val queue = new ReceivedBlockQueue()
      for (_ <- 0 until mx) {
        queue += ReceivedBlockInfo(
          streamId = 0,
          numRecords = Some(random.nextInt(5)),
          metadataOption = None,
          blockStoreResult = WriteAheadLogBasedStoreResult(
            blockId = StreamBlockId(0, random.nextInt()),
            numRecords = Some(random.nextInt(5)),
            walRecordHandle = FileBasedWriteAheadLogSegment(
              path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
              offset = random.nextLong(),
              length = random.nextInt()
            )
          )
        )
      }
      val record = BatchAllocationEvent(
        Time(1548320400000L), AllocatedBlocks(
          Map(
            0 -> queue
          )
        )
      )
      Utils.serialize(record)
    } catch {
      case t: Throwable =>
        fail(t)
    }
  }
}

{code}
In my tests, serialization seemed to fail once there were ~1064 elements in
the queue.  I'm _assuming_ that this is actually a Scala bug, though I haven't
tried reproducing it without the involvement of the Spark objects.
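
Something like the following (untested; the object name and element count are
arbitrary, and it uses no Spark classes at all) is what I'd expect to show the
same recursion:
{code:java}
// Untested sketch: serialize a large scala.collection.mutable.Queue directly
// with plain Java serialization, no Spark classes involved.  If the problem is
// in the queue's linked representation, this should hit the same
// StackOverflowError.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueSerializationCheck {
  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue[Long]()
    for (i <- 0 until 200000) {
      queue += i.toLong
    }

    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      // Default serialization walks the queue's internal linked cells
      // recursively, one nested writeObject call per element.
      out.writeObject(queue)
      println(s"Serialized ${bytes.size()} bytes")
    } finally {
      out.close()
    }
  }
}
{code}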

I expect this could be solved by copying the cloned queue into a different
type of Seq before it is handed to AllocatedBlocks.
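
For example, something along these lines in allocateBlocksToBatch (a sketch of
the direction only, not a tested patch):
{code:java}
// Untested sketch: copy the received blocks into an immutable, array-backed
// Vector before they go into AllocatedBlocks, so that serialization depth no
// longer grows with the number of queued blocks.  (toVector already copies,
// which would make the clone() redundant.)
val streamIdToBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).toVector)
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
{code}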

 


