[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23991:
------------------------------------

    Assignee: (was: Apache Spark)

> data loss when allocateBlocksToBatch
> -------------------------------------
>
>                 Key: SPARK-23991
>                 URL: https://issues.apache.org/jira/browse/SPARK-23991
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Input/Output
>    Affects Versions: 2.2.0
>         Environment: spark 2.11
>            Reporter: kevin fu
>            Priority: Major
>
> With checkpointing and the WAL enabled, the driver writes each batch's block allocation into HDFS. However, if that write fails as shown below, the blocks of the batch can never be computed by the DAG: they have already been dequeued from the receivedBlockQueue and are lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1523765480000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:190)
>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
>         ... 12 more
> 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 1523765480000 ms needs to be processed again in WAL recovery
> {panel}
> The code in question is shown below:
> {code}
>   /**
>    * Allocate all unallocated blocks to the given batch.
>    * This event will get written to the write ahead log (if enabled).
>    */
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
>     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
>       val streamIdToBlocks = streamIds.map { streamId =>
>         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>       }.toMap
>       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
>         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
>         lastAllocatedBatchTime = batchTime
>       } else {
>         logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>       }
>     } else {
>       // This situation occurs when:
>       // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
>       //    possibly processed batch job or half-processed batch job need to be processed again,
>       //    so the batchTime will be equal to lastAllocatedBatchTime.
>       // 2. Slow checkpointing makes recovered batch time older than WAL recovered
>       //    lastAllocatedBatchTime.
>       // This situation will only occurs in recovery time.
>       logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
>     }
>   }
> {code}
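One possible direction for a fix, shown as a sketch only and not as the committed patch: snapshot the queued blocks instead of dequeueing them up front, and clear the queues only after the BatchAllocationEvent has been written to the WAL successfully, so a failed write leaves the blocks queued for the next allocation attempt. The members used below (streamIds, getReceivedBlockQueue, writeToLog, timeToAllocatedBlocks, lastAllocatedBatchTime) are the ones appearing in the quoted snippet; only the snapshot/clear reordering is new here.

{code}
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      // Snapshot the queued blocks instead of dequeueing them, so a failed WAL
      // write does not discard them.
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).clone())
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        // Drop the blocks from the queues only once the allocation is durable in the WAL.
        streamIds.foreach(getReceivedBlockQueue(_).clear())
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed " +
          "again in WAL recovery")
      }
    } else {
      // Recovery-time branch unchanged from the snippet above.
      logInfo(s"Possibly processed batch $batchTime needs to be processed " +
        "again in WAL recovery")
    }
  }
{code}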
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org