Hi,

I created a JIRA and PR for supporting a s3 friendly output committer for
saveAsParquetFile:
https://issues.apache.org/jira/browse/SPARK-6352
https://github.com/apache/spark/pull/5042

My approach is add a DirectParquetOutputCommitter class in spark-sql
package and use a boolean config variable
spark.sql.parquet.useDirectParquetOutputCommitter to choose between default
output committer.
This may not be the smartest solution but it works for me.
Tested on spark 1.1, 1.3 with hadoop 1.0.4.


On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson <ilike...@gmail.com> wrote:

> Yes, unfortunately that direct dependency makes this injection much more
> difficult for saveAsParquetFile.
>
> On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee <pl...@appier.com> wrote:
>
>> Thanks for the DirectOutputCommitter example.
>> However I found it only works for saveAsHadoopFile. What about
>> saveAsParquetFile?
>> It looks like SparkSQL is using ParquetOutputCommitter, which is subclass
>> of FileOutputCommitter.
>>
>> On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor <
>> thomas.dem...@amplidata.com>
>> wrote:
>>
>> > FYI. We're currently addressing this at the Hadoop level in
>> > https://issues.apache.org/jira/browse/HADOOP-9565
>> >
>> >
>> > Thomas Demoor
>> >
>> > On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath <
>> > ddmcbe...@yahoo.com.invalid> wrote:
>> >
>> >> Just to close the loop in case anyone runs into the same problem I had.
>> >>
>> >> By setting --hadoop-major-version=2 when using the ec2 scripts,
>> >> everything worked fine.
>> >>
>> >> Darin.
>> >>
>> >>
>> >> ----- Original Message -----
>> >> From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
>> >> To: Mingyu Kim <m...@palantir.com>; Aaron Davidson <ilike...@gmail.com
>> >
>> >> Cc: "u...@spark.apache.org" <u...@spark.apache.org>
>> >> Sent: Monday, February 23, 2015 3:16 PM
>> >> Subject: Re: Which OutputCommitter to use for S3?
>> >>
>> >> Thanks.  I think my problem might actually be the other way around.
>> >>
>> >> I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
>> >> scripts, I don't specify a
>> >> -hadoop-major-version and the default is 1.   I'm guessing that if I
>> make
>> >> that a 2 that it might work correctly.  I'll try it and post a
>> response.
>> >>
>> >>
>> >> ----- Original Message -----
>> >> From: Mingyu Kim <m...@palantir.com>
>> >> To: Darin McBeath <ddmcbe...@yahoo.com>; Aaron Davidson <
>> >> ilike...@gmail.com>
>> >> Cc: "u...@spark.apache.org" <u...@spark.apache.org>
>> >> Sent: Monday, February 23, 2015 3:06 PM
>> >> Subject: Re: Which OutputCommitter to use for S3?
>> >>
>> >> Cool, we will start from there. Thanks Aaron and Josh!
>> >>
>> >> Darin, it¹s likely because the DirectOutputCommitter is compiled with
>> >> Hadoop 1 classes and you¹re running it with Hadoop 2.
>> >> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1,
>> and it
>> >> became an interface in Hadoop 2.
>> >>
>> >> Mingyu
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On 2/23/15, 11:52 AM, "Darin McBeath" <ddmcbe...@yahoo.com.INVALID>
>> >> wrote:
>> >>
>> >> >Aaron.  Thanks for the class. Since I'm currently writing Java based
>> >> >Spark applications, I tried converting your class to Java (it seemed
>> >> >pretty straightforward).
>> >> >
>> >> >I set up the use of the class as follows:
>> >> >
>> >> >SparkConf conf = new SparkConf()
>> >> >.set("spark.hadoop.mapred.output.committer.class",
>> >> >"com.elsevier.common.DirectOutputCommitter");
>> >> >
>> >> >And I then try and save a file to S3 (which I believe should use the
>> old
>> >> >hadoop apis).
>> >> >
>> >> >JavaPairRDD<Text, Text> newBaselineRDDWritable =
>> >> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>> >> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>> >> >Text.class, Text.class, SequenceFileOutputFormat.class,
>> >> >org.apache.hadoop.io.compress.GzipCodec.class);
>> >> >
>> >> >But, I get the following error message.
>> >> >
>> >> >Exception in thread "main" java.lang.IncompatibleClassChangeError:
>> Found
>> >> >class org.apache.hadoop.mapred.JobContext, but interface was expected
>> >> >at
>> >>
>> >>
>> >com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>> >> >java:68)
>> >> >at
>> >>
>> >org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>> >> >at
>> >>
>> >>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>> >> >.scala:1075)
>> >> >at
>> >>
>> >>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>> >> >ala:940)
>> >> >at
>> >>
>> >>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>> >> >ala:902)
>> >> >at
>> >>
>> >>
>> >org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>> >> >71)
>> >> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>> >> >
>> >> >In my class, JobContext is an interface of  type
>> >> >org.apache.hadoop.mapred.JobContext.
>> >> >
>> >> >Is there something obvious that I might be doing wrong (or messed up
>> in
>> >> >the translation from Scala to Java) or something I should look into?
>> I'm
>> >> >using Spark 1.2 with hadoop 2.4.
>> >> >
>> >> >
>> >> >Thanks.
>> >> >
>> >> >Darin.
>> >> >
>> >> >
>> >> >________________________________
>> >> >
>> >> >
>> >> >From: Aaron Davidson <ilike...@gmail.com>
>> >> >To: Andrew Ash <and...@andrewash.com>
>> >> >Cc: Josh Rosen <rosenvi...@gmail.com>; Mingyu Kim <m...@palantir.com
>> >;
>> >> >"u...@spark.apache.org" <u...@spark.apache.org>; Aaron Davidson
>> >> ><aa...@databricks.com>
>> >> >Sent: Saturday, February 21, 2015 7:01 PM
>> >> >Subject: Re: Which OutputCommitter to use for S3?
>> >> >
>> >> >
>> >> >
>> >> >Here is the class:
>> >> >
>> >>
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
>> >>
>> >>
>> >dav_c513916e72101bbe14ec&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
>> >>
>> >>
>> >Onmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFs
>> >>
>> >zOvl_-ZnxmkBPHo1K24TfGE&s=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8&e=
>> >> >
>> >> >You can use it by setting "mapred.output.committer.class" in the
>> Hadoop
>> >> >configuration (or "spark.hadoop.mapred.output.committer.class" in the
>> >> >Spark configuration). Note that this only works for the old Hadoop
>> APIs,
>> >> >I believe the new Hadoop APIs strongly tie committer to input format
>> (so
>> >> >FileInputFormat always uses FileOutputCommitter), which makes this fix
>> >> >more difficult to apply.
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash <and...@andrewash.com>
>> >> wrote:
>> >> >
>> >> >Josh is that class something you guys would consider open sourcing, or
>> >> >would you rather the community step up and create an OutputCommitter
>> >> >implementation optimized for S3?
>> >> >>
>> >> >>
>> >> >>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen <rosenvi...@gmail.com>
>> >> wrote:
>> >> >>
>> >> >>We (Databricks) use our own DirectOutputCommitter implementation,
>> which
>> >> >>is a couple tens of lines of Scala code.  The class would almost
>> >> >>entirely be a no-op except we took some care to properly handle the
>> >> >>_SUCCESS file.
>> >> >>>
>> >> >>>
>> >> >>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim <m...@palantir.com>
>> wrote:
>> >> >>>
>> >> >>>I didn¹t get any response. It¹d be really appreciated if anyone
>> using a
>> >> >>>special OutputCommitter for S3 can comment on this!
>> >> >>>>
>> >> >>>>
>> >> >>>>Thanks,
>> >> >>>>Mingyu
>> >> >>>>
>> >> >>>>
>> >> >>>>From: Mingyu Kim <m...@palantir.com>
>> >> >>>>Date: Monday, February 16, 2015 at 1:15 AM
>> >> >>>>To: "u...@spark.apache.org" <u...@spark.apache.org>
>> >> >>>>Subject: Which OutputCommitter to use for S3?
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>>HI all,
>> >> >>>>
>> >> >>>>
>> >> >>>>The default OutputCommitter used by RDD, which is
>> FileOutputCommitter,
>> >> >>>>seems to require moving files at the commit step, which is not a
>> >> >>>>constant operation in S3, as discussed in
>> >> >>>>
>> >>
>> https://urldefense.proofpoint.com/v2/url?u=http-3A__mail-2Darchives.apa
>> >>
>> >>
>> >>>>che.org_mod-5Fmbox_spark-2Duser_201410.mbox_-253C543E33FA.2000802-40ent
>> >>
>> >>
>> >>>>ropy.be-253E&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=e
>> >>
>> >>
>> >>>>nnQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFszOvl_-
>> >> >>>>ZnxmkBPHo1K24TfGE&s=EQOZaHRANJupdjXCfHSXL2t5BZ9YgMt2pRc3pht4o7o&e=
>> .
>> >>
>> >> >>>>People seem to develop their own NullOutputCommitter
>> implementation or
>> >> >>>>use DirectFileOutputCommitter (as mentioned in SPARK-3595), but I
>> >> >>>>wanted to check if there is a de facto standard, publicly available
>> >> >>>>OutputCommitter to use for S3 in conjunction with Spark.
>> >> >>>>
>> >> >>>>
>> >> >>>>Thanks,
>> >> >>>>Mingyu
>> >> >>>
>> >> >>
>> >> >
>> >> >---------------------------------------------------------------------
>> >> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> >For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >> >
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >>
>> >
>>
>
>

Reply via email to