Hi,
I'm running the following code:
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  val conf = new Configuration()
  val job = new Job(conf)
  val path = new Path("/tmp/a.avro")
  val schema = AvroUtils.getSchema(conf, path)
  AvroJob.setInputKeySchema(job, schema)
  // Read (AvroKey, NullWritable) pairs and keep only the Avro record.
  val rdd = sc.newAPIHadoopFile(
    path.toString,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    conf
  ).map(x => x._1.datum())
  val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
  val avg = sum / rdd.count()
  println(s"Sum = $sum")
  println(s"Avg = $avg")
}
If I run this, it works as expected. But when I add .cache() to the RDD definition:

  val rdd = sc.newAPIHadoopFile(
    path.toString,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    conf
  ).map(x => x._1.datum()).cache()

then the computed average comes out rounded up.
Any idea why it behaves this way? Any tips on how to fix it?
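For context, here is a minimal plain-Scala sketch of what I suspect (but have not confirmed) is happening: Hadoop input formats typically reuse the same key object for every record, so caching the raw datum() references may leave every cached element pointing at the same mutated record. MutableRecord below is a hypothetical stand-in for a reused GenericRecord, not anything from the Avro API:

```scala
object ReuseDemo {
  // Hypothetical stand-in for a mutable record object that the
  // input format reuses across reads (mirrors GenericRecord reuse).
  final class MutableRecord(var value: Float)

  // "Cache" references to a single reused record: every element of the
  // cached sequence is the same object, so after the scan they all hold
  // the last value read. This reproduces the suspected bug.
  def sumCachedRefs(data: Seq[Float]): Float = {
    val reused = new MutableRecord(0f)
    val cached = data.map { v => reused.value = v; reused }
    cached.map(_.value).sum
  }

  // Copy the primitive value out of the reused record before "caching":
  // each element is now an independent Float, so the sum is correct.
  def sumCopied(data: Seq[Float]): Float = {
    val reused = new MutableRecord(0f)
    data.map { v => reused.value = v; reused.value }.sum
  }

  def main(args: Array[String]): Unit = {
    println(sumCachedRefs(Seq(1f, 2f, 3f))) // prints 9.0 (wrong: 3 * last value)
    println(sumCopied(Seq(1f, 2f, 3f)))     // prints 6.0 (correct)
  }
}
```

If that guess is right, the analogous fix in my job would presumably be to extract the Float field (or deep-copy each record) in the map before calling .cache(), rather than caching the datum() references directly.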
Thanks,
Ron