Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
I got this info from a Hadoop JIRA ticket:
https://issues.apache.org/jira/browse/MAPREDUCE-5485

// maropu

On Sat, Oct 1, 2016 at 7:14 PM, Igor Berman wrote:
> Takeshi, why are you saying this? How have you checked that it's only used
> from 2.7.3? We use Spark 2.0, which is shipped with a Hadoop 2.7.2
> dependency, and we use this setting. We've sort of "verified" it's used by
> enabling logging for the file output committer.
> [...]
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Takeshi, why are you saying this? How have you checked that it's only used
from 2.7.3? We use Spark 2.0, which is shipped with a Hadoop 2.7.2
dependency, and we use this setting. We've sort of "verified" it's used by
enabling logging for the file output committer.

On 30 September 2016 at 03:12, Takeshi Yamamuro wrote:
> Hi,
>
> FYI: Seems
> `sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")`
> is only available at hadoop-2.7.3+.
>
> // maropu
> [...]
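The "verified via the committer's log" check above can also be done programmatically from the driver; a minimal sketch using the log4j 1.x API that Spark 2.0 bundles (the logger name is assumed to be the Hadoop committer class, and executors would need the same level set in their own log4j configuration):

```scala
import org.apache.log4j.{Level, Logger}

// Turn up logging for the Hadoop file output committer so that commit
// activity (and, on Hadoop 2.7.3+, the algorithm version in use) shows up
// in the driver log.
Logger.getLogger("org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter")
  .setLevel(Level.DEBUG)
```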
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Hi,

FYI: Seems
`sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")`
is only available at hadoop-2.7.3+.

// maropu

On Thu, Sep 29, 2016 at 9:28 PM, joffe.tal wrote:
> You can use partition explicitly by adding "/<partition column>=<partition
> value>" to the end of the path you are writing to and then use overwrite.
>
> BTW in Spark 2.0 you just need to use:
>
> sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
>
> and use s3a://
>
> and you can work with the regular output committer (actually,
> DirectParquetOutputCommitter is no longer available in Spark 2.0), so if
> you are planning on upgrading this could be another motivation.

--
---
Takeshi Yamamuro
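Since the property is silently ignored by Hadoop releases that predate it, it can help to check which Hadoop version is actually on the classpath before relying on it. A small sketch (hedged: `VersionInfo` is the standard Hadoop utility class, and the property name is taken from the message above):

```scala
import org.apache.hadoop.util.VersionInfo

// Print the Hadoop version the driver is actually linked against; on
// releases before the commit-algorithm property existed, setting it is a
// no-op and the slow rename-based algorithm 1 is used without any warning.
println(s"Hadoop version on classpath: ${VersionInfo.getVersion}")

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
```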
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
You can use partition explicitly by adding "/<partition column>=<partition
value>" to the end of the path you are writing to and then use overwrite.

BTW in Spark 2.0 you just need to use:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

and use s3a://

and you can work with the regular output committer (actually,
DirectParquetOutputCommitter is no longer available in Spark 2.0), so if you
are planning on upgrading this could be another motivation.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3-DirectParquetOutputCommitter-PartitionBy-SaveMode-Append-tp26398p27810.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
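Putting the two suggestions together, a minimal sketch of what this could look like (the bucket name, partition column, partition value, and `df` are illustrative placeholders, not from the thread):

```scala
// Use the v2 commit algorithm and write through s3a instead of s3n.
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

// Instead of partitionBy + SaveMode.Append, target one partition directory
// explicitly and overwrite only that directory.
val day = "2016-03-04"  // hypothetical partition value
df.filter(df("date") === day)
  .write
  .mode("overwrite")
  .parquet(s"s3a://my-bucket/parquet-data/date=$day")
```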
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Thanks for the clarification, Gourav.

> On Mar 6, 2016, at 3:54 AM, Gourav Sengupta wrote:
>
> Hi Ted,
>
> There was no idle time after I changed the path to start with s3a and then
> ensured that the number of executors writing was large. The writes start
> and complete in about 5 minutes or less.
>
> Initially the write used to complete in around 30 minutes, and we could
> see failure messages all over the place for another 20 minutes, after
> which we killed the Jupyter application.
>
> Regards,
> Gourav Sengupta
> [...]
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Hi Ted,

There was no idle time after I changed the path to start with s3a and then
ensured that the number of executors writing was large. The writes start and
complete in about 5 minutes or less.

Initially the write used to complete in around 30 minutes, and we could see
failure messages all over the place for another 20 minutes, after which we
killed the Jupyter application.

Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 11:48 AM, Ted Yu wrote:
> Gourav:
> For the 3rd paragraph, did you mean the job seemed to be idle for about 5
> minutes?
>
> Cheers
> [...]
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Gourav:
For the 3rd paragraph, did you mean the job seemed to be idle for about 5
minutes?

Cheers

> On Mar 6, 2016, at 3:35 AM, Gourav Sengupta wrote:
>
> Hi,
>
> This is a solved problem: try using s3a instead and everything will be
> fine.
>
> Besides that, you might want to use coalesce, partitionBy, or repartition
> in order to control how many executors are being used to write (that
> speeds things up quite a bit).
>
> We had a write that used to take close to 50 minutes and now runs in under
> 5 minutes.
>
> Regards,
> Gourav Sengupta
> [...]
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Hi,

This is a solved problem: try using s3a instead and everything will be fine.

Besides that, you might want to use coalesce, partitionBy, or repartition in
order to control how many executors are being used to write (that speeds
things up quite a bit).

We had a write that used to take close to 50 minutes and now runs in under 5
minutes.

Regards,
Gourav Sengupta

On Fri, Mar 4, 2016 at 8:59 PM, Jelez Raditchkov wrote:
> Working on a streaming job with DirectParquetOutputCommitter to S3,
> I need to use partitionBy and hence SaveMode.Append.
>
> Apparently when using SaveMode.Append, Spark automatically defaults to the
> default Parquet output committer and ignores DirectParquetOutputCommitter.
>
> My problems are:
> 1. the copying to _temporary takes a lot of time
> 2. I get job failures with: java.io.FileNotFoundException: File
> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does
> not exist.
>
> I have set:
> sparkConfig.set("spark.speculation", "false")
> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
> sc.hadoopConfiguration.set("mapreduce.reduce.speculative", "false")
>
> Any ideas? Opinions? Best practices?
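As a sketch of the repartitioning suggestion above (the partition count, bucket name, and column are illustrative placeholders, not from the thread):

```scala
// The number of output partitions controls how many tasks write to S3
// concurrently: too few serializes the upload, too many produces a flood
// of small files.
df.repartition(64)                // illustrative parallelism, tune per job
  .write
  .mode("append")
  .partitionBy("date")            // hypothetical partition column
  .parquet("s3a://my-bucket/parquet-data")
```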
Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
It's not safe to use the direct committer with append mode; you may lose
your data.

On 4 March 2016 at 22:59, Jelez Raditchkov wrote:
> Working on a streaming job with DirectParquetOutputCommitter to S3,
> I need to use partitionBy and hence SaveMode.Append.
>
> Apparently when using SaveMode.Append, Spark automatically defaults to the
> default Parquet output committer and ignores DirectParquetOutputCommitter.
>
> My problems are:
> 1. the copying to _temporary takes a lot of time
> 2. I get job failures with: java.io.FileNotFoundException: File
> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does
> not exist.
>
> I have set:
> sparkConfig.set("spark.speculation", "false")
> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
> sc.hadoopConfiguration.set("mapreduce.reduce.speculative", "false")
>
> Any ideas? Opinions? Best practices?