Re: cache changes precision

2014-07-25 Thread Ron Gonzalez
Cool, I'll take a look and give it a try!

Thanks,
Ron

Sent from my iPad

 On Jul 24, 2014, at 10:35 PM, Andrew Ash and...@andrewash.com wrote:
 
 Hi Ron,
 
 I think you're encountering the issue where caching data read from Hadoop ends up 
 with many duplicate values instead of the values you expect.  Try adding a .clone() 
 to the result of the datum() call.
 
 The issue is that Hadoop returns the same object many times, just with its 
 contents changed.  This is an optimization to avoid allocating and GC'ing 
 an object for every row.  It works fine in Hadoop MapReduce because processing 
 is single-threaded and the objects are never cached.
 
 Spark, though, saves a reference to each object it gets back from Hadoop.  So 
 by the end of the partition, Spark ends up with a bunch of references that all 
 point to the same object!  I think it's just by chance that this ends up making 
 your average come out rounded.
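 
 To see the reuse effect in isolation, here is a tiny plain-Scala sketch (the 
 Reusable class is made up purely for illustration; nothing Spark- or 
 Hadoop-specific):
 
   class Reusable { var value: Int = 0 }
 
   val shared = new Reusable
   // Keeping references to the reused object: every element is the same
   // instance, so only the last assignment survives.
   val refs = (1 to 3).map { i => shared.value = i; shared }
   // Copying the field out instead preserves each value.
   val copies = (1 to 3).map { i => shared.value = i; shared.value }
 
   println(refs.map(_.value))   // prints Vector(3, 3, 3): the last record everywhere
   println(copies)              // prints Vector(1, 2, 3)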
 
 Can you try cloning the records in the map call?  Also look at the 
 contents and see whether they're actually different, or whether the resulting RDD 
 after a cache is just the last record smeared across all the others.
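 
 One way to do that per-record copy is with Avro's GenericData.deepCopy (just a 
 sketch; the exact copy call may differ depending on your Avro version and 
 record type):
 
   import org.apache.avro.generic.{GenericData, GenericRecord}
 
   val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf)
     .map { case (key, _) =>
       // Deep-copy the datum before Spark holds on to it; the input format
       // reuses the same AvroKey/GenericRecord instance for every row.
       val datum = key.datum()
       GenericData.get().deepCopy(datum.getSchema, datum)
     }
     .cache()
 
 If the averages come out right with this in place, the object reuse was indeed 
 the problem.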
 
 Cheers,
 Andrew
 
 
 On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez zlgonza...@yahoo.com wrote:
 Hi,
   I'm doing the following:
 
   def main(args: Array[String]) = {
     val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
     val sc = new SparkContext(sparkConf)
     val conf = new Configuration()
     val job = new Job(conf)
     val path = new Path("/tmp/a.avro")
     val schema = AvroUtils.getSchema(conf, path)
 
     AvroJob.setInputKeySchema(job, schema)
 
     val rdd = sc.newAPIHadoopFile(
         path.toString(),
         classOf[AvroKeyInputFormat[GenericRecord]],
         classOf[AvroKey[GenericRecord]],
         classOf[NullWritable], conf).map(x => x._1.datum())
     val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
     val avg = sum / rdd.count()
     println(s"Sum = $sum")
     println(s"Avg = $avg")
   }
 
 If I run this, it works as expected.  But when I add .cache() to 
 
 val rdd = sc.newAPIHadoopFile(
     path.toString(),
     classOf[AvroKeyInputFormat[GenericRecord]],
     classOf[AvroKey[GenericRecord]],
     classOf[NullWritable], conf).map(x => x._1.datum()).cache()
 
 then the average comes out rounded.
 
 Any idea why this works this way? Any tips on how to fix this?
 
 Thanks,
 Ron
 


cache changes precision

2014-07-24 Thread Ron Gonzalez
Hi,
  I'm doing the following:

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    val job = new Job(conf)
    val path = new Path("/tmp/a.avro")
    val schema = AvroUtils.getSchema(conf, path)

    AvroJob.setInputKeySchema(job, schema)

    val rdd = sc.newAPIHadoopFile(
        path.toString(),
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable], conf).map(x => x._1.datum())
    val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
    val avg = sum / rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }

If I run this, it works as expected.  But when I add .cache() to 

val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable], conf).map(x => x._1.datum()).cache()

then the average comes out rounded.

Any idea why this works this way? Any tips on how to fix this?

Thanks,
Ron