Hi, I'm trying to use the BinaryClassificationMetrics class to compute the PR curve as below -
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by sneha.shukla on 17/06/16.
 */
object TestCode {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")
    sparkConf.set("spark.default.parallelism", "1")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // LabelledData is a class from my project (not shown here)
    sparkConf.registerKryoClasses(Array(classOf[GenericRecord], classOf[LabelledData], classOf[Configuration]))

    val sc = new SparkContext(sparkConf)
    val jobConf = new JobConf(sc.hadoopConfiguration)

    // Read Avro records; minPartitions = 2 (my machine has 2 cores)
    val rdd = sc.hadoopFile(
      "sampleData",
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable], 2)

    println("Original partitions : " + rdd.partitions.size)

    // Each record's "value" field is a pipe-delimited string: prediction|label
    val anotherRDD = rdd.map(row => row._1.datum)
      .map(rowValue => rowValue.get("value").toString.split("\\|"))
    println("Another RDD partitions : " + anotherRDD.partitions.size)

    var res = scala.collection.mutable.ListBuffer[(Double, Double)]()

    // Parse each record into a (prediction, label) pair
    val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator => {
      while (iterator.hasNext) {
        val array = iterator.next
        val iter = array.iterator
        val prediction = iter.next.toDouble
        val label = iter.next.toDouble
        val t = (prediction, label)
        res += t
      }
      res.iterator
    }).map(doubles => (doubles._1, doubles._2))

    println("Yet another RDD partitions : " + yetAnotherRDD.partitions.size)

    // Sample data in yetAnotherRDD:
    // (0.0025952152930881676,0.0)
    // (8.08581095750238E-5,0.0)
    // (0.1420529729314534,0.0)
    // (1.287933787473423,0.0)
    // (0.007534799826226573,0.0)
    // (0.008488829931163747,0.0)
    // (1.441921051791096,0.0)
    // (0.0036552783890398343,0.0)
    // (2.3833004789198267,0.0)
    // (0.3695065893117973,0.0)

    // Metrics calculation. Explicitly setting numBins to 10
    val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)
    val pr = metrics.pr().collect()
    val thr = metrics.thresholds().collect()
    val joined = metrics.precisionByThreshold().join(metrics.recallByThreshold()).collect()

    println(joined.size)
    println(thr.size)
    println(pr.size)
  }
}

In local mode my machine has 2 cores, so I set minPartitions on the original RDD to 2 (based on suggestions here: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010 ).

However, after experimenting a bit, it turns out that the numBins parameter of BinaryClassificationMetrics is not honoured unless "spark.default.parallelism" is set to 1. AFAIU, numBins should downsample the input RDD, as documented here: https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html

When "spark.default.parallelism" is set to 1, the sizes of the thresholds and PR curve RDDs are close to numBins, as documented; if I raise numBins to 100, the number of thresholds in BinaryClassificationMetrics becomes ~100, and so on. Am I missing something here? If the dataset on which the PR curve is computed is huge, wouldn't setting parallelism to 1 hurt performance?

I am using Spark 1.6.1 in local mode for this experiment. Spark 1.5.1 in cluster mode gives similar results.

Any pointers/help would be appreciated! Thanks!
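In case it helps, this is roughly how I am checking the behaviour, stripped down to synthetic (score, label) data instead of my Avro input (the object and value names here are just illustrative). Rerunning it with different values of "spark.default.parallelism" and comparing the number of thresholds against numBins is the comparison I describe above:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

object NumBinsCheck {
  def main(args: Array[String]): Unit = {
    // Pass the desired parallelism as the first argument, e.g. "1", "2", "4"
    val parallelism = if (args.nonEmpty) args(0) else "1"
    val conf = new SparkConf()
      .setAppName("NumBinsCheck")
      .setMaster("local[2]")
      .set("spark.default.parallelism", parallelism)
    val sc = new SparkContext(conf)

    // 10,000 synthetic (prediction, label) pairs, spread over 2 partitions
    val scoreAndLabels = sc.parallelize(
      Seq.fill(10000)((Random.nextDouble(), if (Random.nextBoolean()) 1.0 else 0.0)), 2)

    // numBins = 10: I would expect roughly 10 thresholds regardless of parallelism
    val metrics = new BinaryClassificationMetrics(scoreAndLabels, 10)
    println(s"parallelism = $parallelism, " +
      s"thresholds = ${metrics.thresholds().count()}, pr points = ${metrics.pr().count()}")

    sc.stop()
  }
}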