[ https://issues.apache.org/jira/browse/SPARK-5147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276870#comment-14276870 ]
Saisai Shao commented on SPARK-5147:
------------------------------------

As far as I understand, keeping 2 copies of received data (the default) serves two purposes:

1. Fault tolerance. Since we have already enabled the WAL, the data is written to HDFS, so fault tolerance is guaranteed by HDFS (3 copies by default). Therefore the concern that {{which will mean data loss in the case where the receiver fails}} is not actually a problem, for either driver failure or receiver failure.

2. Locality of running tasks. I admit that 2 copies improve the locality of running tasks, and 1 copy may introduce some remote fetching. But I think this can be improved by taking the locality of the HDFS-backed WAL block partitions into account.

Also, you mentioned {{delete old batches at a configuration-specified time}}. Actually, that time is not configured but computed by the {{remember}} function, so the deletion of old WAL batches has the same semantics as the current {{clearMetadata()}} mechanism; it is not a configuration-specified time.

BTW, the motivation of SPARK-4671 is to increase throughput (currently there are 2 copies in the BlockManager plus 3 copies in HDFS).

Sorry, I cannot quite follow what you mean; please correct me if I have misunderstood.

> write ahead logs from streaming receiver are not purged because
> cleanupOldBlocks in WriteAheadLogBasedBlockHandler is never called
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5147
>                 URL: https://issues.apache.org/jira/browse/SPARK-5147
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>            Priority: Blocker
>
> Hi all,
>
> We are running a Spark streaming application with ReliableKafkaReceiver. We
> have "spark.streaming.receiver.writeAheadLog.enable" set to true, so write
> ahead logs (WALs) for received data are created under the receivedData/streamId
> folder in the checkpoint directory.
>
> However, old WALs are never purged by time; receivedBlockMetadata and
> checkpoint files are purged correctly, though. I went through the code: the
> WriteAheadLogBasedBlockHandler class in ReceivedBlockHandler.scala is
> responsible for cleaning up the old blocks. It has the method cleanupOldBlocks,
> which is never called by any class. The ReceiverSupervisorImpl class holds a
> WriteAheadLogBasedBlockHandler instance; however, it only calls the storeBlock
> method to create WALs and never calls the cleanupOldBlocks method to purge old
> WALs.
>
> The size of the WAL folder grows constantly on HDFS. This is preventing us
> from running the ReliableKafkaReceiver 24x7. Can somebody please take a look?
>
> Thanks,
> Max
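To make the replication point above concrete: with the WAL enabled, the receiver's in-memory storage level could drop the second BlockManager replica, since durability already comes from the HDFS-backed log. A minimal sketch (the ZooKeeper quorum, group, topic, and checkpoint path are placeholder values; whether the default level should change is exactly the SPARK-4671 discussion):

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WalReceiverExample extends App {
  val conf = new SparkConf()
    .setAppName("wal-receiver-example")
    // Turns on the write ahead log, so ReliableKafkaReceiver is used.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///tmp/checkpoint") // WALs live under <checkpoint>/receivedData

  // The default storage level is MEMORY_AND_DISK_SER_2 (2 BlockManager
  // copies); with the WAL already giving 3 HDFS copies, a single in-memory
  // copy is enough for durability, at the cost of some task locality.
  val stream = KafkaUtils.createStream(
    ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)

  stream.count().print()
  ssc.start()
  ssc.awaitTermination()
}
{code}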
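And on the missing cleanup path described in the issue: below is a minimal, self-contained Scala sketch of how the receiver supervisor could forward a cleanup signal to the WAL-backed handler. This is not the actual Spark internals; the {{CleanupOldBlocks}} message, the {{ReceiverSupervisor}} class, and the simplified signatures are illustrative stand-ins, with only the {{storeBlock}}/{{cleanupOldBlocks}} method names taken from the report above. The threshold time would be the one computed from {{remember}}, i.e. the same one that drives {{clearMetadata()}}:

{code:scala}
// Illustrative sketch only: these types mirror the shape of Spark's
// ReceivedBlockHandler / ReceiverSupervisorImpl, not the real signatures.
trait ReceivedBlockHandler {
  def storeBlock(blockId: String, data: Array[Byte]): Unit
  // Should delete WAL segments holding data older than threshTime (ms).
  def cleanupOldBlocks(threshTime: Long): Unit
}

class WriteAheadLogBasedBlockHandler extends ReceivedBlockHandler {
  override def storeBlock(blockId: String, data: Array[Byte]): Unit =
    println(s"writing block $blockId to the WAL")
  override def cleanupOldBlocks(threshTime: Long): Unit =
    println(s"purging WAL segments older than $threshTime")
}

// Message the driver side could send whenever clearMetadata() fires,
// carrying the threshold derived from the remember() duration.
case class CleanupOldBlocks(threshTime: Long)

// The missing piece in 1.2: nothing ever routes such a message to
// cleanupOldBlocks, so the WAL directory grows without bound.
class ReceiverSupervisor(handler: ReceivedBlockHandler) {
  def receive(msg: Any): Unit = msg match {
    case CleanupOldBlocks(threshTime) => handler.cleanupOldBlocks(threshTime)
    case other                        => println(s"unhandled message: $other")
  }
}

object CleanupDemo extends App {
  val supervisor = new ReceiverSupervisor(new WriteAheadLogBasedBlockHandler)
  // Ask the handler to purge everything older than one minute ago.
  supervisor.receive(CleanupOldBlocks(System.currentTimeMillis() - 60L * 1000))
}
{code}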