Hi Arijit,

Have you found a solution for this? I'm facing the same problem on Spark 1.6.1, but here the error happens only occasionally, so our HDFS does support append. This is what I can see in the logs:

2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com>:

> Thanks TD.
>
> Is "hdfs.append.support" a standard configuration? I see a seemingly
> equivalent configuration, "dfs.support.append", that is used in our version
> of HDFS.
>
> In case we want to use a pseudo file system (like S3) which does not
> support append, what are our options? I am not familiar with the code yet,
> but is it possible to generate a new file whenever a conflict of this sort
> happens?
>
> Thanks again,
> Arijit
> ------------------------------
> *From:* Tathagata Das <tathagata.das1...@gmail.com>
> *Sent:* Monday, November 7, 2016 7:59:06 PM
> *To:* Arijit
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
>
> For the WAL in Spark to work with HDFS, the HDFS version you are running
> must support file appends. Contact your HDFS package/installation provider
> to figure out whether this is supported by your HDFS installation.
>
> On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:
>
>> Hello All,
>>
>> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
>> the following exception/warning happens. We are using HDFS as our
>> checkpoint directory.
>>
>> Questions are:
>>
>> 1. Is this a bug in Spark or an issue with our configuration? The source
>> looks like the following. Which file already exists, and who is supposed
>> to set the hdfs.append.support configuration? Why doesn't it happen all
>> the time?
>>
>> private[streaming] object HdfsUtils {
>>
>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>     val dfsPath = new Path(path)
>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>     // If the file exists and we have append support, append instead of
>>     // creating a new file
>>     val stream: FSDataOutputStream = {
>>       if (dfs.isFile(dfsPath)) {
>>         if (conf.getBoolean("hdfs.append.support", false) ||
>>             dfs.isInstanceOf[RawLocalFileSystem]) {
>>           dfs.append(dfsPath)
>>         } else {
>>           throw new IllegalStateException("File exists and there is no append support!")
>>         }
>>       } else {
>>         dfs.create(dfsPath)
>>       }
>>     }
>>     stream
>>   }
>> }
>>
>> 2. Why does the job not retry and eventually fail when this error occurs?
>> The job skips processing the exact number of events dumped in the log. For
>> this particular example I see 987 + 4686 events were not processed and are
>> lost forever (they do not recover even on restart).
>>
>> 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write
>> to write ahead log after 3 failures
>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
>> failed to write ArrayBuffer(
>> Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6),
>> Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>> java.lang.IllegalStateException: File exists and there is no append support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
>> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,
>> WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),
>> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597))))
>> to the WriteAheadLog.
>> java.lang.IllegalStateException: File exists and there is no append support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
>> writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,
>> WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),
>> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473))))
>> to the WriteAheadLog.
>> java.lang.IllegalStateException: File exists and there is no append support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in
>> memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>>
>> I am sure Spark Streaming is not expected to lose data when the WAL is
>> enabled. So what are we doing wrong here?
>>
>> Thanks,
>> Arijit
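On Arijit's question about generating a new file whenever this conflict happens: Spark's actual HdfsUtils throws instead of rolling over, but the idea itself is simple to sketch. The snippet below is a hypothetical illustration only (the name rollToNewPath is made up, and it uses java.nio on a local filesystem rather than the Hadoop FileSystem API): when the target path already exists and append is not an option, pick the first non-existing path with an increasing numeric suffix and write there instead.

```scala
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical sketch of the "roll to a new file on conflict" idea from the
// thread. Not Spark code: Spark 1.6 throws IllegalStateException here instead.
object WalRollSketch {
  // If `base` does not exist, use it; otherwise return base.1, base.2, ...
  // choosing the first suffix that is free.
  def rollToNewPath(base: Path): Path = {
    if (!Files.exists(base)) base
    else Iterator.from(1)
      .map(i => base.resolveSibling(base.getFileName.toString + s".$i"))
      .find(p => !Files.exists(p))
      .get
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("wal")
    val log = dir.resolve("log-0")
    Files.write(log, "first".getBytes)  // simulate an already-existing WAL file
    val next = rollToNewPath(log)       // conflict, so roll to log-0.1
    Files.write(next, "second".getBytes, StandardOpenOption.CREATE_NEW)
    println(next.getFileName)
  }
}
```

The trade-off, of course, is that readers of the WAL would then have to discover and replay all the suffixed segments, which is presumably why Spark relies on append support instead.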