Here is the reason for the behavior:
Note: Because Hadoop's RecordReader class re-uses the same Writable
object for each record, directly caching the returned RDD or directly
passing it to an aggregation or shuffle operation will create many
references to the same object. If you plan to directly cache, sort, or
aggregate Hadoop writable objects, you should first copy them using a map
function.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html

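So the root cause is on the Hadoop side: the record reader hands back the
same object for every record, and Spark simply keeps references to it.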
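A minimal plain-Scala sketch of the effect the note describes, with no Spark or Hadoop involved. Holder and reusingReader are hypothetical stand-ins (not real Hadoop or Avro classes) for a reused Writable/AvroKey and for a RecordReader that overwrites and returns the same instance for every record. Buffering the references directly leaves every element pointing at the last record; copying each element in a map before buffering keeps the individual values.

```scala
object ReuseDemo {
  // Hypothetical stand-in for a Writable / AvroKey: one mutable holder.
  final class Holder(var value: String) {
    // Returns an independent object carrying the current value.
    def deepCopy(): Holder = new Holder(value)
  }

  // Hypothetical reader mimicking Hadoop's RecordReader: it overwrites and
  // returns the SAME Holder instance for every record it yields.
  def reusingReader(values: Seq[String]): Iterator[Holder] = {
    val shared = new Holder("")
    values.iterator.map { v =>
      shared.value = v
      shared
    }
  }

  val input: Seq[String] = Seq("a", "b", "c")

  // Buffering the references directly (as cache/take/sort would): all three
  // elements are the one shared Holder, which ends up holding "c".
  val direct: List[String] = reusingReader(input).toList.map(_.value)

  // Copying inside map BEFORE buffering: the iterator is consumed one element
  // at a time, so each copy captures the value before the next overwrite.
  val copied: List[String] =
    reusingReader(input).map(_.deepCopy()).toList.map(_.value)
}
```

Applied to the Avro case in the quoted message, the same pattern would presumably be a map that copies the datum before take or cache, for example rdd.map { case (k, _) => GenericData.get().deepCopy(k.datum().getSchema, k.datum()) }, which sidesteps the protected clone() entirely. Whether the copied records can then be shipped back to the driver depends on the configured serializer (GenericData.Record may not be java.io.Serializable, depending on the Avro version), so Kryo, or mapping to k.datum().toString inside the map, may be needed for the println use case.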

On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com>
wrote:

> I have a bit of a strange situation:
>
> *****************
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
> import org.apache.avro.mapreduce.AvroKeyInputFormat
> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>
> val path = "/path/to/data.avro"
>
> val rdd = sc.newAPIHadoopFile(path,
> classOf[AvroKeyInputFormat[GenericRecord]],
> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
> rdd.take(10).foreach( x => println( x._1.datum() ))
> *****************
>
> In this situation, I get the right number of records returned, and if I
> look at the contents of rdd I see the individual records as Tuple2s.
> However, if I println each one as shown above, I get the same result
> every time.
>
> Apparently this has to do with something in Spark or Avro keeping a
> reference to the item it's iterating over, so I need to clone the object
> before I use it. However, if I try to clone it (from the spark-shell
> console), I get:
>
> *****************
> rdd.take(10).foreach( x => {
>   val clonedDatum = x._1.datum().clone()
>   println(clonedDatum.datum())
> })
>
> <console>:37: error: method clone in class Object cannot be accessed in
> org.apache.avro.generic.GenericRecord
>  Access to protected method clone not permitted because
>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>  class $iwC where the access take place
>                 val clonedDatum = x._1.datum().clone()
> *****************
>
> So, how can I clone the datum?
>
> Seems I'm not the only one who ran into this problem:
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
> can't figure out how to fix it in my case without hacking around it the
> way the person in the linked issue did.
>
> Suggestions?
>
> --
> Chris Miller
>
