Nice, thank you. I'll test this property to see if the error stops.
2016-11-17 14:48 GMT-02:00 Arijit <arij...@live.com>:

> Hi Dirceu,
>
> For the append issue, we are setting "hdfs.append.support" (from the Spark
> code which reads the HDFS configuration) to "true" in hdfs-site.xml, and
> that seemed to have solved the issue. Of course, we are using HDFS, which
> does support append. I think the actual configuration Spark should check
> is "dfs.support.append".
>
> I believe the failure is intermittent, since in most cases a new file is
> created to store the block-addition event. I need to look into the code
> again to see when these files are created anew and when they are appended.
>
> Thanks,
> Arijit
>
> ------------------------------
> *From:* Dirceu Semighini Filho <dirceu.semigh...@gmail.com>
> *Sent:* Thursday, November 17, 2016 6:50:28 AM
> *To:* Arijit
> *Cc:* Tathagata Das; user@spark.apache.org
> *Subject:* Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
>
> Hi Arijit,
> Have you found a solution for this? I'm facing the same problem in Spark
> 1.6.1, but here the error happens only a few times, so our HDFS does
> support append.
> This is what I can see in the logs:
>
> 2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer]
> WriteAheadLogManager for Thread: Failed to write to write ahead log after
> 3 failures
>
> 2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com>:
>
>> Thanks TD.
>>
>> Is "hdfs.append.support" a standard configuration? I see a seemingly
>> equivalent configuration, "dfs.support.append", that is used in our
>> version of HDFS.
>>
>> In case we want to use a pseudo file system (like S3) which does not
>> support append, what are our options? I am not familiar with the code
>> yet, but is it possible to generate a new file whenever a conflict of
>> this sort happens?
>> Thanks again,
>> Arijit
>>
>> ------------------------------
>> *From:* Tathagata Das <tathagata.das1...@gmail.com>
>> *Sent:* Monday, November 7, 2016 7:59:06 PM
>> *To:* Arijit
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
>>
>> For the WAL in Spark to work with HDFS, the HDFS version you are running
>> must support file appends. Contact your HDFS package/installation
>> provider to figure out whether this is supported by your HDFS
>> installation.
>>
>> On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using Spark 1.6.2 with the WAL enabled and are encountering data
>>> loss when the following exception/warning happens. We are using HDFS as
>>> our checkpoint directory.
>>>
>>> Questions are:
>>>
>>> 1. Is this a bug in Spark or an issue with our configuration? The source
>>> looks like the following. Which file already exists, and who is supposed
>>> to set the hdfs.append.support configuration? Why doesn't it happen all
>>> the time?
>>>
>>> private[streaming] object HdfsUtils {
>>>
>>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>>     val dfsPath = new Path(path)
>>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>>     // If the file exists and we have append support, append instead of
>>>     // creating a new file
>>>     val stream: FSDataOutputStream = {
>>>       if (dfs.isFile(dfsPath)) {
>>>         if (conf.getBoolean("hdfs.append.support", false) ||
>>>             dfs.isInstanceOf[RawLocalFileSystem]) {
>>>           dfs.append(dfsPath)
>>>         } else {
>>>           throw new IllegalStateException("File exists and there is no append support!")
>>>         }
>>>       } else {
>>>         dfs.create(dfsPath)
>>>       }
>>>     }
>>>     stream
>>>   }
>>> }
>>>
>>> 2. Why does the job not retry and eventually fail when this error
>>> occurs? The job skips processing the exact number of events dumped in
>>> the log.
>>> For this particular example, I see 987 + 4686 events were not processed
>>> and are lost forever (it does not recover even on restart).
>>>
>>> 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to
>>> write to write ahead log after 3 failures
>>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
>>> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
>>> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6),
>>> Record(java.nio.HeapByteBuffer[pos=1212 lim=1212
>>> cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
>>> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,
>>> WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),
>>> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597))))
>>> to the WriteAheadLog.
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
>>> writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,
>>> WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),
>>> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473))))
>>> to the WriteAheadLog.
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in
>>> memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>>>
>>> I am sure Spark Streaming is not expected to lose data when the WAL is
>>> enabled. So what are we doing wrong here?
>>>
>>> Thanks,
>>> Arijit
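[Editor's note: the kind of fallback Arijit asks about above (create a fresh file instead of appending when the filesystem lacks append support) could be sketched as below. This is a hypothetical variant written for this thread, not Spark's actual code; the fallback naming scheme is invented for illustration, and as the comment notes, the WAL reader would also need to discover the extra segment files, which is why this is not a drop-in fix.]

```scala
// Hypothetical sketch: fall back to a new, uniquely suffixed file when the
// target exists and the filesystem (e.g. an S3-backed one) cannot append.
// Assumes hadoop-common on the classpath; names here are illustrative only.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem}

object AppendOrCreateSketch {
  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = dfsPath.getFileSystem(conf)
    if (dfs.isFile(dfsPath)) {
      if (conf.getBoolean("hdfs.append.support", false) ||
          dfs.isInstanceOf[RawLocalFileSystem]) {
        // Filesystem supports append: reuse the existing log file.
        dfs.append(dfsPath)
      } else {
        // No append support: instead of throwing IllegalStateException,
        // create a sibling file with a unique suffix. The log reader would
        // have to be taught to pick up these extra segments as well.
        val fallback = new Path(path + "-" + System.currentTimeMillis())
        dfs.create(fallback)
      }
    } else {
      // Common case: the file does not exist yet, so create it.
      dfs.create(dfsPath)
    }
  }
}
```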