[ https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985040#comment-15985040 ]
Steve Loughran commented on SPARK-7481:
---------------------------------------

(This is a fairly long comment, but it tries to summarise the entire state of interaction with object stores, especially S3A on Hadoop 2.8+. Azure is simpler; GCS is Google's problem; Swift is not used very much.)

If you look at object stores & Spark (or indeed, any code which uses a filesystem as the source and destination of work), there are problems which can generally be grouped into various categories.

h3. Foundational: talking to the object stores

* Classpath & execution: can you wire the JARs up? A longstanding issue in ASF Spark releases (SPARK-5348, SPARK-12557). This was exacerbated by the movement of s3n:// to the hadoop-aws package (FWIW, I hadn't noticed that move; I'd have blocked it if I'd been paying attention). This includes transitive dependency problems (SPARK-11413).
* Credential propagation. Spark's env var propagation is pretty cute here; SPARK-19739 picks up {{AWS_SESSION_TOKEN}} too.
* Diagnostics on failure are a real pain.

h3. Observable inconsistencies leading to data loss

Generally where the metaphor "it's just a filesystem" fails. These are bad because they often "just work", especially in dev & test with small datasets, and when they go wrong, they can fail by generating bad results *and nobody notices*.

* Expectations of consistent listing of "directories". S3Guard (HADOOP-13345) deals with this, as can Netflix's S3mper and AWS's premium DynamoDB-backed S3 storage.
* Expectations of the transacted nature of directory renames, the core atomic commit operation against full filesystems.
* Expectations that when things are deleted they go away. This does become visible sometimes, usually in checks for a destination not existing (SPARK-19013).
* Expectations that write-in-progress data is visible/flushed, and that {{close()}} is low cost (SPARK-19111).

Committing pretty much combines all of these; see below for more details.
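To make the listing-inconsistency failure mode concrete, here's a toy model. This is pure Python, not the real S3/S3A code; the store class and its explicit "replicate" step are invented purely to illustrate how a lagging LIST silently drops data with no error raised:

```python
# Toy model of an eventually consistent object store listing.
# Invented for illustration: real S3 offers read-after-write for new
# PUTs, but (historically) only eventually consistent listings.

class EventuallyConsistentStore:
    def __init__(self):
        self._objects = {}       # what PUTs have actually stored
        self._listable = set()   # what a LIST call currently reveals

    def put(self, key, data):
        # The object is immediately readable by key...
        self._objects[key] = data

    def replicate_listing(self):
        # ...but only shows up in listings after some replication delay.
        self._listable = set(self._objects)

    def list(self, prefix):
        return sorted(k for k in self._listable if k.startswith(prefix))


store = EventuallyConsistentStore()
store.put("output/part-0000", b"row1")
store.put("output/part-0001", b"row2")

# A downstream query enumerating "directory" contents via LIST sees an
# incomplete dataset: no exception, just silently missing rows.
print(store.list("output/"))   # [] until listings catch up

store.replicate_listing()
print(store.list("output/"))   # now both parts appear
```

The nasty part is the first {{list()}} call: it succeeds, so a query planner happily builds a plan over a partial input.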
h3. Aggressively bad performance

That's the mismatch between what the object store offers, what the apps expect, and the metaphor work in the Hadoop FileSystem implementations, which, in trying to hide the conceptual mismatch, can actually amplify the problem.

Example: directory tree scanning at the start of a query. The mock directory structure allows callers to do treewalks, when really a full list of all children can be done as a single O(1) call. SPARK-17159 covers some of this for scanning directories in Spark Streaming, but there's a hidden tree walk in every call to {{FileSystem.globStatus()}} (HADOOP-13371). Given how S3Guard transforms this treewalk, and you need it for consistency anyway, that's probably the best solution for now. Although I have a PoC which does a full list of **/* followed by a filter, that's not viable when you have a wide, deep tree and do need to prune aggressively.

Checkpointing to object stores is similar: it's generally not dangerous to do the write+rename, it just adds the copy overhead, consistency issues notwithstanding.

h3. Suboptimal code

There are opportunities for speedup, but if it's not on the critical path, not worth the hassle. That said, as every call to {{getFileStatus()}} can take hundreds of millis, things get onto the critical path quite fast. Example: checks for a file existing before calling {{fs.delete(path)}} (delete is always a no-op if the destination path isn't there), and the equivalent on mkdirs: {{if (!fs.exists(dir)) fs.mkdirs(dir)}}. Hadoop 3.0 will help steer people onto the path of righteousness there by deprecating a couple of methods which encourage inefficiencies (isFile/isDir).

h3. The commit problem

The full commit problem combines all of these: you need a consistent list of source data; your deleted destination path mustn't appear in listings; the commit of each task must promote the task's work to the pending output of the job; an abort must leave no trace of it.
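The redundant-probe pattern called out under "Suboptimal code" above can be sketched with a toy client that counts round trips. Pure Python, invented for illustration; the only thing it mirrors from the real Hadoop FileSystem contract is that {{delete()}} on a missing path and {{mkdirs()}} on an existing directory are already safe no-ops:

```python
# Toy illustration of why "check, then act" wastes round trips against a
# store where every metadata call is a slow HTTPS request. MockFS is
# invented; what it mirrors is that Hadoop's FileSystem.delete() is a
# no-op on a missing path and mkdirs() succeeds if the dir exists.

class MockFS:
    def __init__(self):
        self.paths = set()
        self.calls = 0  # each method stands in for >= 1 HTTP round trip

    def exists(self, p):
        self.calls += 1
        return p in self.paths

    def mkdirs(self, p):
        self.calls += 1
        self.paths.add(p)       # succeeds even if already present

    def delete(self, p):
        self.calls += 1
        self.paths.discard(p)   # no-op if absent, no error


# Anti-pattern: probe before acting.
fs = MockFS()
if not fs.exists("out"):   # round trip 1: can take hundreds of millis
    fs.mkdirs("out")       # round trip 2
print(fs.calls)            # 2

# Better: rely on the call being idempotent.
fs2 = MockFS()
fs2.mkdirs("out")          # one round trip, same end state
print(fs2.calls)           # 1
```

One probe saved looks trivial until it's {{getFileStatus()}} at a few hundred millis a call, sprinkled through the setup of every task.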
The final job commit must place data into the final destination; again, a job abort must not make any output visible. There's some ambiguity about what happens if task or job commit fails; generally the safest response is "abort everything". Furthermore, nobody has any idea what to do if an {{abort()}} raises exceptions. Oh, and all of this must be fast.

Spark is no better or worse than the core MapReduce committers here, or those of Hive. Spark generally uses the Hadoop {{FileOutputFormat}} via the {{HadoopMapReduceCommitProtocol}}, directly or indirectly (e.g. {{ParquetOutputFormat}}), extracting its committer and casting it to {{FileOutputCommitter}}, primarily to get a working directory. This committer assumes the destination is a consistent FS and uses renames when promoting task and job output, assuming rename is so fast it doesn't even bother to log an "about to rename" message. Hence the recurrent Stack Overflow question "why does my S3 job hang at the end with no data being uploaded and nothing logged?". It's blocked while data is copied inside S3 at ~10MB/s.

The direct output committer addressed the rename delays, but at the expense of the requirements of "commit": nothing visible until the job is completed; hence its deletion (SPARK-10063). Using the "v2 commit algorithm" is somewhat faster (SPARK-20107), but still prone to generating bad results in the presence of list inconsistency.

Hadoop's {{FileOutputCommitter}} is horribly convoluted code with two different commit strategies intermingled into a co-recursive nightmare that doesn't make sense, even when stepped through line by line. Believe me. Trying to subclass it is an exercise in pain and suffering. Netflix have had to do exactly that with their [S3 committer|https://github.com/rdblue/s3committer]. This is in use in production, saves minutes on queries, and is resilient to failure.
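Back-of-envelope arithmetic behind that "hung" job commit, assuming the ~10MB/s in-store copy rate quoted above. The output size here is an invented example, not a measurement:

```python
# Why a rename-based job commit to S3 looks like a hang: "rename" is
# really a server-side COPY of every byte of output.
# 10 MB/s is the rough copy rate quoted above; 50GB of output is a
# hypothetical figure for illustration.

COPY_MB_PER_S = 10          # approximate S3 in-store copy throughput
output_gb = 50              # hypothetical job output size

copy_seconds = output_gb * 1024 / COPY_MB_PER_S
print(f"{copy_seconds / 60:.0f} minutes of silent copying at job commit")
# ~85 minutes, with nothing logged and no data visibly "uploading".
```

Scale the output up or the copy rate down and you get the Stack Overflow reports of jobs that "never finish".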
Here work is written to a local dir, then copied in a multipart upload direct to the destination directory, an upload which is only completed when the job is committed. There's the delay for uploads in task commit, but job commit is a parallelizable set of short POSTs.

The (incomplete) S3Guard "magic" committer does some things in the FS itself, namely remapping a path of the form {{s3a://data/.magic/job_0001_001/task001}} to a multipart write to {{s3a://dest}}, saving the data for the final commit into that .magic dir and then completing things in task commit. I've stopped work on that, pulling in the Netflix committer as the "Staging Committer" instead. Why? They've been using it, so it has the lovely attribute "works". Also, it doesn't need the consistent FS of S3Guard, so it works with an inconsistent S3 bucket as the destination of work (not a source though, due to listing inconsistencies). See [S3A Committer|https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md] for the details, and follow HADOOP-13786 for the development work.

If you can get your classpath right, this already works at "demo scale" for Spark writing everything but Parquet data (ORC, Avro, CSV, ...). There's a bit of hard-codedness in the {{ParquetFileFormat}} setup which stops Parquet here, but that's tractable in about 10 LoC. That is: if you get your CP right and have Hadoop trunk + patches, you get O(1) zero-rename commits to S3 in Spark without touching any of Spark's source code. You still have to deal with the consistency problem on follow-up queries though, so either run against a consistent S3 implementation, or run with S3Guard.

Other strategies for addressing the commit problem:

* Databricks are doing commit-as-a-service within AWS; I've got no details there other than the talk @ Spark Summit East.
* Hive: see HIVE-14269.
Long term they're thinking of moving off simple "listFiles() as the index of record" and using some manifest file which can be written in a short PUT and read similarly (though it needs a story w.r.t. update inconsistency).
* Azure WASB is a consistent store with fast rename. Works out of the box :)

h3. Putting it all together

Don't use S3 as a destination for your work, not unless you have something supporting the commit. For that: get involved with HADOOP-13786. Classpaths: yes, that needs to be done somehow.

> Add spark-hadoop-cloud module to pull in object store support
> -------------------------------------------------------------
>
>                 Key: SPARK-7481
>                 URL: https://issues.apache.org/jira/browse/SPARK-7481
>             Project: Spark
>          Issue Type: Improvement
>          Components: Build
>    Affects Versions: 2.1.0
>            Reporter: Steve Loughran
>
> To keep the s3n classpath right, to add s3a, swift & azure, the dependencies
> of spark in a 2.6+ profile need to add the relevant object store packages
> (hadoop-aws, hadoop-openstack, hadoop-azure)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)