For the WAL (write-ahead log) in Spark Streaming to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to find out whether your installation supports them.
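If you want to check from code rather than asking your vendor, a quick probe is to create a file and then try to append to it. This is a minimal sketch, not production code; the /tmp path and the probe file name are placeholders:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object AppendProbe {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()  // picks up core-site.xml/hdfs-site.xml from the classpath
        val fs = FileSystem.get(conf)
        val p = new Path("/tmp/append-probe-" + System.currentTimeMillis())

        // Create the file first, then try to reopen it for append.
        val out = fs.create(p)
        out.writeBytes("probe\n")
        out.close()

        try {
          // append() throws on HDFS builds/configurations that do not support appends
          val appended = fs.append(p)
          appended.writeBytes("appended\n")
          appended.close()
          println("append is supported")
        } finally {
          fs.delete(p, false)
        }
      }
    }

Note also that the HdfsUtils code you quoted below gates on the hdfs.append.support key in the Hadoop Configuration, defaulting to false. So even on an append-capable HDFS the WAL writer will throw unless that key is set in the configuration Spark hands to the WAL, e.g. via --conf spark.hadoop.hdfs.append.support=true (Spark copies spark.hadoop.* properties into its Hadoop Configuration). Whether that alone explains the data loss you are seeing is a separate question.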
On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:
> Hello All,
>
> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
> the following exception/warning happens. We are using HDFS as our
> checkpoint directory.
>
> Questions are:
>
> 1. Is this a bug in Spark or an issue with our configuration? The source
> looks like the following. Which file already exists, and who is supposed
> to set the hdfs.append.support configuration? Why doesn't it happen all
> the time?
>
> private[streaming] object HdfsUtils {
>
>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>     val dfsPath = new Path(path)
>     val dfs = getFileSystemForPath(dfsPath, conf)
>     // If the file exists and we have append support, append instead of
>     // creating a new file
>     val stream: FSDataOutputStream = {
>       if (dfs.isFile(dfsPath)) {
>         if (conf.getBoolean("hdfs.append.support", false) ||
>             dfs.isInstanceOf[RawLocalFileSystem]) {
>           dfs.append(dfsPath)
>         } else {
>           throw new IllegalStateException("File exists and there is no append support!")
>         }
>       } else {
>         dfs.create(dfsPath)
>       }
>     }
>     stream
>   }
>
> 2. Why does the job not retry and eventually fail when this error occurs?
> The job skips processing the exact number of events dumped in the log. For
> this particular example I see 987 + 4686 events were not processed and are
> lost forever (they do not recover even on restart).
>
> 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write
> to write ahead log after 3 failures
> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6),
> Record(java.nio.HeapByteBuffer[pos=1212 lim=1212
> cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
> java.lang.IllegalStateException: File exists and there is no append support!
>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>   at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,
> WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),
> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597))))
> to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append support!
>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>   at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
> writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,
> WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),
> FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473))))
> to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append support!
>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
>   at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in
> memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)
>
> I am sure Spark Streaming is not expected to lose data when WAL is
> enabled. So what are we doing wrong here?
>
> Thanks,
> Arijit