Actually, the current WAL implementation (as of Spark 1.5) does not work
with S3 because S3 does not support flushing. The current implementation
assumes that after write + flush, the data is immediately durable and can
be read back if the system crashes before the WAL file is closed. This
does not hold for S3, where data is durable if and only if the S3 file
output stream is cleanly closed.
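
To illustrate the assumption (a minimal sketch, not the actual Spark WAL
code; the path and record payload below are made up), the writer relies
on Hadoop's hflush() semantics, which S3 output streams do not provide:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object WalFlushSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Hypothetical log path; this pattern is safe on HDFS, not on S3.
        val path = new Path("hdfs:///tmp/wal/log-0")
        val fs = FileSystem.get(path.toUri, conf)

        val out = fs.create(path)
        out.write("record-1".getBytes("UTF-8"))

        // On HDFS, hflush() makes the bytes readable by other clients even
        // if this process crashes before close(). An S3 output stream has no
        // equivalent: nothing is visible in the bucket until close()
        // completes, so a crash after this point loses data the WAL
        // believes is already persisted.
        out.hflush()

        // ... if the process crashes here: readable on HDFS, lost on S3 ...

        out.close() // only after a clean close is the S3 object durable
      }
    }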




On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I assume you don't use Kinesis.
>
> Are you running Spark 1.5.0 ?
> If you must use S3, is switching to Kinesis possible ?
>
> Cheers
>
> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia <mici...@gmail.com>
> wrote:
>
>> How can I make Write Ahead Logs work with S3? Any pointers welcome!
>>
>> It seems to be a known issue:
>> https://issues.apache.org/jira/browse/SPARK-9215
>>
>> I am getting this exception when reading the write ahead log:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
>> org.apache.spark.SparkException: Could not read data from write ahead log
>> record
>> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
>>         at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>         at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>         at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
>>         at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>         ... 13 more
>>
>>
>>
>
