Hi,
  I'm doing the following:

  import org.apache.avro.generic.GenericRecord
  import org.apache.avro.mapred.AvroKey
  import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    val job = Job.getInstance(conf)
    val path = new Path("/tmp/a.avro")
    val schema = AvroUtils.getSchema(conf, path)

    AvroJob.setInputKeySchema(job, schema)

    // Use the job's configuration so the reader schema set above actually
    // reaches the input format (Job copies the Configuration it is given).
    val rdd = sc.newAPIHadoopFile(
        path.toString,
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable],
        job.getConfiguration).map(x => x._1.datum())
    val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
    val avg = sum / rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }
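
(For reference, AvroUtils.getSchema is a small helper that reads the writer
schema out of the Avro file header; roughly this sketch:)

  import org.apache.avro.Schema
  import org.apache.avro.file.DataFileStream
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

  object AvroUtils {
    // Sketch of the helper used above: open the Avro container file and
    // return the writer schema stored in its header.
    def getSchema(conf: Configuration, path: Path): Schema = {
      val in = path.getFileSystem(conf).open(path)
      val reader = new DataFileStream(in, new GenericDatumReader[GenericRecord]())
      try reader.getSchema finally reader.close()
    }
  }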

If I run this, it works as expected. But when I add .cache() to the end of

val rdd = sc.newAPIHadoopFile(
    path.toString,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    job.getConfiguration).map(x => x._1.datum()).cache()

then the average comes back rounded up.

Any idea why it behaves this way? Any tips on how to fix it?
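
For what it's worth, extracting the field into a primitive before caching seems
to avoid the problem. A sketch, assuming the input format reuses the underlying
record object across rows (so a cached RDD of datums would end up holding
references to mutated records):

  // Sketch: pull SEPAL_WIDTH out of each record *before* caching, so the
  // cached RDD holds plain Floats instead of (possibly reused) Avro records.
  val widths = sc.newAPIHadoopFile(
      path.toString,
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      job.getConfiguration)
    .map(_._1.datum().get("SEPAL_WIDTH").asInstanceOf[Float])
    .cache()
  val sum = widths.reduce(_ + _)
  val avg = sum / widths.count()

Is that a sane workaround, or is there a better way?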

Thanks,
Ron
