For the WAL in Spark to work with HDFS, the HDFS version you are running
must support file appends. Check with your HDFS package/installation
provider whether your HDFS installation supports them.
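
To make the branch in the quoted HdfsUtils.getOutputStream easier to reason
about, here is a minimal sketch that reduces it to plain Scala with no Hadoop
dependency. WalOpenModel, decide and its parameters are hypothetical names
used only for illustration; appendFlag stands in for
conf.getBoolean("hdfs.append.support", false) and isRawLocal for the
RawLocalFileSystem check. It is a model of the quoted logic, not Spark's
actual code path.

```scala
// Hadoop-free model of the decision made in the quoted getOutputStream.
// All names in this object are hypothetical.
object WalOpenModel {
  sealed trait WalOpen
  case object Append extends WalOpen // corresponds to dfs.append(dfsPath)
  case object Create extends WalOpen // corresponds to dfs.create(dfsPath)

  def decide(fileExists: Boolean, appendFlag: Boolean, isRawLocal: Boolean): WalOpen =
    if (fileExists) {
      // An existing file can only be reopened by appending to it.
      if (appendFlag || isRawLocal) Append
      else throw new IllegalStateException("File exists and there is no append support!")
    } else {
      // No file at this path yet, so the append setting is never consulted.
      Create
    }
}
```

In this model the exception can only occur when the log file already exists
at the moment the writer is opened and the append flag is false. When the
path does not exist yet, the flag is irrelevant, which is consistent with the
error not showing up on every write.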

On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:

> Hello All,
>
>
> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
> the following exception/warning happens. We are using HDFS as our
> checkpoint directory.
>
>
> Questions are:
>
>
> 1. Is this a bug in Spark or an issue with our configuration? The source
> looks like the following. Which file already exists, and who is supposed
> to set the hdfs.append.support configuration? Why doesn't it happen all
> the time?
>
>
> private[streaming] object HdfsUtils {
>
>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>     val dfsPath = new Path(path)
>     val dfs = getFileSystemForPath(dfsPath, conf)
>     // If the file exists and we have append support, append instead of creating a new file
>     val stream: FSDataOutputStream = {
>       if (dfs.isFile(dfsPath)) {
>         if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
>           dfs.append(dfsPath)
>         } else {
>           throw new IllegalStateException("File exists and there is no append support!")
>         }
>       } else {
>         dfs.create(dfsPath)
>       }
>     }
>     stream
>   }
>
>
> 2. Why does the job not retry and eventually fail when this error occurs?
> The job skips processing exactly the number of events dumped in the log.
> In this particular example, 987 + 4686 events were not processed and are
> lost forever (they are not recovered even on restart).
>
>
> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
> java.lang.IllegalStateException: File exists and there is no append support!
>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append support!
>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append support!
>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>         at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>
> I am sure Spark Streaming is not expected to lose data when WAL is
> enabled. So what are we doing wrong here?
>
> Thanks, Arijit
>
>
