[ https://issues.apache.org/jira/browse/SPARK-5147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276870#comment-14276870 ]

Saisai Shao commented on SPARK-5147:
------------------------------------


As far as I understand, keeping 2 copies of received data (the default) reflects
two considerations:

1. Fault tolerance. Since the WAL is already enabled, the data is written to 
HDFS, so fault tolerance is guaranteed by HDFS (3 copies by default). The 
concern {{which will mean data loss in the case where the receiver fails}} is 
therefore really not a problem for either driver failure or receiver failure.

2. Task locality. I admit that 2 copies improve the locality of running tasks, 
and that 1 copy may introduce some remote fetching. But I think this can be 
improved by taking the locality of the HDFS-backed WAL block partitions into 
account (see the sketch after this list).
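
To make the trade-off concrete, here is a minimal sketch of such a setup, 
assuming Spark 1.2 with the receiver-based Kafka API; the app name, checkpoint 
path, ZooKeeper quorum, group id, and topic are all placeholders:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// With the HDFS-backed WAL enabled, durability comes from HDFS replication
// of the log, so the receiver can keep a single BlockManager copy (no "_2"
// storage level) instead of replicating in-memory blocks twice.
val conf = new SparkConf()
  .setAppName("WALReceiverSketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///user/spark/checkpoint") // WAL lives under receivedData/ here

// Placeholder ZooKeeper quorum, consumer group, and topic map.
val stream = KafkaUtils.createStream(
  ssc,
  "zk-host:2181",
  "example-group",
  Map("example-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER) // 1 copy; the WAL provides fault tolerance
{code}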

Also, you mentioned {{delete old batches at a configuration-specified time}}; 
actually, that time is not configured but computed by the {{remember}} 
function, so the deletion of old WAL batches has the same semantics as the 
current {{clearMetadata()}} mechanism. It is not a configuration-specified time.
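
A minimal sketch of how that window is set, assuming an existing 
{{ssc: StreamingContext}} as in the example above:

{code:scala}
import org.apache.spark.streaming.Minutes

// The retention window is computed from remember(), not read from a
// configuration property; blocks (and, once hooked up, WAL segments)
// older than this window become eligible for cleanup via clearMetadata().
ssc.remember(Minutes(5))
{code}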

BTW, the consideration behind SPARK-4671 is to increase throughput (2 copies 
in the BlockManager plus 3 copies in HDFS).

Sorry, I cannot quite follow what you mean; please correct me if I have 
misunderstood.

> write ahead logs from streaming receiver are not purged because 
> cleanupOldBlocks in WriteAheadLogBasedBlockHandler is never called
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5147
>                 URL: https://issues.apache.org/jira/browse/SPARK-5147
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>            Priority: Blocker
>
> Hi all,
> We are running a Spark Streaming application with ReliableKafkaReceiver. We 
> have "spark.streaming.receiver.writeAheadLog.enable" set to true, so write 
> ahead logs (WALs) for received data are created under the receivedData/streamId 
> folder in the checkpoint directory. 
> However, old WALs are never purged over time. receivedBlockMetadata and 
> checkpoint files are purged correctly, though. I went through the code: the 
> WriteAheadLogBasedBlockHandler class in ReceivedBlockHandler.scala is 
> responsible for cleaning up the old blocks. It has the method cleanupOldBlocks, 
> which is never called by any class. The ReceiverSupervisorImpl class holds a 
> WriteAheadLogBasedBlockHandler instance; however, it only calls the storeBlock 
> method to create WALs and never calls the cleanupOldBlocks method to purge old 
> WALs.
> The size of the WAL folder grows constantly on HDFS. This is preventing 
> us from running the ReliableKafkaReceiver 24x7. Can somebody please take a 
> look?
> Thanks,
> Max
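
For reference, a minimal sketch of the call that is never made, assuming 
access to the {{private[streaming]}} API described in the issue (the wrapper 
function here is hypothetical):

{code:scala}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.receiver.ReceivedBlockHandler

// Hypothetical wrapper: cleanupOldBlocks exists on
// WriteAheadLogBasedBlockHandler but nothing ever invokes it, so WAL
// segments accumulate. The cleanup path would need to call it with a
// threshold derived from the remember() window.
def purgeOldWAL(handler: ReceivedBlockHandler, threshTime: Time): Unit = {
  handler.cleanupOldBlocks(threshTime.milliseconds)
}
{code}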


