Cool I'll take a look and give it a try!
Thanks,
Ron
Sent from my iPad
On Jul 24, 2014, at 10:35 PM, Andrew Ash and...@andrewash.com wrote:
Hi Ron,
I think you're encountering the issue where caching data from Hadoop ends up
with many duplicate values instead of what you expect. Try adding a .clone()
to the datum() call.
The issue is that Hadoop returns the same object many times, with its
contents changed each time. This is an optimization that avoids allocating
and GC'ing an object for every row. It works fine in Hadoop MapReduce
because MapReduce is single-threaded and does no caching of the objects.
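The reuse pattern can be sketched outside Hadoop with a hypothetical mutable Record class (not the actual Hadoop/Avro types). Saving raw references, as a cache effectively does, smears the last value across every entry:

```scala
// Minimal simulation of Hadoop's record reuse: the "reader" mutates and
// returns the same instance for every row.
class Record(var value: Int) {
  def copy(): Record = new Record(value)
}

val reused = new Record(0)
val rows = Seq(1, 2, 3)

// Saving references, as a cache does:
val cachedRefs = rows.map { v => reused.value = v; reused }

// Every reference points at the same object, so all show the last value.
println(cachedRefs.map(_.value))  // List(3, 3, 3)
```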
Spark, though, saves a reference to each object it gets back from Hadoop, so
by the end of the partition Spark ends up with a bunch of references that
all point to the same object! I think it's just by chance that this ends up
making your average look rounded.
Can you try cloning the records in the map call? Also look at the contents
and see whether they've actually changed, or whether the resulting RDD after
a cache is just the last record smeared across all the others.
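A sketch of the suggested fix, using the same hypothetical Record class as above: copy each record before holding onto it, so the cached values stay distinct. In the real job this corresponds to cloning the Avro datum inside the map.

```scala
// Same hypothetical mutable Record class as in the reuse sketch.
class Record(var value: Int) {
  def copy(): Record = new Record(value)
}

val reused = new Record(0)
val rows = Seq(1, 2, 3)

// Copying each record before retaining it breaks the aliasing:
val cachedCopies = rows.map { v => reused.value = v; reused.copy() }

println(cachedCopies.map(_.value))  // List(1, 2, 3)
```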
Cheers,
Andrew
On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez zlgonza...@yahoo.com wrote:
Hi,
I'm doing the following:
def main(args: Array[String]) = {
  val sparkConf = new
    SparkConf().setAppName("AvroTest").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  val conf = new Configuration()
  val job = new Job(conf)
  val path = new Path("/tmp/a.avro")
  val schema = AvroUtils.getSchema(conf, path)
  AvroJob.setInputKeySchema(job, schema)
  val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable], conf).map(x => x._1.datum())
  val sum = rdd.map(p =>
    p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
  val avg = sum / rdd.count()
  println(s"Sum = $sum")
  println(s"Avg = $avg")
}
If I run this, it works as expected, but when I add .cache():
  val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable], conf).map(x => x._1.datum()).cache()
then the computed average comes out rounded.
Any idea why this works this way? Any tips on how to fix this?
Thanks,
Ron