Extending this discussion further: 
Has anyone been able to write out LZO-compressed Protobuf to HDFS (using 
Elephant Bird, or any other way)? 

I have an RDD that I want written out as-is, but I'm unable to figure out a 
direct way of doing that. I can convert it to a "PairRDD" - i.e. an RDD of "Key" 
and "Value" instead of just "Value" - by force-injecting a long key and then 
using the PairRDDFunctions.saveAsNewAPIHadoopFile function. 


e.g. the RDD I want to write out is 

    myRDD: org.apache.spark.rdd.RDD[com.xyz.MyProto] = MappedRDD[14] at map at <console>:42

    val conf = new Job().getConfiguration
    conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")

    // wrap each protobuf in a ProtobufWritable
    val protoToWrite = myRDD.map(x =>
      new ProtobufWritable[MyProto](x, new TypeRef[MyProto](x.getClass) {}))

    // now add an extra long key to make it a key/value pair, so that PairRDDFunctions can be used
    protoToWrite.map(x => (1L, x))
      .saveAsNewAPIHadoopFile("/tmp/vipul/temp/proto",
        classOf[LongWritable],
        classOf[BinaryWritable[MyProto]],
        classOf[LzoProtobufBlockOutputFormat[MyProto]],
        conf)

As you can see, this is just a kludge to get things running. Is there a neater 
way to write out the original "myRDD" as block-compressed LZO?
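
The only marginally cleaner variant I could come up with is to swap the dummy 
long for a NullWritable key - an untested sketch, and it still assumes the 
block output format ignores the key (which the dummy-long version above 
suggests it does):

    import org.apache.hadoop.io.NullWritable

    // same save call as above, just with a NullWritable key instead of a fake 1L
    protoToWrite.map(x => (NullWritable.get(), x))
      .saveAsNewAPIHadoopFile("/tmp/vipul/temp/proto",
        classOf[NullWritable],
        classOf[BinaryWritable[MyProto]],
        classOf[LzoProtobufBlockOutputFormat[MyProto]],
        conf)

But that still goes through PairRDDFunctions, so I'd love to hear about a more 
direct route.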

Thanks,
Vipul



On Jan 29, 2014, at 9:40 AM, Issac Buenrostro <buenros...@ooyala.com> wrote:

> Good! I'll keep your experience in mind in case we have problems in the 
> future :)
> 
> 
> On Tue, Jan 28, 2014 at 5:55 PM, Vipul Pandey <vipan...@gmail.com> wrote:
> I got this to run, maybe in a tad twisted way. Here is what I did to read 
> LZO-compressed protobufs in Spark (I'm on 0.8.0): 
> 
> - I added Hadoop's conf folder to the Spark classpath (in spark-env.sh) on all 
> the nodes, and in the shell as well, but that didn't help either. So I just 
> set the property on the configuration manually: 
>     val conf = new Job().getConfiguration
>     conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
>     val logRecord = sc.newAPIHadoopFile(filepath, classOf[...], classOf[...], classOf[...], conf)
> This seems to resolve the "No codec found" problem below.
>  
> - I use Twitter's Elephant Bird to read LZO-compressed protobufs using 
> MultiInputFormat and read the data out as BinaryWritable. The only additional 
> thing I had to do was to set the classConf on the MultiInputFormat class. 
> 
> import com.twitter.elephantbird.mapreduce.input.MultiInputFormat
> import com.twitter.elephantbird.mapreduce.io.BinaryWritable
> 
>     MultiInputFormat.setClassConf(classOf[MyProtoClass], conf)
>     val record = sc.newAPIHadoopFile(inputpath,
>       classOf[MultiInputFormat[MyProtoClass]],
>       classOf[LongWritable],
>       classOf[BinaryWritable[MyProtoClass]],
>       conf)
> 
>     // this gets you the protobuf from the BinaryWritable - thereafter you just follow your class structure
>     val protobuf = record.map(_._2.get.getProtobuf)
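> 
> A quick sanity check at this point (just a sketch) is to force the read and 
> eyeball a couple of records: 
> 
>     protobuf.count()                    // forces the files to actually be read
>     protobuf.take(2).foreach(println)   // print a couple of decoded records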
> 
> 
> Hope this helps whoever is working with LZO-compressed protobufs. 
> 
> ~Vipul
> 
> 
> 
> 
> On Jan 22, 2014, at 2:09 PM, Vipul Pandey <vipan...@gmail.com> wrote:
> 
>> Issac,
>> 
>> I have all these entries in my core-site.xml and as I mentioned before my 
>> Pig jobs are running just fine. And the JAVA_LIBRARY_PATH already points to 
>> the lzo lib directory. 
>> Not sure what to change/add and where.
>> 
>> Thanks,
>> Vipul
>> 
>> 
>> 
>> On Jan 22, 2014, at 1:37 PM, Issac Buenrostro <buenros...@ooyala.com> wrote:
>> 
>>> You need a core-site.xml file on the classpath with these lines:
>>> 
>>> <?xml version="1.0"?>
>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>> 
>>> <configuration>
>>> 
>>>   <property>
>>>     <name>io.compression.codecs</name>
>>>     <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
>>>   </property>
>>>   <property>
>>>     <name>io.compression.codec.lzo.class</name>
>>>     <value>com.hadoop.compression.lzo.LzoCodec</value>
>>>   </property>
>>> 
>>> </configuration>
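>>> 
>>> (If editing core-site.xml is inconvenient, the same two properties can 
>>> presumably also be set programmatically on the Hadoop Configuration you pass 
>>> to Spark - sketch only: 
>>> 
>>>     val conf = new Job().getConfiguration
>>>     conf.set("io.compression.codecs",
>>>       "com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec")
>>>     conf.set("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec")
>>> 
>>> and then hand that conf to newAPIHadoopFile.)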
>>> 
>>> 
>>> I also added both the native libraries path and the path to the lzo library to 
>>> JAVA_LIBRARY_PATH, but I don't know if this is necessary. This is the 
>>> command I used on Mac:
>>> 
>>> export JAVA_LIBRARY_PATH=/Users/*/hadoop-lzo/target/native/Mac_OS_X-x86_64-64/lib:/usr/local/Cellar/lzo/2.06/lib
>>> 
>>> 
>>> On Wed, Jan 22, 2014 at 12:28 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>> 
>>>> Have you tried looking at the HBase and Cassandra examples under the spark 
>>>> example project? These use custom InputFormats and may provide guidance as 
>>>> to how to go about using the relevant Protobuf inputformat.
>>> 
>>> 
>>> Thanks for the pointer Nick, I will look at it once I get past the LZO 
>>> stage. 
>>> 
>>> 
>>> Issac,
>>> 
>>> How did you get Spark to use the LZO native libraries? I have a fully 
>>> functional Hadoop deployment with Pig and Scalding crunching the LZO files, 
>>> but even after adding the LZO library folder to SPARK_CLASSPATH I get the 
>>> following error: 
>>> 
>>> java.io.IOException: No codec for file hdfs://abc.xxx.com:8020/path/to/lzo/file.lzo found, cannot run
>>>     at com.twitter.elephantbird.mapreduce.input.LzoRecordReader.initialize(LzoRecordReader.java:80)
>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:86)
>>> 
>>> 
>>> 
>>> Thanks
>>> Vipul
>>> 
>>> On Jan 21, 2014, at 9:32 AM, Issac Buenrostro <buenros...@ooyala.com> wrote:
>>> 
>>>> Hi Vipul,
>>>> 
>>>> I use something like this to read from LZO-compressed text files; it may 
>>>> be helpful:
>>>> 
>>>> import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat
>>>> import org.apache.hadoop.io.{LongWritable, Text}
>>>> import org.apache.hadoop.mapreduce.Job
>>>> 
>>>> val sc = new SparkContext(sparkMaster, "lzoreader", sparkDir, List(config.getString("spark.jar")))
>>>> sc.newAPIHadoopFile(logFile, classOf[LzoTextInputFormat], classOf[LongWritable], classOf[Text],
>>>>   new Job().getConfiguration()).map(line => line._2)
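>>>> 
>>>> (One small follow-up, in case you want plain Strings rather than hadoop Text 
>>>> values - untested sketch, with "textRdd" standing in for the result of the 
>>>> call above: 
>>>> 
>>>>     val strings = textRdd.map(_.toString)
>>>> 
>>>> since the map above already dropped the LongWritable keys.)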
>>>> 
>>>> Additionally I had to compile LZO native libraries, so keep that in mind.
>>>> 
>>>> 
>>>> On Tue, Jan 21, 2014 at 6:57 AM, Nick Pentreath <nick.pentre...@gmail.com> 
>>>> wrote:
>>>> Hi Vipul
>>>> 
>>>> Have you tried looking at the HBase and Cassandra examples under the spark 
>>>> example project? These use custom InputFormats and may provide guidance as 
>>>> to how to go about using the relevant Protobuf inputformat.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Jan 20, 2014 at 11:48 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>>> Any suggestions, anyone? 
>>>> Core team / contributors / spark-developers - any thoughts?
>>>> 
>>>> On Jan 17, 2014, at 4:45 PM, Vipul Pandey <vipan...@gmail.com> wrote:
>>>> 
>>>>> Hi All,
>>>>> 
>>>>> Can someone please share (sample) code to read LZO-compressed protobufs 
>>>>> from HDFS (using Elephant Bird)? I'm trying whatever I see on the forum 
>>>>> and on the web, but it doesn't seem comprehensive to me. 
>>>>> 
>>>>> I'm using Spark 0.8.0. My Pig scripts are able to read protobuf just fine, 
>>>>> so the Hadoop layer is set up alright. It will be really helpful if 
>>>>> someone can list out what needs to be done with/in Spark. 
>>>>> 
>>>>> ~Vipul
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> 
>> 
> 
> 
> 
> 
> -- 
> --
> Issac Buenrostro
> Software Engineer | 
> buenros...@ooyala.com | (617) 997-3350
> www.ooyala.com | blog | @ooyala
