Hi Dirceu,

For the append issue we are setting "hdfs.append.support" (from Spark code 
which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed to 
have solved the issue. Of course we are using HDFS which does support append. I 
think the actual configuration Spark should check is "dfs.support.append".


I believe failure is intermittent since in most cases a new file is created to 
store the block addition event. I need to look into the code again to see when 
these files are created new and when they are appended.


Thanks, Arijit


________________________________
From: Dirceu Semighini Filho <dirceu.semigh...@gmail.com>
Sent: Thursday, November 17, 2016 6:50:28 AM
To: Arijit
Cc: Tathagata Das; user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

Hi Arijit,
Have you find a solution for this? I'm facing the same problem in Spark 1.6.1, 
but here the error happens only a few times, so our hdfs does support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] 
WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 
failures




2016-11-08 14:47 GMT-02:00 Arijit <arij...@live.com<mailto:arij...@live.com>>:

Thanks TD.


Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent 
configuration "dfs.support.append" that is used in our version of HDFS.


In case we want to use a pseudo file-system (like S3)  which does not support 
append what are our options? I am not familiar with the code yet but is it 
possible to generate a new file whenever conflict of this sort happens?


Thanks again, Arijit

________________________________
From: Tathagata Das 
<tathagata.das1...@gmail.com<mailto:tathagata.das1...@gmail.com>>
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must 
support file appends. Contact your HDFS package/installation provider to figure 
out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit 
<arij...@live.com<mailto:arij...@live.com>> wrote:

Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or issue with our configuration? Source looks like 
the following. Which file already exist or who is suppose to set 
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of 
creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || 
dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append 
support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The 
job skips processing the exact number of events dumped in the log. For this 
particular example I see 987 + 4686 events were not processed and are lost for 
ever (does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
        at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org<http://org.apache.spark.streaming.util.BatchedWriteAheadLog.org>$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://
mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597))))
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org<http://org.apache.spark.streaming.util.BatchedWriteAheadLog.org>$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs:
//mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473))))
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org<http://org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org>$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org<http://org.apache.spark.streaming.util.BatchedWriteAheadLog.org>$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory 
on 10.0.0.11:42316<http://10.0.0.11:42316> (size: 283.1 KB, free: 2.6 GB)

I am sure Spark Streaming is not expected to lose data when WAL is enabled. So 
what are we doing wrong here?

Thanks, Arijit



Reply via email to