Hi Dirceu,
For the append issue, we are setting "hdfs.append.support" (the key the Spark code reads from the HDFS configuration) to "true" in hdfs-site.xml, and that seems to have solved the issue. Of course, we are using HDFS, which does support append. I think the actual configuration Spark should check is "dfs.support.append". I believe the failure is intermittent because in most cases a new file is created to store the block-addition event. I need to look into the code again to see when these files are created new and when they are appended.

Thanks,
Arijit

________________________________
From: Dirceu Semighini Filho <dirceu.semigh...@gmail.com>
Sent: Thursday, November 17, 2016 6:50:28 AM
To: Arijit
Cc: Tathagata Das; user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

Hi Arijit,
Have you found a solution for this? I'm facing the same problem in Spark 1.6.1, but here the error happens only a few times, so our HDFS does support append. This is what I can see in the logs:

2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures

2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com>:

Thanks TD. Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent configuration, "dfs.support.append", that is used in our version of HDFS.

In case we want to use a pseudo file-system (like S3) which does not support append, what are our options? I am not familiar with the code yet, but is it possible to generate a new file whenever a conflict of this sort happens?
Thanks again,
Arijit

________________________________
From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:

Hello All,

We are using Spark 1.6.2 with WAL enabled and encountering data loss when the following exception/warning happens. We are using HDFS as our checkpoint directory.

Questions are:

1. Is this a bug in Spark or an issue with our configuration? The source looks like the following. Which file already exists, and who is supposed to set the "hdfs.append.support" configuration? Why doesn't it happen all the time?

private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }
}

2. Why does the job not retry and eventually fail when this error occurs? The job skips processing the exact number of events dumped in the log. For this particular example, I see 987 + 4686 events were not processed and are lost forever (they do not recover even on restart).
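To make question 1 concrete: the append-vs-create decision in the quoted HdfsUtils code can be modeled in plain Scala. This is a simplified sketch with no Hadoop dependency; `fileExists` and `supportsAppend` are hypothetical stand-ins for `dfs.isFile(dfsPath)` and the `"hdfs.append.support"` lookup (whose default is false):

```scala
// Simplified model of HdfsUtils.getOutputStream's decision (sketch only;
// `fileExists` and `supportsAppend` stand in for dfs.isFile(dfsPath) and
// conf.getBoolean("hdfs.append.support", false) -- not real Hadoop calls).
object WalOpenModel {
  sealed trait Action
  case object Create extends Action
  case object Append extends Action

  def decide(fileExists: Boolean, supportsAppend: Boolean): Action =
    if (!fileExists) Create            // fresh log file: always succeeds
    else if (supportsAppend) Append    // existing file + append support
    else throw new IllegalStateException(
      "File exists and there is no append support!")
}
```

With the flag left at its default of false, any attempt to reopen an existing WAL file takes the exception path, which is consistent with the failure being intermittent: it only surfaces when a log file for that interval already exists.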
16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record:
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record:
BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)

I am sure Spark Streaming is not expected to lose data when WAL is enabled. So what are we doing wrong here?

Thanks,
Arijit
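For reference, the workaround described at the top of this thread corresponds to an hdfs-site.xml entry like the one below. This is a sketch: the property name is the one the quoted Spark 1.6 HdfsUtils code reads, and whether your HDFS build also needs "dfs.support.append" depends on its version.

```xml
<!-- hdfs-site.xml: flag read by Spark 1.6's HdfsUtils (workaround from this
     thread); "dfs.support.append" may also be relevant on some HDFS versions. -->
<configuration>
  <property>
    <name>hdfs.append.support</name>
    <value>true</value>
  </property>
</configuration>
```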