Why do you say it's not honored -- what do you observe? Looking at the code, it does not seem to depend on the RDD parallelism. Can you narrow this down to a shorter example?
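Something along these lines, with the Avro input replaced by a parallelized collection, would isolate it. This is only a rough sketch (the object name and the synthetic scores are made up, not your data):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object NumBinsCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("NumBinsCheck").setMaster("local[2]"))

    // 1000 distinct scores with alternating labels; nothing Avro- or Kryo-specific.
    val base = (1 to 1000).map(i => (i / 1000.0, (i % 2).toDouble))

    // Same data, same numBins = 10, only the partition count varies.
    for (numPartitions <- Seq(1, 2, 4)) {
      val scoreAndLabels = sc.parallelize(base, numPartitions)
      val metrics = new BinaryClassificationMetrics(scoreAndLabels, 10)
      println(s"partitions=$numPartitions" +
        s" thresholds=${metrics.thresholds().count()}" +
        s" prPoints=${metrics.pr().count()}")
    }

    sc.stop()
  }
}

If the threshold count really does track the partitioning rather than numBins, it should show up here without any of the Avro/Kryo/hadoopFile setup.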
On Wed, Jun 22, 2016 at 5:39 AM, Sneha Shukla <sneha29shu...@gmail.com> wrote:
> Hi,
>
> I'm trying to use the BinaryClassificationMetrics class to compute the pr
> curve as below -
>
> import org.apache.avro.generic.GenericRecord
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapred.JobConf
> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
> import org.apache.spark.{SparkConf, SparkContext}
>
> /**
>  * Created by sneha.shukla on 17/06/16.
>  */
> object TestCode {
>
>   def main(args: Array[String]): Unit = {
>
>     val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")
>
>     sparkConf.set("spark.default.parallelism", "1")
>     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
>       classOf[LabelledData], classOf[Configuration]))
>
>     val sc = new SparkContext(sparkConf)
>     val jobConf = new JobConf(sc.hadoopConfiguration)
>
>     val rdd = sc.hadoopFile(
>       "sampleData",
>       classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
>       classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
>       classOf[org.apache.hadoop.io.NullWritable], 2)
>
>     println("Original partitions : " + rdd.partitions.size)
>
>     val anotherRDD = rdd.map(row => row._1.datum).map(rowValue =>
>       rowValue.get("value").toString.split("\\|"))
>
>     println("Another RDD partitions : " + anotherRDD.partitions.size)
>
>     var res = scala.collection.mutable.ListBuffer[(Double, Double)]()
>
>     val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator => {
>       while (iterator.hasNext) {
>         val array = iterator.next
>         val iter = array.iterator
>         val prediction = iter.next.toDouble
>         val label = iter.next.toDouble
>         val t = (prediction, label)
>         res += t
>       }
>       res.iterator
>     }).map(doubles => (doubles._1, doubles._2))
>
>     println("Yet another RDD partitions : " + yetAnotherRDD.partitions.size)
>
>     // Sample data in yetAnotherRDD:
>     // (0.0025952152930881676,0.0)
>     // (8.08581095750238E-5,0.0)
>     // (0.1420529729314534,0.0)
>     // (1.287933787473423,0.0)
>     // (0.007534799826226573,0.0)
>     // (0.008488829931163747,0.0)
>     // (1.441921051791096,0.0)
>     // (0.0036552783890398343,0.0)
>     // (2.3833004789198267,0.0)
>     // (0.3695065893117973,0.0)
>
>     // Metrics calculation. Explicitly setting numBins to 10.
>     val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)
>
>     val pr = metrics.pr().collect()
>     val thr = metrics.thresholds().collect()
>     val joined =
>       metrics.precisionByThreshold().join(metrics.recallByThreshold()).collect()
>
>     println(joined.size)
>     println(thr.size)
>     println(pr.size)
>   }
> }
>
> In local mode my machine has 2 cores, so I set minPartitions on the original
> RDD to 2 (based on suggestions here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010)
>
> However, upon experimenting a bit, it turns out that the numBins property of
> the BinaryClassificationMetrics class is not honoured unless
> "spark.default.parallelism" is set to 1.
> AFAIU, numBins should downsample the input RDD, as documented here:
> https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
>
> When "spark.default.parallelism" is set to 1, the sizes of the thresholds and
> pr curve are close to numBins, as documented: if I set numBins to 100, the
> number of thresholds in the BinaryClassificationMetrics class becomes ~100,
> and so on.
>
> Am I missing something here? If the dataset on which the pr curve is computed
> is huge, wouldn't setting parallelism to 1 hurt performance?
>
> I am using Spark 1.6.1 in local mode for this experiment. Spark 1.5.1 in
> cluster mode gives similar results.
>
> Any pointers/help would be appreciated!
>
> Thanks!
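Regarding the downsampling itself: numBins defaults to 0, which means no downsampling at all, so comparing the one-arg and two-arg constructors on the same RDD shows what the parameter does. Roughly, assuming the same SparkContext sc and made-up data as in the sketch above:

val scoreAndLabels = sc.parallelize(
  (1 to 1000).map(i => (i / 1000.0, (i % 2).toDouble)), 1)

// No second argument: numBins defaults to 0, i.e. no downsampling.
val full = new BinaryClassificationMetrics(scoreAndLabels)
// numBins = 10: the curve should be downsampled to roughly 10 bins.
val binned = new BinaryClassificationMetrics(scoreAndLabels, 10)

println("full curve points   : " + full.pr().count())
println("binned curve points : " + binned.pr().count())

Whether "roughly 10" stays roughly 10 once the input has more than one partition is exactly the thing a small example like the one above would pin down.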