[ https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985040#comment-15985040 ]

Steve Loughran commented on SPARK-7481:
---------------------------------------

(This is a fairly long comment, but it tries to summarise the entire state of 
interaction with object stores, especially S3A on Hadoop 2.8+. Azure is simpler; 
GCS is Google's problem; Swift isn't used very much.)

If you look at object store & Spark (or indeed, any code which uses a 
filesystem as the source and dest of work), there are problems which can 
generally be grouped into various categories.

h3. Foundational: talking to the object stores

Classpath & execution: can you wire the JARs up? A longstanding issue in ASF 
Spark releases (SPARK-5348, SPARK-12557). This was exacerbated by the move of 
s3n:// into the hadoop-aws package (FWIW, I hadn't noticed that move; I'd have 
blocked it if I'd been paying attention). This includes transitive dependency 
problems (SPARK-11413).
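
To illustrate (a build.sbt fragment with placeholder version numbers, not a recommendation): the connectors live in their own Hadoop modules, so they have to be pulled in explicitly and must line up with the hadoop-client Spark was built against.

{code}
// Sketch only: hadoop-aws (which also carries s3n these days), hadoop-azure and
// hadoop-openstack are separate artifacts. The versions here are placeholders and
// must match the hadoop-client on Spark's classpath, along with whatever AWS SDK
// that hadoop-aws release was built against.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"       % "2.8.0",
  "org.apache.hadoop" % "hadoop-azure"     % "2.8.0",
  "org.apache.hadoop" % "hadoop-openstack" % "2.8.0"
)
{code}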

Credential propagation. Spark's env var propagation is pretty cute here; 
SPARK-19739 picks up {{AWS_SESSION_TOKEN}} too. Diagnostics on failure are a 
real pain.
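
Purely as a sketch (these are the standard s3a configuration keys; the app name is made up): credentials can also be pushed straight into the Hadoop configuration Spark hands to the filesystem, which is what the env-var propagation effectively does for you.

{code}
import org.apache.spark.sql.SparkSession

// Minimal sketch of wiring AWS credentials into the configuration used by s3a.
val spark = SparkSession.builder().appName("s3a-credentials-sketch").getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
// Session (STS) credentials need the token and the temporary-credentials provider too.
sys.env.get("AWS_SESSION_TOKEN").foreach { token =>
  hadoopConf.set("fs.s3a.session.token", token)
  hadoopConf.set("fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
}
{code}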


h3. Observable Inconsistencies leading to Data loss

Generally where the metaphor "it's just a filesystem" fails. These are bad 
because they often "just work", especially in dev & test with small datasets, 
and when they go wrong, they can fail by generating bad results *and nobody 
notices*.

* Expectations of consistent listing of "directories". S3Guard (HADOOP-13345) 
deals with this, as can Netflix's S3mper and AWS's premium Dynamo-backed S3 
storage.
* Expectations about the transacted nature of directory renames, the core atomic 
commit operation against full filesystems.
* Expectations that when things are deleted they go away. This does become 
visible sometimes, usually in checks for a destination not existing 
(SPARK-19013)
* Expectations that write-in-progress data is visible/flushed, and that {{close()}} 
is low cost. SPARK-19111.

Committing pretty much combines all of these; see below for more details.
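
As a small sketch of where those expectations bite (plain Hadoop FileSystem API, hypothetical paths): the classic check-then-rename promotion, which is perfectly safe on HDFS.

{code}
import org.apache.hadoop.fs.{FileSystem, Path}

// Safe on HDFS; on raw S3 every step here can betray you.
def promote(fs: FileSystem, taskOutput: Path, dest: Path): Unit = {
  if (fs.exists(dest)) {      // the probe may return stale results on an eventually consistent store
    fs.delete(dest, true)     // deleted entries can still show up in later listings
  }
  fs.rename(taskOutput, dest) // atomic O(1) on a real FS; a non-atomic copy-then-delete on S3
}
{code}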

h3. Aggressively bad performance

That's the mismatch between what the object store offers, what the apps expect, 
and the metaphor work done in the Hadoop FileSystem implementations, which, in 
trying to hide the conceptual mismatch, can actually amplify the problem. 
Example: directory tree scanning at the start of a query. The mock directory 
structure lets callers do treewalks, when really a full list of all children can 
be done as a direct O(1) call. SPARK-17159 covers some of this for scanning 
directories in Spark Streaming, but there's a hidden tree walk in every call to 
{{FileSystem.globStatus()}} (HADOOP-13371). Given how S3Guard transforms this 
treewalk, and that you need it for consistency anyway, it's probably the best 
solution for now. I do have a PoC which does a full {{**/*}} listing followed by 
a filter, but that's not viable when you have a wide, deep tree and do need to 
prune aggressively.
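
For comparison, a sketch of the two listing styles against the Hadoop FileSystem API (bucket and layout are made up): a wildcard glob walks the simulated tree level by level, while a recursive {{listFiles()}} can be satisfied by the store as a flat, paged listing of one prefix, which the caller then filters.

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator}

val conf = new Configuration()
val root = new Path("s3a://bucket/table")
val fs = root.getFileSystem(conf)

// Treewalk: one LIST per simulated directory per wildcard segment.
val viaGlob = fs.globStatus(new Path("s3a://bucket/table/year=*/month=*/*"))

// Flat recursive listing of the common prefix; filter the results client-side.
val allFiles: RemoteIterator[LocatedFileStatus] = fs.listFiles(root, true)
{code}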

Checkpointing to object stores is similar: it's generally not dangerous to do 
the write+rename, it just adds the copy overhead, consistency issues 
notwithstanding.


h3. Suboptimal code. 

There are opportunities for speedup, but if it's not on the critical path, it's 
not worth the hassle. That said, as every call to {{getFileStatus()}} can take 
hundreds of millis, things get onto the critical path quite fast. Examples: 
checking for a file's existence before calling {{fs.delete(path)}} (this is 
always a no-op if the destination path isn't there), and the equivalent on 
mkdirs: {{if (!fs.exists(dir)) fs.mkdirs(dir)}}. Hadoop 3.0 will help steer 
people onto the path of righteousness there by deprecating a couple of methods 
which encourage inefficiencies (isFile/isDir).
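
The pattern in question, plus the cheaper form, as a sketch (every probe here is a full HTTPS round trip against the store):

{code}
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanupSlow(fs: FileSystem, path: Path): Unit = {
  if (fs.exists(path)) {   // an extra getFileStatus round trip, hundreds of millis on S3
    fs.delete(path, true)
  }
}

def cleanupFast(fs: FileSystem, path: Path): Unit = {
  fs.delete(path, true)    // already a no-op when the path is absent
}

def ensureDir(fs: FileSystem, dir: Path): Unit = {
  fs.mkdirs(dir)           // no exists() guard needed; mkdirs is idempotent
}
{code}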


h3. The commit problem 

The full commit problem combines all of these: you need a consistent list of 
source data, your deleted destination path mustn't appear in listings, the 
commit of each task must promote that task's work to the pending output of the 
job, and an abort must leave no trace of it. The final job commit must place 
data into the final destination and, again, a job abort must not make any output 
visible. There's some ambiguity about what happens if task and job commits fail; 
generally the safest answer is "abort everything". Furthermore, nobody has any 
idea what to do if an {{abort()}} raises exceptions. Oh, and all of this must be 
fast. Spark is no better or worse than the core MapReduce committers here, or 
Hive's.
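
For reference, the lifecycle being described, in terms of Hadoop's {{OutputCommitter}} API; this is only a sketch of the calls the framework makes, not code anyone would write by hand.

{code}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Sketch: how the framework drives a committer; the committer and contexts are stand-ins.
def runJob(committer: OutputCommitter, job: JobContext, tasks: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(job)
  tasks.foreach { task =>
    committer.setupTask(task)
    try {
      // ... the task writes its output under the committer's work path ...
      if (committer.needsTaskCommit(task)) {
        committer.commitTask(task)   // promote the task's work to the pending job output
      }
    } catch {
      case e: Exception =>
        committer.abortTask(task)    // must leave no trace of the task's output
        throw e
    }
  }
  committer.commitJob(job)           // publish everything at the final destination
  // and on failure: committer.abortJob(job, JobStatus.State.FAILED) must hide all output
}
{code}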

Spark generally uses the Hadoop {{FileOutputFormat}} via the 
{{HadoopMapReduceCommitProtocol}}, directly or indirectly (e.g. 
{{ParquetOutputFormat}}), extracting its committer and casting it to 
{{FileOutputCommitter}}, primarily to get a working directory. This committer 
assumes the destination is a consistent FS and uses renames when promoting task 
and job output, assuming those renames are so fast it doesn't even bother to log 
an "about to rename" message. Hence the recurrent Stack Overflow question "why 
does my S3 job hang at the end with no data being uploaded and nothing logged": 
it's blocked while data is copied inside S3 at 10MB/s. The direct output 
committer addressed the rename delays, but at the expense of the requirements 
of "commit": nothing visible until the job is completed; hence its deletion 
(SPARK-10063). Using the "v2 commit algorithm" is somewhat faster (SPARK-20107), 
but still prone to generating bad results in the presence of list inconsistency.
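
For what it's worth, switching algorithms is just a Hadoop configuration option; a sketch (it changes where the rename cost is paid, not the fact that "rename" is a copy on S3):

{code}
import org.apache.spark.sql.SparkSession

// v2: task commit renames straight into the destination, so job commit is cheap;
// v1 (the default): task commit renames into a job attempt dir, job commit renames again.
// Neither fixes list inconsistency or the copy cost of renames on S3.
val spark = SparkSession.builder().appName("committer-algorithm-sketch").getOrCreate()
spark.sparkContext.hadoopConfiguration
  .setInt("mapreduce.fileoutputcommitter.algorithm.version", 2)
{code}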

Hadoop's {{FileOutputCommitter}} is horribly convoluted code with two different 
commit strategies intermingled into a co-recursive nightmare that doesn't make 
sense, even when stepped through line by line. Believe me. Trying to subclass 
it is an exercise in pain and suffering. 

Netflix have had to do exactly that with their [S3 
committer|https://github.com/rdblue/s3committer]. This is in use in production, 
saves minutes on queries, and is resilient to failure. Here work is written to a 
local dir, then uploaded in a multipart upload direct to the destination 
directory, an upload which is only completed when the job is committed. There's 
the delay for the uploads in task commit, but job commit is a parallelizable set 
of short POSTs.
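
The trick it relies on, sketched with raw AWS SDK multipart calls (bucket, key and file are placeholders; the real committer manages all of this for you): nothing appears at the destination key until the final complete call, and an abort makes the whole upload vanish.

{code}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{CompleteMultipartUploadRequest, InitiateMultipartUploadRequest, UploadPartRequest}
import java.io.File
import scala.collection.JavaConverters._

val s3 = AmazonS3ClientBuilder.defaultClient()

// Task commit: upload the locally buffered output as parts of a pending multipart upload.
val init = s3.initiateMultipartUpload(
  new InitiateMultipartUploadRequest("bucket", "output/part-0000"))
val part = s3.uploadPart(new UploadPartRequest()
  .withBucketName("bucket").withKey("output/part-0000")
  .withUploadId(init.getUploadId)
  .withPartNumber(1)
  .withFile(new File("/tmp/task-attempt-output")))

// Job commit: one short POST makes the object visible. A job abort would instead
// call abortMultipartUpload, and nothing would ever appear at the destination.
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
  "bucket", "output/part-0000", init.getUploadId, List(part.getPartETag).asJava))
{code}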

The (incomplete) S3Guard "magic" committer does some things in the FS itself, 
namely remapping a path of the form {{s3a://data/.magic/job_0001_001/task001}} 
to a multipart write to {{s3a://dest}}, saving the data for the final commit 
into that .magic dir and then completing things in task commit. I've stopped 
work on that, pulling in the Netflix committer as the "Staging Committer" 
instead. Why? They've been using it, so it has the lovely attribute of "works". 
Also, it doesn't need the consistent FS of S3Guard, so it works with an 
inconsistent S3 bucket as the destination of work (not as a source though, due 
to listing inconsistencies).

See the [S3A 
Committer|https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md]
 docs for the details, and follow HADOOP-13786 for the development work. If you 
can get your classpath right, this already works at "demo scale" for Spark 
writing everything but Parquet data (ORC, Avro, CSV, ...). There's a bit of 
hard-codedness in the {{ParquetFileFormat}} setup which stops Parquet here, but 
that's tractable in about 10 LoC.

That is: if you get your CP right and have Hadoop trunk + patches, you get O(1) 
zero-rename commits to S3 in Spark without touching any of Spark's source code. 
You still have to deal with the consistency problem on follow-up queries though, 
so either run against a consistent S3 implementation, or run with S3Guard.

Other strategies for addressing the commit problem:

* Databricks are doing commit-as-a-service within AWS; I've got no details 
there other than the talk @ Spark Summit East.
* Hive: see HIVE-14269. Long term they're thinking of moving off simple 
"listFiles() as the index of records" and using some manifest file which can be 
written in a short PUT and read similarly (though it needs a story w.r.t. update 
inconsistency).
* Azure WASB is a consistent store with fast rename. Works out of the box :)

h3. Putting it all together

Don't use S3 as a destination for your work, not unless you have something 
supporting the commit. For that: get involved with HADOOP-13786. Classpaths: 
yes, that needs to be done somehow. 


> Add spark-hadoop-cloud module to pull in object store support
> -------------------------------------------------------------
>
>                 Key: SPARK-7481
>                 URL: https://issues.apache.org/jira/browse/SPARK-7481
>             Project: Spark
>          Issue Type: Improvement
>          Components: Build
>    Affects Versions: 2.1.0
>            Reporter: Steve Loughran
>
> To keep the s3n classpath right, to add s3a, swift & azure, the dependencies 
> of spark in a 2.6+ profile need to add the relevant object store packages 
> (hadoop-aws, hadoop-openstack, hadoop-azure)


