Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Well, I kind of got it... this works below:

    val rdd = sc.newAPIHadoopFile(
        path,
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable])
      .map(_._1.datum)

    rdd.map(item => {
      val item = i.copy()
      val record = i._1.datum()
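The preview above is cut off and its lambda refers to a variable i that is never bound, so here is a minimal, self-contained sketch of the approach it appears to describe, assuming a SparkContext named sc and an Avro file at path, and using GenericData.deepCopy to make the per-record copy explicit:

    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable

    // Read the Avro file as (AvroKey[GenericRecord], NullWritable) pairs.
    val pairs = sc.newAPIHadoopFile(
      path,
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable])

    // Copy each datum before any further use, so downstream stages do not all
    // end up referencing the single object the RecordReader keeps re-using.
    val records = pairs.map { case (key, _) =>
      val datum = key.datum()
      GenericData.get().deepCopy(datum.getSchema, datum)
    }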

Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Wow! That sure is buried in the documentation! But yeah, that's what I thought more or less. I tried copying as follows, but that didn't work:

    val copyRDD = singleFileRDD.map(_.copy())

When I iterate over the new copyRDD (foreach or map), I still have the
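If singleFileRDD holds (AvroKey[GenericRecord], NullWritable) pairs, the _.copy() above is presumably Tuple2.copy, which is shallow: the new tuples still point at the same re-used AvroKey, so the repetition survives. A sketch of the contrast, under that assumption:

    import org.apache.avro.generic.{GenericData, GenericRecord}

    // Shallow: copies the pair, not the record it wraps, so the duplication remains.
    val shallowCopy = singleFileRDD.map(_.copy())

    // Deep: materialize an independent GenericRecord for every element.
    val deepCopy = singleFileRDD.map { case (key, _) =>
      val datum = key.datum()
      GenericData.get().deepCopy(datum.getSchema, datum)
    }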

Re: Repeating Records w/ Spark + Avro?

2016-03-11 Thread Peyman Mohajerian
Here is the reason for the behavior:

Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to
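The quoted note goes on (in the Spark programming guide) to advise copying such objects with a map function before caching, sorting, or aggregating them. A small illustration of that general pattern, using a hypothetical sequence file of IntWritable/Text pairs rather than the Avro input discussed in this thread, with sc and seqPath assumed:

    import org.apache.hadoop.io.{IntWritable, Text}

    // Convert the re-used Writables into plain JVM values inside map()
    // before caching; after this, each element owns its own data.
    val raw  = sc.sequenceFile(seqPath, classOf[IntWritable], classOf[Text])
    val safe = raw.map { case (k, v) => (k.get, v.toString) }
    safe.cache()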

Repeating Records w/ Spark + Avro?

2016-03-11 Thread Chris Miller
I have a bit of a strange situation:

    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.{NullWritable, WritableUtils}
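The imports mix Avro's old mapred API (AvroInputFormat, AvroWrapper) with the newer mapreduce one (AvroKeyInputFormat); the rest of the message is truncated. As a rough sketch, assuming a SparkContext named sc and an Avro file at path, the older-API read those first imports would set up looks like this, and it is subject to the same object re-use discussed in the replies:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable

    // Old mapred-API read: each key is an AvroWrapper[GenericRecord] whose
    // wrapped datum the RecordReader re-uses, just like AvroKey in the new API,
    // so records should be copied before they are cached or shuffled.
    val oldApiRDD = sc.hadoopFile(
      path,
      classOf[AvroInputFormat[GenericRecord]],
      classOf[AvroWrapper[GenericRecord]],
      classOf[NullWritable])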