Hi,

I'm trying to use the BinaryClassificationMetrics class to compute the PR
curve as below:

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by sneha.shukla on 17/06/16.
  */

object TestCode {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")

    sparkConf.set("spark.default.parallelism","1")

    sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
classOf[LabelledData], classOf[Configuration]))

    val sc = new SparkContext(sparkConf)

    val jobConf = new JobConf(sc.hadoopConfiguration)

    val rdd = sc.hadoopFile(
      "sampleData",
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],2)

    println("Original Partitions : "+rdd.partitions.size)

    val anotherRDD = rdd.map(row => row._1.datum).map(rowValue => rowValue.get("value").toString.split("\\|"))

    println("Another RDD partitions : "+anotherRDD.partitions.size)

    var res = scala.collection.mutable.ListBuffer[(Double, Double)]()

    val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator => {
      while (iterator.hasNext) {
        val array = iterator.next
        val iter = array.iterator
        val prediction = iter.next.toDouble
        val label = iter.next.toDouble
        val t = (prediction, label)
        res += t
      }
      res.iterator
    }).map(doubles => (doubles._1, doubles._2))

    println("yet anohter rdd partitions : "+yetAnotherRDD.partitions.size)

    //Sample data in yetAnotherRDD
//    (0.0025952152930881676,0.0)
//    (8.08581095750238E-5,0.0)
//    (0.1420529729314534,0.0)
//    (1.287933787473423,0.0)
//    (0.007534799826226573,0.0)
//    (0.008488829931163747,0.0)
//    (1.441921051791096,0.0)
//    (0.0036552783890398343,0.0)
//    (2.3833004789198267,0.0)
//    (0.3695065893117973,0.0)

    //Metrics Calculation. Explicitly setting numBins to 10
    val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)

    val pr = metrics.pr().collect()

    val thr = metrics.thresholds().collect()

    val joined = metrics.precisionByThreshold().join(metrics.recallByThreshold()).collect()

    println(joined.size)

    println(thr.size)

    println(pr.size)
  }

}

In local mode my machine has 2 cores, so I set minPartitions on the
original RDD to 2 (based on the suggestions here:
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010
)

However, after experimenting a bit, it turns out that the numBins parameter
of the BinaryClassificationMetrics class is not honoured unless the
"spark.default.parallelism" property is set to 1.
As far as I understand, numBins should down-sample the input RDD, as documented here:
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
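
For example, with numBins = 10 I would expect something like the following
to hold regardless of how the input RDD is partitioned (this is just a
sketch of my expectation, not verified behaviour):

// Expectation based on the scaladoc: thresholds() should be roughly capped by numBins
val m = new BinaryClassificationMetrics(yetAnotherRDD, 10)
// expected: on the order of 10 thresholds; this only seems to hold when
// spark.default.parallelism is 1
println(m.thresholds().count())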


 When "spark.default.parallelism" is set to 1, the size of the thesholds
and pr curve is near about the numBins, as documented here
In case I make it 100, the size of the thresholds in the
BinaryClassification class becomes ~100 and so on.

Am I missing something here? If the dataset on which the PR curve is being
computed is huge, wouldn't setting parallelism to 1 hurt performance?
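
Would coalescing just the scores-and-labels RDD down to a single partition
be a reasonable alternative to setting spark.default.parallelism to 1?
Something along these lines (untested sketch):

// Hypothetical workaround: collapse only the metrics input to one partition,
// leaving the rest of the job at normal parallelism.
val singlePartitionScores = yetAnotherRDD.coalesce(1)
val metricsOnOnePartition = new BinaryClassificationMetrics(singlePartitionScores, 10)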

I am using Spark 1.6.1 in local mode for this experiment. Spark 1.5.1 in
cluster mode gives similar results.

Any pointers/help would be appreciated!

Thanks!
