Cool I'll take a look and give it a try!

Thanks,
Ron

Sent from my iPad

> On Jul 24, 2014, at 10:35 PM, Andrew Ash <and...@andrewash.com> wrote:
> 
> Hi Ron,
> 
> I think you're encountering the issue where caching data read from Hadoop ends
> up with many duplicate values instead of what you expect.  Try adding a
> .clone() to the datum() call.
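> 
> As a rough sketch of the kind of copy I mean (assuming Avro's
> GenericData.deepCopy is available in your version; if your records don't
> expose a public clone(), a deep copy does the same job), the map could copy
> each datum before anything gets cached:
> 
>     import org.apache.avro.generic.GenericData
> 
>     val rdd = sc.newAPIHadoopFile(
>         path.toString(),
>         classOf[AvroKeyInputFormat[GenericRecord]],
>         classOf[AvroKey[GenericRecord]],
>         classOf[NullWritable], conf)
>       .map { case (key, _) =>
>         val datum = key.datum()
>         // Deep-copy the record so the cached RDD holds its own objects
>         // instead of many references to the one record Hadoop keeps reusing.
>         GenericData.get().deepCopy(datum.getSchema, datum)
>       }
>       .cache()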
> 
> The issue is that Hadoop returns the same object many times, just with its
> contents changed each time.  This is an optimization to avoid allocating and
> GC'ing an object for every row.  It works fine in Hadoop MapReduce because
> processing is single-threaded and the objects are never cached.
> 
> Spark, though, saves a reference to each object it gets back from Hadoop.  So
> by the end of the partition, Spark ends up with a bunch of references that all
> point to the same object (the last record read)!  I think it's just by chance
> that this ends up making your average look rounded.
> 
> Can you try cloning the records in the map call?  Also look at the contents
> and see whether they're actually changed, or whether the RDD after a cache()
> is just the last record "smeared" across all the others.
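> 
> For a quick check of that (just a sketch against the SEPAL_WIDTH field from
> your snippet), compare the distinct values with and without the cache():
> 
>     val widths = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float])
>     // With the object-reuse problem, the cached RDD collapses to one
>     // repeated value; without cache() you should see the full spread.
>     println(widths.distinct().count())
>     println(widths.take(10).mkString(", "))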
> 
> Cheers,
> Andrew
> 
> 
>> On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez <zlgonza...@yahoo.com> wrote:
>> Hi,
>>   I'm doing the following:
>> 
>>   def main(args: Array[String]) = {
>>     val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>     val conf = new Configuration()
>>     val job = new Job(conf)
>>     val path = new Path("/tmp/a.avro");
>>     val schema = AvroUtils.getSchema(conf, path);
>> 
>>     AvroJob.setInputKeySchema(job, schema);
>>     
>>     val rdd = sc.newAPIHadoopFile(
>>        path.toString(),
>>        classOf[AvroKeyInputFormat[GenericRecord]],
>>        classOf[AvroKey[GenericRecord]],
>>        classOf[NullWritable], conf).map(x => x._1.datum())
>>     val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>>     val avg = sum/rdd.count()
>>     println(s"Sum = $sum")
>>     println(s"Avg = $avg")
>>   }
>> 
>> If I run this, it works as expected. However, when I add .cache() to
>> 
>> val rdd = sc.newAPIHadoopFile(
>>        path.toString(),
>>        classOf[AvroKeyInputFormat[GenericRecord]],
>>        classOf[AvroKey[GenericRecord]],
>>        classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>> 
>> then the average it prints comes out rounded up.
>> 
>> Any idea why this works this way? Any tips on how to fix this?
>> 
>> Thanks,
>> Ron
> 
