[ https://issues.apache.org/jira/browse/SPARK-5147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276988#comment-14276988 ]
François Garillot commented on SPARK-5147:
------------------------------------------

1. Yes, you're right; I had forgotten that the WAL gives 3 copies by default. As for 2 copies being driver-fault-tolerant: OK, so the right approach is to delete the WAL at checkpointing time, correct?

2. I'm not sure how WAL-backed blocks will fit into this: when looking for an RDD, the existing blocks (not the WAL) registered at the {{BlockManagerMaster}} will *always* be what is considered, or am I missing something? That is, the only place I see the WAL being *read* is in {{recoverFromWriteAheadLogs}}: the locality of the 3 copies of the WAL may be good, but that is useless if they are never actually read.

I understand the concern about throughput, but isn't writing to the WAL usually much slower than replicating a block in memory through Spark? At least that's the impression I got from mailing-list users' reports; feel free to tell me you have measured the converse. But if both are done in parallel (blocking on the WAL write), the additional block copy, if it really is the faster operation, should not introduce any delay.

> write ahead logs from streaming receiver are not purged because
> cleanupOldBlocks in WriteAheadLogBasedBlockHandler is never called
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5147
>                 URL: https://issues.apache.org/jira/browse/SPARK-5147
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>            Priority: Blocker
>
> Hi all,
>
> We are running a Spark streaming application with ReliableKafkaReceiver. We
> have "spark.streaming.receiver.writeAheadLog.enable" set to true, so write
> ahead logs (WALs) for received data are created under the receivedData/streamId
> folder in the checkpoint directory.
>
> However, old WALs are never purged over time. receivedBlockMetadata and
> checkpoint files are purged correctly, though.
> I went through the code. The WriteAheadLogBasedBlockHandler class in
> ReceivedBlockHandler.scala is responsible for cleaning up the old blocks. It
> has a method, cleanupOldBlocks, which is never called by any class. The
> ReceiverSupervisorImpl class holds a WriteAheadLogBasedBlockHandler instance;
> however, it only calls the storeBlock method to create WALs and never calls
> the cleanupOldBlocks method to purge old WALs.
>
> The size of the WAL folder increases constantly on HDFS. This is preventing
> us from running the ReliableKafkaReceiver 24x7. Can somebody please take a
> look?
>
> Thanks,
> Max

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
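To make the reported pattern concrete, here is a minimal, self-contained Scala sketch. This is not Spark's actual code: `WalHandler`, its file layout, and `WalCleanupDemo` are invented stand-ins for `WriteAheadLogBasedBlockHandler`. It models a handler that appends timestamped log segments on `storeBlock` and exposes a `cleanupOldBlocks` method that deletes segments older than a threshold; the bug is that nothing in the driver's cleanup path ever invokes that method, so segments accumulate forever.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical stand-in for WriteAheadLogBasedBlockHandler: each stored
// block appends one timestamped segment file under a per-stream directory.
class WalHandler(walDir: Path) {
  // segment start time (ms) -> segment file
  private val segments = mutable.Map[Long, Path]()

  def storeBlock(time: Long, data: Array[Byte]): Unit = {
    val seg = walDir.resolve(s"log-$time")
    Files.write(seg, data)
    segments(time) = seg
  }

  // The method SPARK-5147 reports as never being invoked: drop every
  // segment strictly older than the given threshold time.
  def cleanupOldBlocks(threshTime: Long): Unit = {
    val old = segments.keys.filter(_ < threshTime).toSeq
    old.foreach { t =>
      Files.deleteIfExists(segments(t))
      segments.remove(t)
    }
  }

  def segmentCount: Int = segments.size
}

object WalCleanupDemo {
  def run(): Int = {
    val dir = Files.createTempDirectory("wal")
    val handler = new WalHandler(dir)
    // Five batches of received data produce five WAL segments.
    (1L to 5L).foreach(t => handler.storeBlock(t * 1000, Array[Byte](1)))
    // Without a caller for cleanupOldBlocks the directory grows without
    // bound; wiring it into the checkpoint/cleanup path fixes that:
    handler.cleanupOldBlocks(threshTime = 3000)
    handler.segmentCount
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints 3: segments at 3000, 4000, 5000 remain
}
```

In the sketch, the fix corresponds to the suggestion in point 1 of the comment above: have the component that already performs checkpoint-time cleanup (in Spark, presumably via `ReceiverSupervisorImpl`) call `cleanupOldBlocks` with the checkpoint's threshold time.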