Thanks Johnny for sharing your experience.  Have you tried to use S3A committer?  Looks like this one is introduced in the latest Hadoop for solving problems with other committers.

https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html

- ND

On 6/22/21 6:41 PM, Johnny Burns wrote:
Hello.

I’m Johnny, I work at Stripe. We’re heavy Spark users and we’ve been exploring using s3 committers.Currently we first write the data to HDFS and then upload it to S3. However, now with S3 offering strong consistency guarantees, we are evaluating if we can write data directly to S3.

We’re having some troubles with performance, so hoping someone might have some guidance which can unblock this.


    File Format

We are using parquetas the File Format. We do have icebergtables as well, and they are indeed able to commit directly to S3(withminimal local disk usage). We can’t migrate all of our jobs to icebergright now. Hence, we are looking for a committer that is performant and can directly write parquetfiles to S3(withminimal local disk usage).


    What have we tried?

We’ve tried using both the“magic”and“directory”committers. We're setting the following configs (in addition to the "magic/directory" committer.name <http://committer.name>).

"spark.hadoop.fs.s3a.committer.magic.enabled":"true",


"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",

"spark.sql.sources.commitProtocolClass":"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",

"spark.sql.parquet.output.committer.class":"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",


Both committers have shown performance regressions on large jobs. We’re currently focused on trying to make the directory committer work because we’ve seen /fewer/slowdowns with that one, but I’ll describe the problems with each.

We’ve been testing the committers on a large job with 100k tasks(creating7.3TB of output).


    Observationsfor magic committer


Using the magic committer, we see slowdowns in two places:

  * *S3 Writing****(inside**the task)*

  * The slowdown seems to occur just after the s3 multipart write. The
    finishedWrite
    
<https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4253>function
    tries to do some cleanup and kicks off
    thisdeleteUnnecessaryFakeDirectories
    
<https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4350-L4373>function
    
<https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L4350-L4373>.


  * This causes 503’s due to hitting AWS rate limits on
    com.amazonaws.services.s3.model.DeleteObjectsRequest

  * I'm not sure what directories are actually getting cleaned up here
    (I assume the _magic directories are still needed up until the job
    commit).

  * *Job Commit*

  * Have not dug down into the details here, but assume it is
    something similar to what we’re seeing in the directory committer
    case below.


    Observationsfor directory committer


We’ve observed that the“directory”s3committer performance is on-par with our existing HDFS commit for task execution and task commit. The slowdowns we’re seeing are in the job commit phase.

The job commit happens almost instantaneously in the HDFS case, vs taking about an hour for the s3 directory committer.

We’ve enabled DEBUG logging for the s3 committer. It seems like that hour is mostly spent doing things which you would expect(completing100k delayedComplete s3 uploads). I've attached an example of some of the logs we see repeated over-and-over during the 1 hour job commit (I redacted some of the directories and SHAs but the logs are otherwise unchanged).

One thing I notice is that we see object_delete_requests += 1in the logs. I’m not sure if that means it’s doing an s3 delete, or it is deleting the HDFS manifest files(toclean up the task).


    Alternatives - Should we check out directCommitter?

We’ve also considered using the directCommitter. We understand that the directCommitter is discouraged because it does not support speculative execution(andfor some failure cases). Given that we do not use speculative execution at Stripe, would the directCommitter be a viable option for us? What are the failure scenarios to consider?


    Alternatives - Can S3FileIO work well with parquet files?


Netflix has a tool called s3FileIO <https://iceberg.apache.org/aws/#s3-fileio>. We’re wondering if it can be used with spark, or only with Iceburg.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to