Why do you say it's not honored -- what do you observe? Looking at the
code, it does not seem to depend on the RDD's parallelism. Can you
narrow this down to a shorter example?
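
Something like the self-contained sketch below would make this much
easier to reproduce -- synthetic (score, label) pairs in place of your
Avro input (the object name and data here are mine, purely for
illustration):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object NumBinsRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("NumBinsRepro").setMaster("local[2]"))

    // Synthetic scores with many distinct values, so downsampling to
    // numBins should clearly kick in; 4 partitions on purpose.
    val scoreAndLabels = sc.parallelize(
      (1 to 10000).map(i => (i / 10000.0, (i % 2).toDouble)), 4)

    // numBins = 10: if it is honored, thresholds().count() should be on
    // the order of 10 regardless of the partition count above.
    val metrics = new BinaryClassificationMetrics(scoreAndLabels, 10)
    println(metrics.thresholds().count())

    sc.stop()
  }
}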

On Wed, Jun 22, 2016 at 5:39 AM, Sneha Shukla <sneha29shu...@gmail.com> wrote:
> Hi,
>
> I'm trying to use the BinaryClassificationMetrics class to compute the pr
> curve as below -
>
> import org.apache.avro.generic.GenericRecord
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapred.JobConf
> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
> import org.apache.spark.{SparkConf, SparkContext}
>
> /**
>   * Created by sneha.shukla on 17/06/16.
>   */
>
> object TestCode {
>
>   def main(args: Array[String]): Unit = {
>
>     val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")
>
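>     // Default parallelism pinned to 1 -- the setting under investigation.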
>     sparkConf.set("spark.default.parallelism","1")
>
>     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
>       classOf[LabelledData], classOf[Configuration]))
>
>     val sc = new SparkContext(sparkConf)
>
>     val jobConf = new JobConf(sc.hadoopConfiguration)
>
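>     // Read Avro records, hinting at a minimum of 2 partitions (last argument).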
>     val rdd = sc.hadoopFile(
>       "sampleData",
>       classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
>       classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
>       classOf[org.apache.hadoop.io.NullWritable], 2)
>
>     println("Original Partitions : "+rdd.partitions.size)
>
>     val anotherRDD = rdd.map(row => row._1.datum).map(rowValue =>
>       rowValue.get("value").toString.split("\\|"))
>
>     println("Another RDD partitions : "+anotherRDD.partitions.size)
>
>     var res = scala.collection.mutable.ListBuffer[(Double, Double)]()
>
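>     // Parse each String array into a (prediction, label) pair of Doubles,
>     // accumulating via the mutable buffer above.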
>     val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator => {
>       while (iterator.hasNext) {
>         val array = iterator.next
>         val iter = array.iterator
>         val prediction = iter.next.toDouble
>         val label = iter.next.toDouble
>         val t = (prediction, label)
>         res += t
>       }
>       res.iterator
>     }).map(doubles => (doubles._1, doubles._2))
>
>     println("yet anohter rdd partitions : "+yetAnotherRDD.partitions.size)
>
>     //Sample data in yetAnotherRDD
> //    (0.0025952152930881676,0.0)
> //    (8.08581095750238E-5,0.0)
> //    (0.1420529729314534,0.0)
> //    (1.287933787473423,0.0)
> //    (0.007534799826226573,0.0)
> //    (0.008488829931163747,0.0)
> //    (1.441921051791096,0.0)
> //    (0.0036552783890398343,0.0)
> //    (2.3833004789198267,0.0)
> //    (0.3695065893117973,0.0)
>
>     //Metrics Calculation. Explicitly setting numBins to 10
>     val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)
>
>     val pr = metrics.pr().collect()
>
>     val thr = metrics.thresholds().collect()
>
>     val joined = metrics.precisionByThreshold()
>       .join(metrics.recallByThreshold()).collect()
>
>     println(joined.size)
>
>     println(thr.size)
>
>     println(pr.size)
>   }
>
> }
>
> In local mode, my local machine has 2 cores, and hence I set
> minPartitions in the original RDD to 2 (based on suggestions here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010)
>
> However, upon experimenting a bit, it turns out that the numBins property in
> the BinaryClassificationMetrics class is not honoured unless
> "spark.default.parallelism" is set to 1.
> AFAIU, numBins should downsample the input RDD, as documented here:
> https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
>
>  When "spark.default.parallelism" is set to 1, the size of the thesholds and
> pr curve is near about the numBins, as documented here
> In case I make it 100, the size of the thresholds in the
> BinaryClassification class becomes ~100 and so on.
>
> Am I missing something here? If the dataset on which the pr curve is
> computed is huge, wouldn't setting parallelism to 1 hurt performance?
>
> I am using Spark 1.6.1 in local mode for this experiment. Spark 1.5.1 in
> cluster mode gives similar results.
>
> Any pointers/help would be appreciated!
>
> Thanks!
>
>
