[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493293#comment-17493293 ]

Itay Bittan commented on SPARK-23977:

Hi all,
I followed the [recommendations|https://spark.apache.org/docs/latest/cloud-integration.html#parquet-io-settings] and am getting the following warning:
{code:java}
2022-02-16 15:22:03.292 WARN FlowThread0 ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
{code}
What is the recommended value for `parquet.summary.metadata.level`?

> Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
>
> Key: SPARK-23977
> URL: https://issues.apache.org/jira/browse/SPARK-23977
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Fix For: 3.0.0
>
> Hadoop 3.1 adds a mechanism for job-specific and store-specific committers
> (MAPREDUCE-6823, MAPREDUCE-6956), and one key implementation, the S3A
> committers (HADOOP-13786).
> These committers deliver high-performance output of MR and Spark jobs to S3,
> and offer the key semantics which Spark depends on: no visible output until
> job commit, and a failure of a task at any stage, including partway through
> task commit, can be handled by executing and committing another task attempt.
> In contrast, the FileOutputFormat commit algorithms on S3 have issues:
> * Awful performance because files are copied by rename
> * FileOutputFormat v1: weak task commit failure recovery semantics, as the
> (v1) expectation that "directory renames are atomic" doesn't hold.
> * S3 metadata eventual consistency can cause rename to miss files or fail
> entirely (SPARK-15849)
> Note also that the FileOutputFormat "v2" commit algorithm doesn't offer any of
> the commit semantics w.r.t. observability of or recovery from task commit
> failure, on any filesystem.
> The S3A committers address these by way of uploading all data to the
> destination through multipart uploads, uploads which are only completed in
> job commit.
> The new {{PathOutputCommitter}} factory mechanism allows applications to work
> with the S3A committers and any others, by adding a plugin mechanism into the
> MRv2 FileOutputFormat class, where job config and filesystem configuration
> options can dynamically choose the output committer.
> Spark can use these with some binding classes:
> # Add a subclass of {{HadoopMapReduceCommitProtocol}} which uses the MRv2
> classes and {{PathOutputCommitterFactory}} to create the committers.
> # Add a {{BindingParquetOutputCommitter extends ParquetOutputCommitter}}
> to wire up Parquet output even when code requires the committer to be a
> subclass of {{ParquetOutputCommitter}}
> This patch builds on SPARK-23807 for setting up the dependencies.

--
This message was sent by Atlassian Jira (v8.20.1#820001)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
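A possible reading of the deprecation warning quoted in this message, as a minimal sketch rather than an authoritative answer: it assumes that `parquet.summary.metadata.level` accepts `ALL`, `COMMON_ONLY` and `NONE`, and that the old boolean maps `true` to `ALL` and `false` to `NONE`; check this against the Parquet version in use. The class and method names are hypothetical, and only the JDK is needed to run it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: translates the deprecated boolean into the newer level setting. */
class ParquetSummarySettings {

    /** Assumed mapping: summary files on -> ALL, summary files off -> NONE. */
    static String levelFor(boolean enableSummaryMetadata) {
        return enableSummaryMetadata ? "ALL" : "NONE";
    }

    /** Builds Spark options; the "spark.hadoop." prefix forwards the key to the Hadoop configuration. */
    static Map<String, String> sparkOptions(boolean enableSummaryMetadata) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("spark.hadoop.parquet.summary.metadata.level", levelFor(enableSummaryMetadata));
        return options;
    }

    public static void main(String[] args) {
        // The linked cloud-integration settings switch summary files off, hence "false" here.
        sparkOptions(false).forEach((key, value) -> System.out.println(key + " " + value));
    }
}
```

Under these assumptions, a job that previously set `parquet.enable.summary-metadata` to `false` would set the level to `NONE` instead.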
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17437259#comment-17437259 ]

Steve Loughran commented on SPARK-23977:

[~gumartinm] can I draw your attention to Apache Iceberg?

Meanwhile, MAPREDUCE-7341 adds a high-performance committer targeting abfs and gcs; all tree scanning is in task commit, which is atomic; job commit is aggressively parallelised and optimized for stores whose listStatusIterator calls are incremental with prefetching: we can start processing at the first page of task manifests found in a listing while the second page is still being retrieved. Also in there: rate limiting and IO statistics collection.

In HADOOP-17833 I will pick up some of that work, including incremental loading and rate limiting. And if we can keep reads and writes below the S3 IOPS limits, we should be able to avoid situations where we have to start sleeping and retrying.

HADOOP-17981 is my homework this week: emergency work to deal with a rare but current failure in abfs under heavy load.
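The pipelining mentioned in this comment, processing one page of a listing while the next page is still being fetched, can be sketched with JDK classes only. This is an illustration of the idea, not the MAPREDUCE-7341 code; `PrefetchingListing`, `processAll` and the fake listing function are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

/** Hypothetical sketch of a paged listing with one page of prefetch. */
class PrefetchingListing {

    /**
     * Walks pages 0..pageCount-1. While the entries of page i are processed,
     * the fetch of page i+1 is already running on another thread.
     */
    static List<String> processAll(IntFunction<List<String>> fetchPage, int pageCount) {
        List<String> processed = new ArrayList<>();
        if (pageCount <= 0) {
            return processed;
        }
        CompletableFuture<List<String>> next = CompletableFuture.supplyAsync(() -> fetchPage.apply(0));
        for (int page = 0; page < pageCount; page++) {
            List<String> current = next.join();           // wait for the page fetched ahead of time
            if (page + 1 < pageCount) {
                final int following = page + 1;
                next = CompletableFuture.supplyAsync(() -> fetchPage.apply(following));
            }
            for (String manifest : current) {             // work overlaps with the fetch above
                processed.add("loaded:" + manifest);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Stand-in for a store listing: two manifest names per page.
        IntFunction<List<String>> fakeListing = page ->
            Arrays.asList("task-" + (2 * page) + ".manifest", "task-" + (2 * page + 1) + ".manifest");
        System.out.println(processAll(fakeListing, 3));
    }
}
```

Results are still consumed in page order; only the wait for the following page is hidden behind the processing of the current one.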
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436740#comment-17436740 ]

Gustavo Martin commented on SPARK-23977:

Thank you very much [~ste...@apache.org] for your explanations.

I am experiencing the same problems as [~danzhi] and your comments helped me a lot.

It is a pity that the magic committer does not work with partition overwrite, because it has amazing performance when writing large amounts of partitioned data.
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314296#comment-17314296 ]

Steve Loughran commented on SPARK-23977:

The Spark settings don't make it down from SQL; you can do more at the RDD API level.

The problem is that the Spark partition insert logic all relies on renaming, which has the O(data) performance penalty as well as the other commit correctness issues.

Yes, something to do the pushdown could be done, or the multipart APIs of HADOOP-13186 could give Spark a standard API to implement a zero-rename committer in its own code. However, the focus is on things like Iceberg and Delta Lake, which offer more in terms of:
* atomic job commit
* avoiding the performance and cost issues of relying on a directory tree scan as a way to identify source files, and the performance issues of doing all IO down a single shard of S3 storage.

I do not disagree with the direction of that work; we have to view the S3A committers (and IBM's Stocator + AWS EMR Spark committers) as the final attempts to maintain that "it's just a directory tree" model in a cloud world where directories don't always exist, and listing them is measurable in hundreds of milliseconds.
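To make the O(data) cost of rename-based commit concrete, here is a toy in-memory model contrasting it with uploads that only become visible at job commit. It is a sketch under simplifying assumptions, not the S3A implementation: `ToyObjectStore` and all of its methods are hypothetical, and no real S3 or Hadoop API is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory toy model of an object store; illustrative only. */
class ToyObjectStore {
    private final Map<String, byte[]> visible = new TreeMap<>();
    private final Map<String, byte[]> pendingUploads = new TreeMap<>();
    long bytesCopied = 0;

    void put(String key, byte[] data) {
        visible.put(key, data);
    }

    /** "Rename" as an object store has to emulate it: list, copy each object, delete the source. */
    void renamePrefix(String from, String to) {
        for (String key : new ArrayList<>(visible.keySet())) {
            if (key.startsWith(from)) {
                byte[] data = visible.remove(key);
                visible.put(to + key.substring(from.length()), data.clone());
                bytesCopied += data.length;   // cost grows with the amount of data written
            }
        }
    }

    /** Task side: data is uploaded against its final key, but the upload is not completed yet. */
    void startUpload(String finalKey, byte[] data) {
        pendingUploads.put(finalKey, data);
    }

    /** Job commit: completing the pending uploads makes the objects appear; nothing is copied. */
    void completeAll() {
        visible.putAll(pendingUploads);
        pendingUploads.clear();
    }

    List<String> list() {
        return new ArrayList<>(visible.keySet());
    }

    public static void main(String[] args) {
        ToyObjectStore renameBased = new ToyObjectStore();
        renameBased.put("_temporary/part-0000", new byte[1024]);
        renameBased.renamePrefix("_temporary/", "table/");
        System.out.println("rename commit copied " + renameBased.bytesCopied + " bytes");

        ToyObjectStore deferred = new ToyObjectStore();
        deferred.startUpload("table/part-0000", new byte[1024]);
        System.out.println("visible before job commit: " + deferred.list());
        deferred.completeAll();
        System.out.println("visible after job commit: " + deferred.list()
            + ", bytes copied: " + deferred.bytesCopied);
    }
}
```

In the second half of `main`, nothing is listed under the destination until `completeAll()` runs, which mirrors the "no visible output until job commit" property from the issue description.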
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311818#comment-17311818 ]

Daniel Zhi commented on SPARK-23977:

[~ste...@apache.org] Thanks for the info. Below are the related (key, value) pairs we used:
# spark.hadoop.fs.s3a.committer.name --- partitioned
# spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a --- org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
# spark.sql.sources.commitProtocolClass --- org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
# spark.sql.parquet.output.committer.class --- org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

Settings 3 and 4 appear to be necessary to ensure that the S3A committers are used by Spark for Parquet output, except that "INSERT OVERWRITE" is blocked by the dynamicPartitionOverwrite exception.

It would be helpful and much appreciated if you could elaborate on the proper way to "use the partitioned committer and configure it to do the right thing ..." in Spark.
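For reference, the four settings listed in this comment can be assembled programmatically. This is a small sketch that only builds strings; the keys and values are the ones quoted in the comment, while the class and method names are hypothetical. It renders them in the `--conf key=value` form that `spark-submit` accepts.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that collects the settings quoted in the comment. */
class S3ACommitterSettings {

    /** Returns the four settings, with the committer name (for example "partitioned") filled in. */
    static Map<String, String> forCommitter(String committerName) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.hadoop.fs.s3a.committer.name", committerName);
        conf.put("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        conf.put("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol");
        conf.put("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter");
        return conf;
    }

    /** Renders the settings as "--conf key=value" arguments. */
    static String toSubmitArgs(Map<String, String> conf) {
        return conf.entrySet().stream()
            .map(e -> "--conf " + e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(toSubmitArgs(forCommitter("partitioned")));
    }
}
```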
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309397#comment-17309397 ]

Steve Loughran commented on SPARK-23977:

Use the partitioned committer and configure it to do the right thing when updating partitions (merge, delete everything already there, or fail).
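The three behaviours named in this comment (merge, delete everything already there, fail) appear to correspond to the conflict-mode option of the S3A staging committers. The sketch below assumes the Hadoop key `fs.s3a.committer.staging.conflict-mode` with the values `append`, `replace` and `fail`; that mapping is an assumption taken from the S3A committer documentation, not from this thread, so verify it against the Hadoop release in use. The class and enum names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mapping the behaviours named in the comment to an assumed option value. */
class PartitionedCommitterOptions {

    enum OnExistingPartition {
        MERGE("append"),             // assumed value: add new files next to existing ones
        DELETE_EXISTING("replace"),  // assumed value: remove what is already in the partition
        FAIL("fail");                // assumed value: abort if the partition already has data

        final String conflictMode;

        OnExistingPartition(String conflictMode) {
            this.conflictMode = conflictMode;
        }
    }

    /** Builds Spark options; "spark.hadoop." forwards each key to the Hadoop configuration. */
    static Map<String, String> sparkOptions(OnExistingPartition behaviour) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("spark.hadoop.fs.s3a.committer.name", "partitioned");
        options.put("spark.hadoop.fs.s3a.committer.staging.conflict-mode", behaviour.conflictMode);
        return options;
    }

    public static void main(String[] args) {
        sparkOptions(OnExistingPartition.DELETE_EXISTING)
            .forEach((key, value) -> System.out.println(key + " " + value));
    }
}
```

Other comments in this thread report that such settings may not take effect for SQL "INSERT OVERWRITE", so this is a starting point to test rather than a confirmed fix.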
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308984#comment-17308984 ]

Daniel Zhi commented on SPARK-23977:

[~ste...@apache.org] How can I work around the following exception during the execution of "INSERT OVERWRITE" in spark.sql (Spark 3.1.1 with Hadoop 3.2)?

```
if (dynamicPartitionOverwrite) {
  // until there's explicit extensions to the PathOutputCommitProtocols
  // to support the spark mechanism, it's left to the individual committer
  // choice to handle partitioning.
  throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
}
```
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609245#comment-16609245 ]

Wenchen Fan commented on SPARK-23977:

I'm removing the target version, since we are not going to merge it to 2.4.
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465917#comment-16465917 ]

Steve Loughran commented on SPARK-23977:

It will need the hadoop-aws module and dependencies, as that is where the core code is. This patch just does the binding to the InsertIntoHadoopFS relation (move to Hadoop MRv2 FileOutputFormat and expect the new superclass, PathOutputCommitter, rather than always a FileOutputCommitter, and for Parquet, something similar with a ParquetOutputCommitter).

It's only in Hadoop 3.1, though you can backport to branch-2, especially if you are prepared to bump up the minimum Java version to 8 in that branch.

It should work on k8s, given it works standalone. All it needs is an endpoint supporting the multipart upload operation of S3, which includes some non-AWS object stores.

Look at the HADOOP-13786 work and the paper [a zero rename committer|https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf]. And there are some integration tests downstream in https://github.com/hortonworks-spark/cloud-integration . I can help set you up to run those, if you email me directly. Essentially: you need to choose which stores to test against from s3, openstack and azure, and configure them.

Note that of the two variant committers, "staging" and "magic", the magic one needs a consistent S3 endpoint, which you only get on AWS S3 with an external service, usually DynamoDB based (S3mper, EMR consistent S3, S3Guard). The staging one needs enough local HDD to buffer the output of all active tasks, but doesn't need that consistency for its own query. You will need a plan for chaining together work though, which is inevitably one of "consistency layer" or "wait long enough between writer and reader that you expect the metadata to be consistent".

Finally, if you are using Spark to write directly to S3 today, without any consistency layer, then your commit algorithm had better not be mimicking directory rename by list + copy + delete. You need this code for safe as well as performant committing of work to S3.
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465344#comment-16465344 ]

Wade Jensen commented on SPARK-23977:
-------------------------------------

[~ste...@apache.org] is the intention of this ticket to incorporate the Hadoop libraries within Spark itself, i.e. no Hadoop dependency? Trying to understand whether this is a viable solution for Spark on Kubernetes writing to S3.

> Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
> -----------------------------------------------------------------------
>
> Key: SPARK-23977
> URL: https://issues.apache.org/jira/browse/SPARK-23977
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Steve Loughran
> Priority: Minor
>
>
> Hadoop 3.1 adds a mechanism for job-specific and store-specific committers
> (MAPREDUCE-6823, MAPREDUCE-6956), and one key implementation, S3A committers,
> HADOOP-13786
> These committers deliver high-performance output of MR and Spark jobs to S3,
> and offer the key semantics which Spark depends on: no visible output until
> job commit, and a failure of a task at any stage, including partway through
> task commit, can be handled by executing and committing another task attempt.
> In contrast, the FileOutputFormat commit algorithms on S3 have issues:
> * Awful performance because files are copied by rename
> * FileOutputFormat v1: weak task commit failure recovery semantics as the
> (v1) expectation: "directory renames are atomic" doesn't hold.
> * S3 metadata eventual consistency can cause rename to miss files or fail
> entirely (SPARK-15849)
> Note also that FileOutputFormat "v2" commit algorithm doesn't offer any of
> the commit semantics w.r.t observability of or recovery from task commit
> failure, on any filesystem.
> The S3A committers address these by way of uploading all data to the
> destination through multipart uploads, uploads which are only completed in
> job commit.
> The new {{PathOutputCommitter}} factory mechanism allows applications to work
> with the S3A committers and any others, by adding a plugin mechanism into the
> MRv2 FileOutputFormat class, where job config and filesystem configuration
> options can dynamically choose the output committer.
> Spark can use these with some binding classes to:
> # Add a subclass of {{HadoopMapReduceCommitProtocol}} which uses the MRv2
> classes and {{PathOutputCommitterFactory}} to create the committers.
> # Add a {{BindingParquetOutputCommitter extends ParquetOutputCommitter}}
> to wire up Parquet output even when code requires the committer to be a
> subclass of {{ParquetOutputCommitter}}
> This patch builds on SPARK-23807 for setting up the dependencies.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
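Editorial sketch, not part of the thread: the binding described in the issue above is selected purely through configuration, so it may help to see what those settings look like. The property keys and class names below are the ones listed in the Spark cloud-integration documentation for builds that include the hadoop-cloud module; they are not quoted from this ticket. The helper names (`s3a_committer_conf`, `to_submit_args`) are hypothetical, and the sketch assumes Hadoop 3.1+ with the S3A connector on the classpath.

```python
from typing import Dict, List

# S3A committer names documented by Hadoop. The magic committer may need
# extra settings depending on the Hadoop version (assumption: check the docs).
S3A_COMMITTERS = ("directory", "partitioned", "magic")


def s3a_committer_conf(committer: str = "directory") -> Dict[str, str]:
    """Return the Spark properties that route output through an S3A committer.

    Hypothetical helper: only the keys and class names come from the
    Spark cloud-integration documentation.
    """
    if committer not in S3A_COMMITTERS:
        raise ValueError(f"unknown S3A committer: {committer!r}")
    return {
        # Which S3A committer the factory should create.
        "spark.hadoop.fs.s3a.committer.name": committer,
        # PathOutputCommitterFactory binding for the s3a:// scheme.
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        # The HadoopMapReduceCommitProtocol subclass from the issue text.
        "spark.sql.sources.commitProtocolClass":
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
        # The ParquetOutputCommitter subclass that delegates to the factory.
        "spark.sql.parquet.output.committer.class":
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    }


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a property map as spark-submit style --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(s3a_committer_conf())))
```

The same map could be passed to a session builder one key at a time; rendering it as `--conf` arguments just keeps the sketch free of a Spark dependency.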
[jira] [Commented] (SPARK-23977) Add commit protocol binding to Hadoop 3.1 PathOutputCommitter mechanism
[ https://issues.apache.org/jira/browse/SPARK-23977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437288#comment-16437288 ]

Apache Spark commented on SPARK-23977:
--------------------------------------

User 'steveloughran' has created a pull request for this issue:
https://github.com/apache/spark/pull/21066