Exclude certain data from Training Data - MLlib
I have my data in two tables, colors and excluded_colors. colors contains all colors; excluded_colors contains some colors that I wish to exclude from my training set. I am trying to split the data into a training and a testing set and ensure that the colors in excluded_colors are not in my training set but do exist in the testing set.

In order to achieve this, I did the following:

    var colors = spark.sql("""
      select colors.* from colors
      LEFT JOIN excluded_colors ON excluded_colors.color_id = colors.color_id
      where excluded_colors.color_id IS NULL
    """)

    val trainer: (Int => Int) = (arg: Int) => 0
    val sqlTrainer = udf(trainer)
    val tester: (Int => Int) = (arg: Int) => 1
    val sqlTester = udf(tester)

    val splits = colors.randomSplit(Array(0.7, 0.3))
    val train_colors = splits(0).select("color_id").withColumn("test", sqlTrainer(col("color_id")))
    val test_colors = splits(1).select("color_id").withColumn("test", sqlTester(col("color_id")))

However, I'm realizing that by doing the above, the colors in excluded_colors are completely ignored. They are not even in my testing set.

How can I split the data 70/30 while also ensuring that the colors in excluded_colors are not in training but are present in testing?
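One way to read the requirement: split only the eligible colors at random, and send every excluded color straight to the test set. The sketch below shows that logic in plain Python rather than Spark, so the function name `split_with_exclusions`, the seed, and the sample ids are all made up for illustration. In Spark terms this would presumably amount to calling randomSplit on the filtered colors and then appending the excluded rows to the test part with union, but that mapping is an assumption, not something confirmed in this thread.

```python
import random


def split_with_exclusions(color_ids, excluded_ids, train_fraction=0.7, seed=42):
    """Split ids into (train, test); excluded ids always land in test.

    Hypothetical helper for illustration only, not a Spark API.
    """
    excluded = set(excluded_ids)
    rng = random.Random(seed)
    train, test = [], []
    for cid in color_ids:
        if cid in excluded:
            # Excluded colors never enter training, but are kept for testing.
            test.append(cid)
        elif rng.random() < train_fraction:
            train.append(cid)
        else:
            test.append(cid)
    return train, test


# Made-up sample data: 100 color ids, three of them excluded from training.
colors = list(range(1, 101))
excluded_colors = [3, 17, 42]
train_colors, test_colors = split_with_exclusions(colors, excluded_colors)
```

Note that the 70/30 ratio here applies to the eligible colors only, so the test set ends up somewhat larger than 30% of the whole. If the ratio has to hold over all rows, the training fraction would need to be adjusted for the number of excluded colors.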
Re: scala.MatchError while doing BinaryClassificationMetrics
Thanks, Nick. This worked for me.

    val evaluator = new BinaryClassificationEvaluator().
      setLabelCol("label").
      setRawPredictionCol("ModelProbability").
      setMetricName("areaUnderROC")

    val auROC = evaluator.evaluate(testResults)

On Mon, Nov 14, 2016 at 4:00 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> Typically you pass in the result of a model transform to the evaluator.
>
> So:
> val model = estimator.fit(data)
> val auc = evaluator.evaluate(model.transform(testData))
>
> Check Scala API docs for some details:
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
Re: scala.MatchError while doing BinaryClassificationMetrics
Can you please suggest how I can use BinaryClassificationEvaluator? I tried:

    scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

    scala> val evaluator = new BinaryClassificationEvaluator()
    evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_0d57372b7579

Try 1:

    scala> evaluator.evaluate(testScoreAndLabel.rdd)
    <console>:105: error: type mismatch;
     found   : org.apache.spark.rdd.RDD[(Double, Double)]
     required: org.apache.spark.sql.Dataset[_]
           evaluator.evaluate(testScoreAndLabel.rdd)

Try 2:

    scala> evaluator.evaluate(testScoreAndLabel)
    java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
      at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)

Try 3:

    scala> evaluator.evaluate(testScoreAndLabel.select("Label","ModelProbability"))
    org.apache.spark.sql.AnalysisException: cannot resolve '`Label`' given input columns: [_1, _2];
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

On Mon, Nov 14, 2016 at 1:44 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
> doubles from the test score and label DF.
>
> But you may prefer to just use spark.ml evaluators, which work with
> DataFrames. Try BinaryClassificationEvaluator.
scala.MatchError while doing BinaryClassificationMetrics
I am getting scala.MatchError in the code below. I'm not able to see why this would be happening. I am using Spark 2.0.1.

    scala> testResults.columns
    res538: Array[String] = Array(TopicVector, subject_id, hadm_id, isElective,
    isNewborn, isUrgent, isEmergency, isMale, isFemale, oasis_score,
    sapsii_score, sofa_score, age, hosp_death, test, ModelFeatures, Label,
    rawPrediction, ModelProbability, ModelPrediction)

    scala> testResults.select("Label","ModelProbability").take(1)
    res542: Array[org.apache.spark.sql.Row] =
    Array([0.0,[0.737304818744076,0.262695181255924]])

    scala> val testScoreAndLabel = testResults.
         | select("Label","ModelProbability").
         | map { case Row(l:Double, p:Vector) => (p(1), l) }
    testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: double]

    scala> testScoreAndLabel
    res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: double]

    scala> testScoreAndLabel.columns
    res540: Array[String] = Array(_1, _2)

    scala> val testMetrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd)
    testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics =
    org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1

The code below gives the error:

    val auROC = testMetrics.areaUnderROC() //this line gives the error

    Caused by: scala.MatchError: [0.0,[0.7316583497453766,0.2683416502546234]]
    (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
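For reference, the (score, label) pairs that the code above tries to build are just the probability of the positive class paired with the true label, and the area under the ROC curve can be computed from those pairs alone. The following is a plain-Python sketch of that idea, not Spark's implementation: it uses the rank interpretation of AUC (the chance that a random positive outscores a random negative, with ties counted as half). The function name `area_under_roc` and the four sample rows are invented for illustration.

```python
def area_under_roc(score_and_label):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for s, label in score_and_label if label == 1.0]
    negatives = [s for s, label in score_and_label if label == 0.0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up rows shaped like the output above: (label, [P(class 0), P(class 1)]).
rows = [
    (0.0, [0.74, 0.26]),
    (1.0, [0.20, 0.80]),
    (0.0, [0.30, 0.70]),
    (1.0, [0.35, 0.65]),
]

# Same extraction as the map step in the question: take element 1 of the
# probability vector as the score and pair it with the label.
score_and_label = [(prob[1], label) for label, prob in rows]
auc = area_under_roc(score_and_label)
```

With these four rows, three of the four positive/negative pairs are ordered correctly, so `auc` comes out to 0.75. The pairwise loop is quadratic and only meant to make the definition concrete.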
Re: How to write contents of RDD to HDFS as separate file for each item in RDD (PySpark)
I am just trying to do this as a proof of concept. The actual content of the files will be quite big. I'm having a problem using foreach or something similar on an RDD.

    sc.binaryFiles("/root/sift_images_test/*.jpg")

returns

    ("filename1", bytes)
    ("filename2", bytes)

I'm wondering if there is a way to do some processing on each of these (the processing in this case is just getting the length in bytes, but it will be something else in the real world) and then write the contents to separate HDFS files.

If this doesn't make sense, would it make more sense to have all contents in a single HDFS file?

On Sat, Jul 30, 2016 at 10:19 PM, ayan guha <guha.a...@gmail.com> wrote:

> This sounds a bad idea, given hdfs does not work well with small files.
How to write contents of RDD to HDFS as separate file for each item in RDD (PySpark)
I am reading a bunch of files in PySpark using binaryFiles. Then I want to get the number of bytes for each file and write this number to an HDFS file with the corresponding name.

Example:

If directory /myimages has one.jpg, two.jpg, and three.jpg, then I want three files one-success.jpg, two-success.jpg, and three-success.jpg in HDFS with a number in each. The number will specify the length in bytes.

Here is what I've done thus far:

    from pyspark import SparkContext
    import numpy as np

    sc = SparkContext("local", "test")

    def bytes_length(rawdata):
        length = len(np.asarray(bytearray(rawdata),dtype=np.uint8))
        return length

    images = sc.binaryFiles("/root/sift_images_test/*.jpg")
    images.map(lambda(filename, contents): bytes_length(contents)).saveAsTextFile("hdfs://localhost:9000/tmp/somfile")

However, doing this creates a single file in HDFS:

    $ hadoop fs -cat /tmp/somfile/part-0

    113212
    144926
    178923

Instead I want /tmp/somefile in HDFS to have three files:

    one-success.txt with value 113212
    two-success.txt with value 144926
    three-success.txt with value 178923

Is it possible to achieve what I'm after? I don't want to write files to the local file system and then put them in HDFS. Instead, I want to use the saveAsTextFile method on the RDD directly.
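Since saveAsTextFile writes one part file per partition rather than one file per element, getting a named file per image means doing the write per element. The sketch below shows only that per-item step in plain Python: it derives the "-success.txt" name from each input path and writes the byte length into it. It uses a local temporary directory as a stand-in for HDFS, and the helper names `success_name` and `write_length_files` are hypothetical. On a real cluster the write would have to go through an HDFS client, which is not shown here.

```python
import os
import tempfile


def success_name(path):
    """Map '/myimages/one.jpg' to 'one-success.txt' (hypothetical naming helper)."""
    stem = os.path.splitext(os.path.basename(path))[0]
    return stem + "-success.txt"


def write_length_files(items, out_dir):
    """Write one small file per (path, contents) pair and return the names written."""
    written = []
    for path, contents in items:
        name = success_name(path)
        with open(os.path.join(out_dir, name), "w") as handle:
            # The file body is just the number of bytes in the original file.
            handle.write(str(len(contents)))
        written.append(name)
    return written


# Made-up stand-ins for the (filename, bytes) pairs that binaryFiles yields.
items = [("/myimages/one.jpg", b"\x00" * 5), ("/myimages/two.jpg", b"\x00" * 8)]
out_dir = tempfile.mkdtemp()  # local directory standing in for an HDFS path
names = write_length_files(items, out_dir)
```

The concern raised in the reply still applies: one tiny file per input item is exactly the small-files pattern that HDFS handles poorly, so this is reasonable for a proof of concept but may not scale.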
Re: PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles
I'm very new to Spark. I'm running it on a single CentOS 7 box. How would I add test.py to spark-submit? Pointers to any resources would be great. Thanks for your help.

On Sat, Jul 30, 2016 at 1:28 AM, ayan guha <guha.a...@gmail.com> wrote:

> I think you need to add test.py in spark submit so that it gets shipped to
> all executors
PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles
I am using PySpark 1.6.1. In my python program I'm using ctypes and trying to load the liblept library via the liblept.so.4.0.2 file on my system.

While trying to load the library via cdll.LoadLibrary("liblept.so.4.0.2") I get an error: 'builtin_function_or_method' object has no attribute '__code__'

Here are my files:

test.py

    from ctypes import *

    class FooBar:
        def __init__(self, options=None, **kwargs):
            if options is not None:
                self.options = options

        def read_image_from_bytes(self, bytes):
            return "img"

        def text_from_image(self, img):
            self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
            return "test from foobar"

spark.py

    from pyspark import SparkContext
    import test
    import numpy as np
    sc = SparkContext("local", "test")
    foo = test.FooBar()

    def file_bytes(rawdata):
        return np.asarray(bytearray(rawdata),dtype=np.uint8)

    def do_some_with_bytes(bytes):
        return foo.do_something_on_image(foo.read_image_from_bytes(bytes))

    images = sc.binaryFiles("/myimages/*.jpg")
    image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
    print images.values().map(image_to_text).take(1) #this gives an error

What is the way to load this library?
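The thread does not settle what triggers this error, so the following is only one pattern that is sometimes used when an object captured by a Spark closure holds something that does not pickle well, such as a native library handle. The idea is to keep just the library name on the object, open the library lazily on first use (which would then happen on the worker), and drop the handle whenever the object is pickled. The class name `LazyLibrary` is hypothetical, and whether this resolves the error in the question is an assumption.

```python
import ctypes
import pickle


class LazyLibrary(object):
    """Holds only the library name; opens the shared library on first use."""

    def __init__(self, lib_name):
        self.lib_name = lib_name
        self._lib = None  # stays None until the lib property is first accessed

    @property
    def lib(self):
        # Load on demand, in whichever process actually needs the library.
        if self._lib is None:
            self._lib = ctypes.cdll.LoadLibrary(self.lib_name)
        return self._lib

    def __getstate__(self):
        # Drop the native handle so that only plain data gets pickled.
        state = self.__dict__.copy()
        state["_lib"] = None
        return state


# The library is never opened here; only the name travels through pickle.
wrapper = LazyLibrary("liblept.so.4.0.2")
restored = pickle.loads(pickle.dumps(wrapper))
```

Separately, the suggestion in the reply to ship test.py to the executors corresponds to the `--py-files` option of spark-submit, or to calling `SparkContext.addPyFile` from the driver script.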