Hi Ron,

I think you're encountering the issue where caching data from Hadoop ends
up with many duplicate values instead of what you expect.  Try adding a
.clone() to the datum() call.
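
Something along these lines should do it (untested sketch; it assumes the
records are plain GenericRecords and uses Avro's GenericData.deepCopy, but
any deep copy of the datum will work):

  import org.apache.avro.generic.{GenericData, GenericRecord}

  val rdd = sc.newAPIHadoopFile(
      path.toString(),
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable], conf)
    // deep-copy each record so the RDD holds N distinct records instead of
    // N references to the single object the input format keeps reusing
    .map(x => GenericData.get().deepCopy(x._1.datum().getSchema, x._1.datum()))
    .cache()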

The issue is that Hadoop returns the same object many times, just with its
contents changed.  This is an optimization to avoid allocating and GC'ing
an object for every row.  It works fine in Hadoop MapReduce because the
processing there is single-threaded and the objects are never cached.

Spark, though, saves a reference to each object it gets back from Hadoop.
So by the end of the partition, Spark ends up with a bunch of references
that all point to the same object!  I think it's just by chance that this
shows up in your case as the average getting rounded.
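
To see the pattern in isolation, here's a tiny made-up (non-Hadoop) version
of what happens when you hold references to one reused mutable object:

  // one reusable "record" (a mutable array standing in for the Avro record)
  val reused = Array(0.0f)
  val cachedRefs = Seq(1.0f, 2.0f, 3.0f).map { v =>
    reused(0) = v   // the reader overwrites the same object for each row
    reused          // "caching" it just stores another reference to it
  }
  // prints List(3.0, 3.0, 3.0) -- the last row smeared across every entry
  println(cachedRefs.map(_(0)))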

Can you try cloning the records in the map call?  Also look at the contents
and see whether they actually changed, or whether the RDD after a cache()
is just the last record "smeared" across all the others.
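
Something like this (rough sketch, names made up) would show it:

  // if the records are "smeared", the cached RDD will report far fewer
  // distinct values than the uncached one, and the sampled rows will all
  // look like the last record read
  val cached = rdd.cache()
  cached.count()   // force the cache to materialize
  cached.take(5).foreach(r => println(r.get("SEPAL_WIDTH")))
  println(cached.map(_.get("SEPAL_WIDTH")).distinct().count())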

Cheers,
Andrew


On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez <zlgonza...@yahoo.com> wrote:

> Hi,
>   I'm doing the following:
>
>   def main(args: Array[String]) = {
>     val sparkConf = new
> SparkConf().setAppName("AvroTest").setMaster("local[2]")
>     val sc = new SparkContext(sparkConf)
>     val conf = new Configuration()
>     val job = new Job(conf)
>     val path = new Path("/tmp/a.avro");
>     val schema = AvroUtils.getSchema(conf, path);
>
>     AvroJob.setInputKeySchema(job, schema);
>
>     val rdd = sc.newAPIHadoopFile(
>        path.toString(),
>        classOf[AvroKeyInputFormat[GenericRecord]],
>        classOf[AvroKey[GenericRecord]],
>        classOf[NullWritable], conf).map(x => x._1.datum())
>     val sum = rdd.map(p =>
> p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>     val avg = sum/rdd.count()
>     println(s"Sum = $sum")
>     println(s"Avg = $avg")
>   }
>
> If I run this, it works as expected, when I add .cache() to
>
> val rdd = sc.newAPIHadoopFile(
>        path.toString(),
>        classOf[AvroKeyInputFormat[GenericRecord]],
>        classOf[AvroKey[GenericRecord]],
>        classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>
> then the command rounds up the average.
>
> Any idea why this works this way? Any tips on how to fix this?
>
> Thanks,
> Ron
>
>
