[ 
https://issues.apache.org/jira/browse/SPARK-5147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276777#comment-14276777
 ] 

François Garillot commented on SPARK-5147:
------------------------------------------

While I agree this is a serious bug, why delete old batches at a 
configuration-specified time? Since 
[SPARK-4671|https://issues.apache.org/jira/browse/SPARK-4671], such a WAL 
deletion can (with a recent enough cutoff time) lower the number of existing 
copies of a block in the system to just one, which means data loss if the 
receiver fails. What's worse, this would happen specifically in contexts where 
the WAL is activated, i.e. contexts where fault tolerance is important.

As an alternative, how about reverting 
[SPARK-4671|https://issues.apache.org/jira/browse/SPARK-4671] (i.e. going back 
to replicating blocks twice when the WAL is active), AND making the 
{{BlockManager}} react to {{markReady}} / replication events, so that it keeps 
track of the number of copies of each block. The only value it really needs to 
keep current is the timestamp of the oldest block for which there are not yet 
two copies in the system: all WAL entries strictly before that timestamp can be 
deleted safely.
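
To make the idea concrete, here is a minimal sketch of such a tracker (class and method names are illustrative, not Spark's actual API): it counts copies per block and exposes the timestamp of the oldest block that does not yet have two copies, which is exactly the cleanup cutoff described above.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the actual BlockManager: track how many replicas
// exist per block, and expose the timestamp of the oldest block that does not
// yet have two copies. WAL entries strictly older than that timestamp are
// known to be replicated twice and can be purged.
class ReplicationTracker {
  private case class BlockInfo(insertTime: Long, var copies: Int)
  private val blocks = mutable.LinkedHashMap.empty[String, BlockInfo]

  // called when a block is first stored (one copy exists)
  def blockStored(blockId: String, time: Long): Unit =
    blocks(blockId) = BlockInfo(time, 1)

  // called when the BlockManager reports a successful replication
  def blockReplicated(blockId: String): Unit =
    blocks.get(blockId).foreach(_.copies += 1)

  // timestamp of the oldest block with fewer than two copies, if any
  def oldestUnderReplicatedTime: Option[Long] =
    blocks.values.filter(_.copies < 2).map(_.insertTime).reduceOption(_ min _)
}
```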

Then it can contact the {{ReceiverTracker}}, which, through the 
{{ReceivedBlockTracker}}, has access to the {{WriteAheadLogManager}} and can 
call {{cleanupOldBlocks}} with a time for which it is *known* that all blocks 
are replicated twice.
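
Computing that safe time is then a one-liner (a hypothetical helper, not Spark code): take the minimum timestamp among still-under-replicated blocks, and if there is none, everything up to now is safe.

```scala
// Hypothetical helper: given the timestamps of blocks that still have fewer
// than two copies, compute the cutoff to pass to cleanupOldBlocks. Everything
// strictly before the returned time is known to be replicated twice.
def cleanupThreshold(underReplicatedTimes: Seq[Long], now: Long): Long =
  underReplicatedTimes.reduceOption(_ min _).getOrElse(now)
```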

A further advantage: IIUC, with two copies of a block in the system, the 
locality of the computation on the RDD that the block eventually becomes part 
of is less predictable (in particular, that computation is less likely to land 
on the {{Executor}} on which the {{Receiver}} of that block ran, avoiding some 
contention for resources). That is in and of itself an argument for reverting 
[SPARK-4671|https://issues.apache.org/jira/browse/SPARK-4671].

> write ahead logs from streaming receiver are not purged because 
> cleanupOldBlocks in WriteAheadLogBasedBlockHandler is never called
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5147
>                 URL: https://issues.apache.org/jira/browse/SPARK-5147
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>            Priority: Blocker
>
> Hi all,
> We are running a Spark streaming application with ReliableKafkaReceiver. We 
> have "spark.streaming.receiver.writeAheadLog.enable" set to true so write 
> ahead logs (WALs) for received data are created under receivedData/streamId 
> folder in the checkpoint directory. 
> However, old WALs are never purged by time. receivedBlockMetadata and 
> checkpoint files are purged correctly though. I went through the code, 
> WriteAheadLogBasedBlockHandler class in ReceivedBlockHandler.scala is 
> responsible for cleaning up the old blocks. It has method cleanupOldBlocks, 
> which is never called by any class. The ReceiverSupervisorImpl class holds a 
> WriteAheadLogBasedBlockHandler instance; however, it only calls the storeBlock 
> method to create WALs and never calls cleanupOldBlocks to purge old WALs.
> The size of the WAL folder increases constantly on HDFS. This is preventing 
> us from running the ReliableKafkaReceiver 24x7. Can somebody please take a 
> look?
> Thanks,
> Max
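
For illustration, the missing call path the report describes can be modeled with a simplified, hypothetical sketch (names mirror Spark's classes but this is not the actual implementation): {{storeBlock}} is invoked on every batch, while {{cleanupOldBlocks}} — the purge step — is never wired up, so the WAL grows without bound.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of the handler: storeBlock appends WAL
// entries keyed by time; cleanupOldBlocks (the method that is never invoked
// in 1.2.0) purges entries strictly older than the threshold time.
class WalBlockHandlerSketch {
  private val walEntries = mutable.Map.empty[Long, String]

  def storeBlock(time: Long, data: String): Unit =
    walEntries(time) = data

  // purge WAL entries strictly older than threshTime
  def cleanupOldBlocks(threshTime: Long): Unit =
    walEntries.keys.toList.filter(_ < threshTime).foreach(walEntries.remove)

  def walSize: Int = walEntries.size
}
```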



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
