Cool, I'll take a look and give it a try! Thanks, Ron
Sent from my iPad

> On Jul 24, 2014, at 10:35 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> Hi Ron,
>
> I think you're encountering the issue where caching data from Hadoop ends up
> with many duplicate values instead of what you expect. Try adding a .clone()
> to the datum() call.
>
> The issue is that Hadoop returns the same object many times but with its
> contents changed. This is an optimization that avoids allocating and GC'ing
> an object for every row. It works fine in Hadoop MapReduce because processing
> is single-threaded and the objects are never cached.
>
> Spark, though, saves a reference to each object it gets back from Hadoop. So
> by the end of the partition, Spark ends up with a bunch of references, all to
> the same object! I think it's just by chance that your average ends up
> looking rounded.
>
> Can you try cloning the records in the map call? Also look at the contents
> and see if they're actually changed, or if the resulting RDD after a cache is
> just the last record "smeared" across all the others.
>
> Cheers,
> Andrew
>
>
>> On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez <zlgonza...@yahoo.com> wrote:
>> Hi,
>>   I'm doing the following:
>>
>> def main(args: Array[String]) = {
>>   val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
>>   val sc = new SparkContext(sparkConf)
>>   val conf = new Configuration()
>>   val job = new Job(conf)
>>   val path = new Path("/tmp/a.avro")
>>   val schema = AvroUtils.getSchema(conf, path)
>>
>>   AvroJob.setInputKeySchema(job, schema)
>>
>>   val rdd = sc.newAPIHadoopFile(
>>     path.toString(),
>>     classOf[AvroKeyInputFormat[GenericRecord]],
>>     classOf[AvroKey[GenericRecord]],
>>     classOf[NullWritable], conf).map(x => x._1.datum())
>>   val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>>   val avg = sum / rdd.count()
>>   println(s"Sum = $sum")
>>   println(s"Avg = $avg")
>> }
>>
>> If I run this, it works as expected. When I add .cache() to
>>
>> val rdd = sc.newAPIHadoopFile(
>>   path.toString(),
>>   classOf[AvroKeyInputFormat[GenericRecord]],
>>   classOf[AvroKey[GenericRecord]],
>>   classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>>
>> then the average comes out rounded up.
>>
>> Any idea why this works this way? Any tips on how to fix this?
>>
>> Thanks,
>> Ron
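
For reference, one way to apply Andrew's suggestion end to end. This is a
minimal sketch, not the thread's confirmed fix: it assumes Avro 1.7.x's
generic API (GenericData.deepCopy as the copy mechanism, since GenericRecord
has no public clone()), drops the AvroJob/AvroUtils reader-schema setup from
Ron's snippet so the file's writer schema drives decoding, and keeps his
/tmp/a.avro path and SEPAL_WIDTH field as given:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

object AvroCacheFix {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AvroTest").setMaster("local[2]"))

    val rdd = sc.newAPIHadoopFile(
        "/tmp/a.avro",
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable],
        new Configuration())
      // The input format reuses a single AvroKey per split, mutating it
      // for each row. Deep-copy the datum before Spark keeps a reference
      // to it, so the cache holds distinct records instead of many
      // pointers to whichever record was read last.
      .map { case (key, _) =>
        val datum = key.datum()
        GenericData.get().deepCopy(datum.getSchema, datum)
      }
      .cache()

    val sum = rdd.map(_.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
    val avg = sum / rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }
}

Copying via datum.getSchema inside the map, rather than capturing a Schema
built on the driver, also sidesteps task-serialization trouble: Schema is not
Serializable in Avro 1.7.x.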