Can checkpoints be stored in S3 (via an S3/S3A Hadoop URL)? Trying to answer this question, I looked into Checkpoint.getCheckpointFiles [1]. It filters a directory listing with findFirstIn, which would presumably translate into an S3 LIST operation. S3 LIST is prone to eventual consistency [2]. What would happen if getCheckpointFiles returned an incomplete list of files to Checkpoint.read [1]?
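
To make the concern concrete, here is a minimal sketch of the listing pattern I am worried about: a Hadoop FileSystem directory listing filtered by a checkpoint-name regex. This is not the actual Checkpoint.scala code, and the regex and names are my own placeholders; the point is only that on an eventually consistent store, listStatus() may not yet return the newest checkpoint files, so recovery would presumably fall back to an older checkpoint without any error.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object CheckpointListingSketch {
      // Placeholder pattern; the real one lives in Checkpoint.scala.
      private val CheckpointRegex = """checkpoint-(\d+)""".r

      def latestCheckpoints(checkpointDir: String, conf: Configuration): Seq[Path] = {
        val dir = new Path(checkpointDir)
        val fs: FileSystem = dir.getFileSystem(conf)
        fs.listStatus(dir).toSeq      // on S3 this is a LIST call, eventually consistent
          .map(_.getPath)
          .filter(p => CheckpointRegex.findFirstIn(p.getName).isDefined)
          .sortBy(p => CheckpointRegex.findFirstMatchIn(p.getName)
                         .map(_.group(1).toLong).getOrElse(0L))
          .reverse                    // newest first; an incomplete LIST silently drops the newest
      }
    }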
The pluggable WAL interface allows me to work around the eventual consistency of S3 by storing an index of filenames in DynamoDB. However, it seems that something similar is required for checkpoints as well. (A rough sketch of what I have in mind is at the very bottom of this message, below the quoted thread.)

I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is there something I can borrow from DirectKafkaInputDStream? After a DStream computes an RDD, is there a way for the DStream to tell when processing of that RDD has finished, so that the SQS messages are deleted only after that?

I was also considering Amazon EFS, but it is only available in a single region while it is in preview. EBS could be an option, but it cannot be used across multiple Availability Zones.

[1]: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
[2]: http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel


On 22 September 2015 at 21:09, Tathagata Das <t...@databricks.com> wrote:

> You can keep the checkpoints in the Hadoop-compatible file system and the
> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
> the stuff gets complicated, as it is not as easy as deleting off the
> checkpoint directory - you will have to clean up the checkpoint directory
> as well as whatever other storage your custom WAL uses. However, if I
> remember correctly, the WAL information is used only when the DStreams are
> recovered correctly from checkpoints.
>
> Note that there are further details here that require deeper
> understanding. There are actually two uses of WALs in the system:
>
> 1. Data WAL for received data - This is what is usually referred to as
> the WAL everywhere. Each receiver writes to a different WAL. This deals
> with bulk data.
> 2. Metadata WAL - This is used by the driver to save metadata information
> like block to data WAL segment mapping, etc. I usually skip mentioning
> this. This WAL is automatically used when the data WAL is enabled, and it
> deals with small data.
>
> If you have to get around S3's limitations, you will have to plug in both
> WALs (see
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
> for the SparkConfs, but note that we haven't made these confs public).
> While the system supports plugging them in, we haven't made this
> information public yet because of such complexities in working with it.
> And we have invested time in making common sources like Kafka not require
> WALs (e.g. the Direct Kafka approach). In the future, we do hope to have a
> better solution for general receivers + WALs + S3 (personally, I really
> wish S3's semantics improve and fix this issue).
>
> Another alternative direction may be Amazon EFS. Since it is based on EBS,
> it may give the necessary semantics. But I haven't given that a spin, so
> it's uncharted territory :)
>
> TD
>
> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>
>> My understanding of pluggable WAL was that it eliminates the need for
>> having a Hadoop-compatible file system [1].
>>
>> What is the use of a pluggable WAL when it can only be used together with
>> checkpointing, which still requires a Hadoop-compatible file system?
>>
>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>
>> On 22 September 2015 at 19:57, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>> because it does not make sense to recover from the WAL if there is no
>>> checkpoint information to recover from.
>>>
>>> 2. Since the current implementation saves the WAL in the checkpoint
>>> directory, they share the same fate -- if the checkpoint directory is
>>> deleted, then both the checkpoint info and the WAL info are deleted.
>>>
>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>>
>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>
>>>> I am trying to use the pluggable WAL, but it can be used only with
>>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible
>>>> file system.
>>>>
>>>> Is there something like pluggable checkpointing?
>>>>
>>>> Or can the WAL be used without checkpointing? What happens when the WAL
>>>> is available but the checkpoint directory is lost?
>>>>
>>>> Thanks!
>>>>
>>>> On 18 September 2015 at 05:47, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> I don't think it would work with multipart upload either. The file is
>>>>> not visible until the multipart upload is explicitly closed. So even if
>>>>> each write is a part upload, none of the parts are visible until the
>>>>> multipart upload is closed.
>>>>>
>>>>> TD
>>>>>
>>>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>>>>>
>>>>>> > On 17 Sep 2015, at 21:40, Tathagata Das <t...@databricks.com> wrote:
>>>>>> >
>>>>>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>>>>>> > work with S3 because S3 does not support flushing. Basically, the
>>>>>> > current implementation assumes that after write + flush, the data is
>>>>>> > immediately durable, and readable if the system crashes without
>>>>>> > closing the WAL file. This does not work with S3, as data is durable
>>>>>> > if and only if the S3 file output stream is cleanly closed.
>>>>>>
>>>>>> More precisely, unless you turn multipart uploads on, the s3n/s3a
>>>>>> clients Spark uses *don't even upload anything to S3*.
>>>>>>
>>>>>> It's not a filesystem, and you have to bear that in mind.
>>>>>>
>>>>>> Amazon's own S3 client used in EMR behaves differently; it may be
>>>>>> usable as a destination (I haven't tested).
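
For concreteness, below is the rough skeleton of the custom WAL I have in mind (record data in S3, authoritative index in DynamoDB). It is only a sketch: it assumes the WriteAheadLog developer API as it appears in Spark 1.5; the names S3RecordHandle, DynamoIndexedS3WriteAheadLog and com.example are my own; and the S3/DynamoDB operations are indicated by comments rather than real SDK calls.

    import java.nio.ByteBuffer
    import java.util.{Iterator => JIterator}

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

    // Handle identifying one record: the S3 object key plus the time,
    // which doubles as the DynamoDB index key (hypothetical naming).
    case class S3RecordHandle(s3Key: String, time: Long) extends WriteAheadLogRecordHandle

    // Spark instantiates the plugged-in class via reflection; judging from
    // WriteAheadLogUtils.scala it looks for a (SparkConf, logDirectory)
    // constructor, which is what I assume here.
    class DynamoIndexedS3WriteAheadLog(conf: SparkConf, logDirectory: String)
      extends WriteAheadLog {

      override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
        val key = s"$logDirectory/log-$time"
        // 1. PUT the record to S3 under `key` and close the stream, so the
        //    object becomes visible (no flush semantics needed).
        // 2. PutItem(key, time) into the DynamoDB index table, so that
        //    recovery never relies on an eventually consistent S3 LIST.
        S3RecordHandle(key, time)
      }

      override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = handle match {
        case S3RecordHandle(key, _) =>
          // GET the object `key` from S3 and return its bytes.
          ByteBuffer.allocate(0) // placeholder so the sketch compiles
      }

      override def readAll(): JIterator[ByteBuffer] = {
        // Query the DynamoDB index for all keys, then GET each object from S3.
        java.util.Collections.emptyIterator[ByteBuffer]()
      }

      override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
        // Query the index for entries with time < threshTime, delete those
        // S3 objects, then delete the index entries.
      }

      override def close(): Unit = ()
    }

And then, per your note about plugging in both WALs, I would point both confs at it, using the (non-public) keys as I read them in WriteAheadLogUtils.scala:

    val sparkConf = new SparkConf()
      // metadata WAL used by the driver:
      .set("spark.streaming.driver.writeAheadLog.class",
           "com.example.DynamoIndexedS3WriteAheadLog")
      // data WAL used by the receivers:
      .set("spark.streaming.receiver.writeAheadLog.class",
           "com.example.DynamoIndexedS3WriteAheadLog")

Does that look like the intended way to plug these in?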