Re: 'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1
Why do you say it's not honored -- what do you observe? Looking at the code, it does not seem to depend on the RDD's parallelism. Can you narrow this down to a shorter example?
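A shorter example of the kind asked for above could look like the following sketch (untested; it replaces the Avro input with synthetic (score, label) pairs via sc.parallelize, and the object name NumBinsRepro is made up). Rerunning with different numSlices and numBins values and comparing thresholds().count() against numBins would show whether partitioning matters:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object NumBinsRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("NumBinsRepro").setMaster("local[2]"))

    // Synthetic (score, label) pairs stand in for the Avro input.
    val rnd = new scala.util.Random(42)
    val data = Seq.fill(10000)((rnd.nextDouble(), if (rnd.nextBoolean()) 1.0 else 0.0))

    // Rerun with numSlices = 1 vs. numSlices = 2 and compare the printed sizes.
    val scoreAndLabels = sc.parallelize(data, numSlices = 2)
    val metrics = new BinaryClassificationMetrics(scoreAndLabels, numBins = 10)

    println("partitions: " + scoreAndLabels.partitions.size)
    println("thresholds: " + metrics.thresholds().count())
    println("pr points : " + metrics.pr().count())

    sc.stop()
  }
}
```

This isolates the question from the Avro/Kryo setup in the original program, so any difference in the observed bin count can only come from the partitioning.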
Re: 'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1
Hi, any pointers? I'm not sure this thread is reaching the right audience.
'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1
Hi,

I'm trying to use the BinaryClassificationMetrics class to compute the PR curve, as below -

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object TestCode {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")
    sparkConf.set("spark.default.parallelism", "1")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // LabelledData is an application class, not shown here.
    sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
      classOf[LabelledData], classOf[Configuration]))

    val sc = new SparkContext(sparkConf)
    val jobConf = new JobConf(sc.hadoopConfiguration)

    // Avro input, read with minPartitions = 2.
    val rdd = sc.hadoopFile(
      "sampleData",
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable], 2)

    println("Original Partitions : " + rdd.partitions.size)

    // Each record's "value" field is a pipe-separated "prediction|label" string.
    val anotherRDD = rdd.map(row => row._1.datum)
      .map(rowValue => rowValue.get("value").toString.split("\\|"))

    println("Another RDD partitions : " + anotherRDD.partitions.size)

    val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)] { iterator =>
      // The buffer must be local to the partition, not a var captured
      // from the driver.
      val res = scala.collection.mutable.ListBuffer[(Double, Double)]()
      while (iterator.hasNext) {
        val iter = iterator.next.iterator
        val prediction = iter.next.toDouble
        val label = iter.next.toDouble
        res += ((prediction, label))
      }
      res.iterator
    }

    println("Yet another RDD partitions : " + yetAnotherRDD.partitions.size)

    // Sample data in yetAnotherRDD:
    // (0.0025952152930881676,0.0)
    // (8.08581095750238E-5,0.0)
    // (0.1420529729314534,0.0)
    // (1.287933787473423,0.0)
    // (0.007534799826226573,0.0)
    // (0.008488829931163747,0.0)
    // (1.441921051791096,0.0)
    // (0.0036552783890398343,0.0)
    // (2.3833004789198267,0.0)
    // (0.3695065893117973,0.0)

    // Metrics calculation. Explicitly setting numBins to 10.
    val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)

    val pr = metrics.pr().collect()
    val thr = metrics.thresholds().collect()
    val joined = metrics.precisionByThreshold()
      .join(metrics.recallByThreshold()).collect()

    println(joined.size)
    println(thr.size)
    println(pr.size)
  }
}

In local mode my machine has 2 cores, so I set minPartitions on the original RDD to 2 (based on suggestions here:
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010)

However, upon experimenting a bit, it turns out that the numBins property of the BinaryClassificationMetrics class is not honoured unless "spark.default.parallelism" is set to 1. As far as I understand, numBins should downsample the input RDD, as documented here:
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html

When "spark.default.parallelism" is set to 1, the sizes of the thresholds and PR curve are close to numBins, as documented: if I set numBins to 100, the number of thresholds becomes ~100, and so on.

Am I missing something here? If the dataset on which the PR curve is computed is huge, wouldn't setting parallelism to 1 hurt performance?

I am using Spark 1.6.1 in local mode for this experiment; Spark 1.5.1 in cluster mode gives similar results.

Any pointers/help would be appreciated!

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/numBins-property-not-honoured-in-BinaryClassificationMetrics-class-when-spark-default-parallelism-is1-tp27204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
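The Javadoc linked above says numBins downsamples the curve to approximately numBins points by grouping consecutive records. As a toy illustration of how such grouping can interact with partitioning (a hypothetical plain-Scala sketch, not Spark's actual implementation; BinningSketch and groupScores are made-up names), grouping records into runs of size count/numBins within each partition produces more total groups once the data is split across several partitions:

```scala
object BinningSketch {
  // Split one partition's (sorted) scores into consecutive groups of `grouping`.
  // The last group of each partition may be smaller than `grouping`.
  def groupScores(partition: Seq[Double], grouping: Int): Seq[Seq[Double]] =
    if (grouping < 2) partition.map(Seq(_))    // nothing to downsample
    else partition.grouped(grouping).toSeq

  def main(args: Array[String]): Unit = {
    val numBins = 10
    val scores = (1 to 100).map(_.toDouble)
    val grouping = scores.size / numBins       // 100 / 10 = 10

    // One "partition": exactly numBins groups.
    val onePart = groupScores(scores, grouping)
    println(onePart.size)                      // 10

    // Four "partitions" of 25 scores each: each yields ceil(25/10) = 3 groups,
    // so 12 groups in total -- more than numBins.
    val fourParts = scores.grouped(25).toSeq.flatMap(p => groupScores(p, grouping))
    println(fourParts.size)                    // 12
  }
}
```

If the real downsampling behaves per partition like this sketch, one would expect roughly numBins plus a few extra bins per partition rather than numBins exactly, which would be consistent with seeing ~numBins only when parallelism is 1.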