Extending this discussion further: is anyone able to write out LZO-compressed protobufs to HDFS (using Elephant Bird, or any other way)?
I have an RDD that I want written out as-is, but I'm unable to figure out a direct way of doing that. I can convert it to a "PairRDD", i.e. an RDD of ("Key", "Value") instead of just "Value", by force-injecting a long key and then using the PairRDDFunctions.saveAsNewAPIHadoopFile function. For example, the RDD I want to write out is:

myRDD: org.apache.spark.rdd.RDD[com.xyz.MyProto] = MappedRDD[14] at map at <console>:42

val conf = new Job().getConfiguration
conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")

// wrap it in a ProtobufWritable
val protoToWrite = myRDD.map(x => new ProtobufWritable[MyProto](x, new TypeRef[MyProto](x.getClass){}))

// now add an extra long to make it a key/value pair, to be able to use PairRDDFunctions
protoToWrite.map(x => (1L, x)).saveAsNewAPIHadoopFile("/tmp/vipul/temp/proto",
  classOf[LongWritable], classOf[BinaryWritable[MyProto]],
  classOf[LzoProtobufBlockOutputFormat[MyProto]], conf)

As you can see, this is just a kluge to get things running. Is there a neater way to write out the original "myRDD" as block-compressed LZO?
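One variant I have considered, but not tested, is to pair each record with NullWritable instead of a synthetic long. Elephant Bird's own examples write block output with a NullWritable key, and as far as I can tell the block record writer serializes only the values, so the key never hits disk either way. A minimal sketch, assuming that holds:

import org.apache.hadoop.io.NullWritable
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat
import com.twitter.elephantbird.util.TypeRef

// pair each wrapped message with a NullWritable key instead of a dummy long
val writables = myRDD.map { x =>
  (NullWritable.get, new ProtobufWritable[MyProto](x, new TypeRef[MyProto](x.getClass){}))
}
writables.saveAsNewAPIHadoopFile("/tmp/vipul/temp/proto",
  classOf[NullWritable], classOf[ProtobufWritable[MyProto]],
  classOf[LzoProtobufBlockOutputFormat[MyProto]], conf)

It still goes through PairRDDFunctions, so it doesn't avoid the pair wrapper, but at least no meaningless key gets carried around.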
Thanks,
Vipul

On Jan 29, 2014, at 9:40 AM, Issac Buenrostro <buenros...@ooyala.com> wrote:

> Good! I'll keep your experience in mind in case we have problems in the
> future :)
>
> On Tue, Jan 28, 2014 at 5:55 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>
> I got this to run, maybe in a tad twisted way. Here is what I did to get to
> read LZO-compressed protobufs in Spark (I'm on 0.8.0):
>
> - I added hadoop's conf folder to the spark classpath (in spark-env.sh) on
> all the nodes, and in the shell as well, but that didn't help either. So I
> just added the property to the configuration manually:
>
> val conf = new Job().getConfiguration
> conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
> val logRecord = sc.newAPIHadoopFile(
>   filepath, classOf[...], classOf[...], classOf[...], conf)
>
> This seems to resolve the "No codec found" problem below.
>
> - I use Twitter's Elephant Bird to read LZO-compressed protobufs using
> MultiInputFormat, and read the data out as BinaryWritable. The only
> additional thing I had to do was to set the classConf in the
> MultiInputFormat class:
>
> import com.twitter.elephantbird.mapreduce.input.MultiInputFormat
> import com.twitter.elephantbird.mapreduce.io.BinaryWritable
>
> MultiInputFormat.setClassConf(classOf[MyProtoClass], conf)
> val record = sc.newAPIHadoopFile(
>   inputpath, classOf[MultiInputFormat[MyProtoClass]],
>   classOf[LongWritable], classOf[BinaryWritable[MyProtoClass]],
>   conf)
>
> // this gets you the protobuf from the BinaryWritable; thereafter you just
> // follow your class structure
> val protobuf = record.map(_._2.get.getProtobuf)
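>
> Putting those pieces together, here is the whole read path in one block
> (untested in exactly this form; MyProtoClass stands for your generated
> protobuf class, sc is the SparkContext, and the final .getProtobuf call
> depends on your own class structure):
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapreduce.Job
> import com.twitter.elephantbird.mapreduce.input.MultiInputFormat
> import com.twitter.elephantbird.mapreduce.io.BinaryWritable
>
> // register the lzop codec manually, since core-site.xml wasn't picked up
> val conf = new Job().getConfiguration
> conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
>
> // tell MultiInputFormat which protobuf class to deserialize into
> MultiInputFormat.setClassConf(classOf[MyProtoClass], conf)
>
> // keys are file offsets, values wrap the decoded protobuf messages
> val records = sc.newAPIHadoopFile(inputpath,
>   classOf[MultiInputFormat[MyProtoClass]],
>   classOf[LongWritable], classOf[BinaryWritable[MyProtoClass]],
>   conf)
>
> // unwrap the BinaryWritables to get the protobufs out
> val protobufs = records.map(_._2.get.getProtobuf)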
>
> Hope this helps whoever is working with LZO-compressed protobufs.
>
> ~Vipul
>
> On Jan 22, 2014, at 2:09 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>
>> Issac,
>>
>> I have all these entries in my core-site.xml, and as I mentioned before,
>> my Pig jobs are running just fine. And the JAVA_LIBRARY_PATH already
>> points to the lzo lib directory.
>> Not sure what to change/add, and where.
>>
>> Thanks,
>> Vipul
>>
>> On Jan 22, 2014, at 1:37 PM, Issac Buenrostro <buenros...@ooyala.com> wrote:
>>
>>> You need a core-site.xml file on the classpath with these lines:
>>>
>>> <?xml version="1.0"?>
>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>>
>>> <configuration>
>>>
>>> <property>
>>>   <name>io.compression.codecs</name>
>>>   <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
>>> </property>
>>> <property>
>>>   <name>io.compression.codec.lzo.class</name>
>>>   <value>com.hadoop.compression.lzo.LzoCodec</value>
>>> </property>
>>>
>>> </configuration>
>>>
>>> I also added both the native libraries path and the path to the lzo
>>> library to JAVA_LIBRARY_PATH, but I don't know if this is necessary.
>>> This is the command I used on a Mac:
>>>
>>> export JAVA_LIBRARY_PATH=/Users/*/hadoop-lzo/target/native/Mac_OS_X-x86_64-64/lib:/usr/local/Cellar/lzo/2.06/lib
>>>
>>> On Wed, Jan 22, 2014 at 12:28 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>>
>>>> Have you tried looking at the HBase and Cassandra examples under the
>>>> spark example project? These use custom InputFormats and may provide
>>>> guidance as to how to go about using the relevant Protobuf inputformat.
>>>
>>> Thanks for the pointer Nick, I will look at it once I get past the LZO
>>> stage.
>>>
>>> Issac,
>>>
>>> How did you get Spark to use the LZO native libraries? I have a fully
>>> functional hadoop deployment, with Pig and Scalding crunching the lzo
>>> files. But even after adding the lzo library folder to SPARK_CLASSPATH
>>> I get the following error:
>>>
>>> java.io.IOException: No codec for file
>>> hdfs://abc.xxx.com:8020/path/to/lzo/file.lzo found, cannot run
>>>     at com.twitter.elephantbird.mapreduce.input.LzoRecordReader.initialize(LzoRecordReader.java:80)
>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:86)
>>>
>>> Thanks,
>>> Vipul
>>>
>>> On Jan 21, 2014, at 9:32 AM, Issac Buenrostro <buenros...@ooyala.com> wrote:
>>>
>>>> Hi Vipul,
>>>>
>>>> I use something like this to read from LZO-compressed text files; it
>>>> may be helpful:
>>>>
>>>> import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat
>>>> import org.apache.hadoop.io.{LongWritable, Text}
>>>> import org.apache.hadoop.mapreduce.Job
>>>>
>>>> val sc = new SparkContext(sparkMaster, "lzoreader", sparkDir,
>>>>   List(config.getString("spark.jar")))
>>>> sc.newAPIHadoopFile(logFile, classOf[LzoTextInputFormat],
>>>>   classOf[LongWritable], classOf[Text],
>>>>   new Job().getConfiguration()).map(line => line._2)
>>>>
>>>> Additionally, I had to compile the LZO native libraries, so keep that
>>>> in mind.
>>>>
>>>> On Tue, Jan 21, 2014 at 6:57 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>
>>>> Hi Vipul,
>>>>
>>>> Have you tried looking at the HBase and Cassandra examples under the
>>>> spark example project? These use custom InputFormats and may provide
>>>> guidance as to how to go about using the relevant Protobuf inputformat.
>>>>
>>>> On Mon, Jan 20, 2014 at 11:48 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>>>
>>>> Any suggestions, anyone?
>>>> Core team / contributors / spark-developers: any thoughts?
>>>>
>>>> On Jan 17, 2014, at 4:45 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Can someone please share (sample) code to read LZO-compressed
>>>>> protobufs from HDFS (using Elephant Bird)? I'm trying whatever I see
>>>>> in the forum and on the web, but it doesn't seem comprehensive to me.
>>>>>
>>>>> I'm using Spark 0.8.0. My Pig scripts are able to read protobufs just
>>>>> fine, so the hadoop layer is set up alright. It would be really
>>>>> helpful if someone could list out what needs to be done with/in Spark.
>>>>>
>>>>> ~Vipul
>>>>
>>>> --
>>>> Issac Buenrostro
>>>> Software Engineer | buenros...@ooyala.com | (617) 997-3350
>>>> www.ooyala.com | blog | @ooyala
>>>
>>> --
>>> Issac Buenrostro
>>> Software Engineer | buenros...@ooyala.com | (617) 997-3350
>>> www.ooyala.com | blog | @ooyala
>
> --
> Issac Buenrostro
> Software Engineer | buenros...@ooyala.com | (617) 997-3350
> www.ooyala.com | blog | @ooyala