Nice, thank you. I'll test this property to see if the error stops.
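
A minimal sketch of the branch in HdfsUtils.getOutputStream quoted further down, to make it easier to see when the "File exists and there is no append support!" error can fire. This is a dependency-free Scala model, not Spark's code: WalOpenAction, decide, and the Map standing in for Hadoop's Configuration are hypothetical names used only for illustration.

```scala
// Dependency-free model of the branch in HdfsUtils.getOutputStream.
// WalOpenAction, decide, and the Map-based config are hypothetical stand-ins,
// not Spark or Hadoop APIs.
object WalAppendSketch {
  sealed trait WalOpenAction
  case object Create extends WalOpenAction // file absent: a new log file is created
  case object Append extends WalOpenAction // file present and append is allowed
  case object Fail extends WalOpenAction   // file present, no append: IllegalStateException

  // Models conf.getBoolean("hdfs.append.support", false): a missing key counts as false.
  def appendEnabled(conf: Map[String, String]): Boolean =
    conf.get("hdfs.append.support").exists(_.trim.equalsIgnoreCase("true"))

  def decide(fileExists: Boolean, conf: Map[String, String], isRawLocalFs: Boolean): WalOpenAction =
    if (!fileExists) Create
    else if (appendEnabled(conf) || isRawLocalFs) Append
    else Fail

  def main(args: Array[String]): Unit = {
    val unset = Map.empty[String, String]
    val enabled = Map("hdfs.append.support" -> "true")
    // Property unset: only the "file already exists" case fails.
    println(decide(fileExists = false, conf = unset, isRawLocalFs = false))
    println(decide(fileExists = true, conf = unset, isRawLocalFs = false))
    // Property set to "true": the existing file is appended to instead.
    println(decide(fileExists = true, conf = enabled, isRawLocalFs = false))
  }
}
```

Under this model the exception can only occur when the log file already exists, which would fit Arijit's remark that in most cases a new file is created.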

2016-11-17 14:48 GMT-02:00 Arijit <arij...@live.com>:

> Hi Dirceu,
>
>
> For the append issue, we set "hdfs.append.support" (the key that Spark's
> code reads from the HDFS configuration) to "true" in hdfs-site.xml, and that
> seems to have solved the issue. Of course, we are using HDFS, which does
> support append. I think the configuration Spark should actually check is
> "dfs.support.append".
>
>
> I believe the failure is intermittent because in most cases a new file is
> created to store the block addition event. I need to look into the code
> again to see when these files are newly created and when they are appended to.
>
>
> Thanks, Arijit
>
>
> ------------------------------
> *From:* Dirceu Semighini Filho <dirceu.semigh...@gmail.com>
> *Sent:* Thursday, November 17, 2016 6:50:28 AM
> *To:* Arijit
> *Cc:* Tathagata Das; user@spark.apache.org
>
> *Subject:* Re: Spark Streaming Data loss on failure to write
> BlockAdditionEvent failure to WAL
>
> Hi Arijit,
> Have you found a solution for this? I'm facing the same problem in Spark
> 1.6.1, but here the error happens only a few times, so our HDFS does
> support append.
> This is what I can see in the logs:
> 2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
>
>
>
>
> 2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com>:
>
>> Thanks TD.
>>
>>
>> Is "hdfs.append.support" a standard configuration? I see a seemingly
>> equivalent configuration "dfs.support.append" that is used in our
>> version of HDFS.
>>
>>
>> In case we want to use a pseudo file system (like S3) that does not
>> support append, what are our options? I am not familiar with the code yet,
>> but is it possible to generate a new file whenever a conflict of this sort
>> happens?
>>
>>
>> Thanks again, Arijit
>> ------------------------------
>> *From:* Tathagata Das <tathagata.das1...@gmail.com>
>> *Sent:* Monday, November 7, 2016 7:59:06 PM
>> *To:* Arijit
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Spark Streaming Data loss on failure to write
>> BlockAdditionEvent failure to WAL
>>
>> For WAL in Spark to work with HDFS, the HDFS version you are running must
>> support file appends. Contact your HDFS package/installation provider to
>> figure out whether this is supported by your HDFS installation.
>>
>> On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:
>>
>>> Hello All,
>>>
>>>
>>> We are using Spark 1.6.2 with WAL enabled and encountering data loss
>>> when the following exception/warning happens. We are using HDFS as our
>>> checkpoint directory.
>>>
>>>
>>> Questions are:
>>>
>>>
>>> 1. Is this a bug in Spark or an issue with our configuration? The source
>>> looks like the following. Which file already exists, and who is supposed to
>>> set the hdfs.append.support configuration? Why doesn't it happen all the time?
>>>
>>>
>>> private[streaming] object HdfsUtils {
>>>
>>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>>     val dfsPath = new Path(path)
>>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>>     // If the file exists and we have append support, append instead of creating a new file
>>>     val stream: FSDataOutputStream = {
>>>       if (dfs.isFile(dfsPath)) {
>>>         if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
>>>           dfs.append(dfsPath)
>>>         } else {
>>>           throw new IllegalStateException("File exists and there is no append support!")
>>>         }
>>>       } else {
>>>         dfs.create(dfsPath)
>>>       }
>>>     }
>>>     stream
>>>   }
>>>
>>>
>>> 2. Why does the job not retry and eventually fail when this error
>>> occurs? The job skips processing the exact number of events dumped in the
>>> log. For this particular example, I see that 987 + 4686 events were not
>>> processed and are lost forever (they are not recovered even on restart).
>>>
>>>
>>> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
>>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
>>> java.lang.IllegalStateException: File exists and there is no append support!
>>>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>>>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>>>
>>> I am sure Spark Streaming is not expected to lose data when WAL is
>>> enabled. So what are we doing wrong here?
>>>
>>> Thanks, Arijit
>>>
>>>
>>
>
