Hi Arijit,
Have you found a solution for this? I'm facing the same problem in Spark
1.6.1, but here the error happens only occasionally, so our HDFS must
support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
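
To reason about why this is intermittent, here is a minimal pure-Scala
sketch of the branch in the HdfsUtils.getOutputStream code quoted further
down. The names WalOpenModel, chooseAction, Append, Create and
NoAppendMessage are made up for illustration and are not Spark APIs; the
three booleans stand in for dfs.isFile(dfsPath), the hdfs.append.support
flag and the RawLocalFileSystem check.

```scala
// Illustrative model only; none of these names exist in Spark.
object WalOpenModel {
  sealed trait Action
  case object Append extends Action // stands in for dfs.append(dfsPath)
  case object Create extends Action // stands in for dfs.create(dfsPath)

  val NoAppendMessage = "File exists and there is no append support!"

  // Mirrors the if/else in the quoted getOutputStream; Left(...) models the
  // IllegalStateException thrown there.
  def chooseAction(fileExists: Boolean,
                   appendFlag: Boolean,
                   isRawLocal: Boolean): Either[String, Action] =
    if (fileExists) {
      if (appendFlag || isRawLocal) Right(Append)
      else Left(NoAppendMessage)
    } else {
      Right(Create)
    }

  def main(args: Array[String]): Unit = {
    // New file: created regardless of the flag.
    println(chooseAction(fileExists = false, appendFlag = false, isRawLocal = false))
    // Existing file, flag unset: the failing case from the logs.
    println(chooseAction(fileExists = true, appendFlag = false, isRawLocal = false))
    // Existing file, flag set: append.
    println(chooseAction(fileExists = true, appendFlag = true, isRawLocal = false))
  }
}
```

Read this way, the error only shows up when the target log file already
exists and the flag is unset (getBoolean defaults to false), which would
explain why it does not happen on every write. Note that the quoted code
checks hdfs.append.support on the Hadoop Configuration it is given, not
dfs.support.append. If that Configuration comes from the SparkContext,
setting spark.hadoop.hdfs.append.support=true in the Spark conf may be a
way to set the flag, but I have not verified that on 1.6.1.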




2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com>:

> Thanks TD.
>
>
> Is "hdfs.append.support" a standard configuration? I see a seemingly
> equivalent configuration "dfs.support.append" that is used in our version
> of HDFS.
>
>
> In case we want to use a pseudo file-system (like S3) that does not
> support append, what are our options? I am not familiar with the code yet,
> but is it possible to generate a new file whenever a conflict of this sort
> happens?
>
>
> Thanks again, Arijit
> ------------------------------
> *From:* Tathagata Das <tathagata.das1...@gmail.com>
> *Sent:* Monday, November 7, 2016 7:59:06 PM
> *To:* Arijit
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming Data loss on failure to write
> BlockAdditionEvent failure to WAL
>
> For WAL in Spark to work with HDFS, the HDFS version you are running must
> support file appends. Contact your HDFS package/installation provider to
> figure out whether this is supported by your HDFS installation.
>
> On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:
>
>> Hello All,
>>
>>
>> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
>> the following exception/warning happens. We are using HDFS as our
>> checkpoint directory.
>>
>>
>> Questions are:
>>
>>
>> 1. Is this a bug in Spark or an issue with our configuration? The source
>> looks like the following. Which file already exists, and who is supposed
>> to set the hdfs.append.support configuration? Why doesn't it happen all
>> the time?
>>
>>
>> private[streaming] object HdfsUtils {
>>
>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>     val dfsPath = new Path(path)
>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>     // If the file exists and we have append support, append instead of creating a new file
>>     val stream: FSDataOutputStream = {
>>       if (dfs.isFile(dfsPath)) {
>>         if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
>>           dfs.append(dfsPath)
>>         } else {
>>           throw new IllegalStateException("File exists and there is no append support!")
>>         }
>>       } else {
>>         dfs.create(dfsPath)
>>       }
>>     }
>>     stream
>>   }
>>
>>
>> 2. Why does the job not retry and eventually fail when this error occurs?
>> The job skips processing the exact number of events dumped in the log. For
>> this particular example I see that 987 + 4686 events were not processed
>> and are lost forever (they are not recovered even on restart).
>>
>>
>> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>> java.lang.IllegalStateException: File exists and there is no append support!
>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>         at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
>> java.lang.IllegalStateException: File exists and there is no append support!
>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>         at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
>> java.lang.IllegalStateException: File exists and there is no append support!
>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>         at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>>
>> I am sure Spark Streaming is not expected to lose data when WAL is
>> enabled. So what are we doing wrong here?
>>
>> Thanks, Arijit
>>
>>
>
