Re: Using DIMSUM with ids
Right now DIMSUM is meant to be used for tall and skinny matrices, and so columnSimilarities() returns similar columns, not rows. We are working on adding an efficient row similarity as well, tracked by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823 Reza On Mon, Apr 6, 2015 at 6:08 AM, James alcaid1...@gmail.com wrote: The example below illustrates how to use the DIMSUM algorithm to calculate the similarity between each pair of rows and output the row pairs whose cosine similarity is no less than a threshold. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala But what if I want to keep an id for each row, which means the input file is: id1 vector1 id2 vector2 id3 vector3 ... And we hope to output id1 id2 sim(id1, id2) id1 id3 sim(id1, id3) ... Alcaid
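Until row similarity lands, a brute-force sketch that keeps an id per row and emits pairs above a threshold (this is not DIMSUM, is only reasonable for a modest number of rows, and the file layout, parsing, and threshold value are all assumptions):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.rdd.RDD

  // Assumed input: "id v1 v2 v3 ..." per line
  val rows: RDD[(String, Vector)] = sc.textFile("rows.txt").map { line =>
    val parts = line.split("\\s+")
    (parts.head, Vectors.dense(parts.tail.map(_.toDouble)))
  }

  def cosine(a: Vector, b: Vector): Double = {
    val (x, y) = (a.toArray, b.toArray)
    val dot = x.zip(y).map { case (u, v) => u * v }.sum
    dot / (math.sqrt(x.map(u => u * u).sum) * math.sqrt(y.map(u => u * u).sum))
  }

  val threshold = 0.5  // assumed
  val similarPairs = rows.cartesian(rows)
    .filter { case ((id1, _), (id2, _)) => id1 < id2 }   // keep each pair once
    .map { case ((id1, v1), (id2, v2)) => (id1, id2, cosine(v1, v2)) }
    .filter { case (_, _, sim) => sim >= threshold }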
Re: Spark Avarage
Thanks for your replies I solved the problem with this code val weathersRDD = sc.textFile(csvfilePath).map { line = val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll(\,).trim.split(,) Tuple2(dayOfdate.substring(0,7), (minDeg.toInt, maxDeg.toInt, meanDeg.toInt)) }.mapValues(x = (x, 1)).reduceByKey((x, y) = ((x._1._1 + y._1._1, x._1._2 + y._1._2,x._1._3 + y._1._3),x._2 + y._2)) .mapValues{ case ((sumMin,sumMax,sumMean), count) = ((1.0 * sumMin)/count , (1.0 * sumMax)/count, (1.0 * sumMean)/count) }.collectAsMap() but I will also try Dataframe API thanks again 2015-04-06 13:31 GMT-04:00 Cheng, Hao hao.ch...@intel.com: The Dataframe API should be perfectly helpful in this case. https://spark.apache.org/docs/1.3.0/sql-programming-guide.html Some code snippet will like: val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ weathersRDD.toDF.registerTempTable(weathers) val results = sqlContext.sql(SELECT avg(minDeg), avg(maxDeg), avg(meanDeg) FROM weathers GROUP BY dayToMonth(dayOfDate))) results.collect.foreach(println) -Original Message- From: barisak [mailto:baris.akg...@gmail.com] Sent: Monday, April 6, 2015 10:50 PM To: user@spark.apache.org Subject: Spark Avarage Hi I have a class in above desc. case class weatherCond(dayOfdate: String, minDeg: Int, maxDeg: Int, meanDeg: Int) I am reading the data from csv file and I put this data into weatherCond class with this code val weathersRDD = sc.textFile(weather.csv).map { line = val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll(\,).trim.split(,) weatherCond(dayOfdate, minDeg.toInt, maxDeg.toInt, meanDeg.toInt) } the question is ; how can I average the minDeg, maxDeg and meanDeg values for each month ; The data set example day, min, max , mean 2014-03-17,-3,5,5 2014-03-18,6,7,7 2014-03-19,6,14,10 result has to be (2014-03, 3, 8.6 ,7.3) -- (Average for 2014 - 03 ) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Avarage-tp22391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark unit test fails
Trying to bump up the rank of the question. Any example on Github can someone point to? ..Manas On Fri, Apr 3, 2015 at 9:39 AM, manasdebashiskar manasdebashis...@gmail.com wrote: Hi experts, I am trying to write unit tests for my spark application which fails with javax.servlet.FilterRegistration error. I am using CDH5.3.2 Spark and below is my dependencies list. val spark = 1.2.0-cdh5.3.2 val esriGeometryAPI = 1.2 val csvWriter = 1.0.0 val hadoopClient= 2.3.0 val scalaTest = 2.2.1 val jodaTime= 1.6.0 val scalajHTTP = 1.0.1 val avro= 1.7.7 val scopt = 3.2.0 val config = 1.2.1 val jobserver = 0.4.1 val excludeJBossNetty = ExclusionRule(organization = org.jboss.netty) val excludeIONetty = ExclusionRule(organization = io.netty) val excludeEclipseJetty = ExclusionRule(organization = org.eclipse.jetty) val excludeMortbayJetty = ExclusionRule(organization = org.mortbay.jetty) val excludeAsm = ExclusionRule(organization = org.ow2.asm) val excludeOldAsm = ExclusionRule(organization = asm) val excludeCommonsLogging = ExclusionRule(organization = commons-logging) val excludeSLF4J = ExclusionRule(organization = org.slf4j) val excludeScalap = ExclusionRule(organization = org.scala-lang, artifact = scalap) val excludeHadoop = ExclusionRule(organization = org.apache.hadoop) val excludeCurator = ExclusionRule(organization = org.apache.curator) val excludePowermock = ExclusionRule(organization = org.powermock) val excludeFastutil = ExclusionRule(organization = it.unimi.dsi) val excludeJruby = ExclusionRule(organization = org.jruby) val excludeThrift = ExclusionRule(organization = org.apache.thrift) val excludeServletApi = ExclusionRule(organization = javax.servlet, artifact = servlet-api) val excludeJUnit = ExclusionRule(organization = junit) I found the link ( http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-SecurityException-when-running-tests-with-Spark-1-0-0-td6747.html#a6749 ) talking about the issue and the work around of the same. But that work around does not get rid of the problem for me. I am using an SBT build which can't be changed to maven. What am I missing? Stack trace - [info] FiltersRDDSpec: [info] - Spark Filter *** FAILED *** [info] java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package [info] at java.lang.ClassLoader.checkCerts(Unknown Source) [info] at java.lang.ClassLoader.preDefineClass(Unknown Source) [info] at java.lang.ClassLoader.defineClass(Unknown Source) [info] at java.security.SecureClassLoader.defineClass(Unknown Source) [info] at java.net.URLClassLoader.defineClass(Unknown Source) [info] at java.net.URLClassLoader.access$100(Unknown Source) [info] at java.net.URLClassLoader$1.run(Unknown Source) [info] at java.net.URLClassLoader$1.run(Unknown Source) [info] at java.security.AccessController.doPrivileged(Native Method) [info] at java.net.URLClassLoader.findClass(Unknown Source) Thanks Manas Manas Kar -- View this message in context: Spark unit test fails http://apache-spark-user-list.1001560.n3.nabble.com/Spark-unit-test-fails-tp22368.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
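For what it's worth, the usual cause of this SecurityException is an old javax.servlet 2.x servlet-api (pulled in transitively, often via Hadoop) clashing with signed 3.x servlet classes on the test classpath. A hedged build.sbt sketch of one commonly reported workaround, reusing the exclusion rules already defined above (the dependency list, versions, and whether this fixes every variant of the problem are assumptions):

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % spark % "provided"
      excludeAll(excludeServletApi, excludeEclipseJetty, excludeMortbayJetty),
    "org.apache.hadoop" % "hadoop-client" % hadoopClient
      excludeAll(excludeServletApi, excludeMortbayJetty),
    // keep a single servlet 3.x API on the test classpath
    "javax.servlet" % "javax.servlet-api" % "3.0.1" % "test"
  )

  // running the suites in a forked JVM also keeps the test classpath predictable
  fork in Test := true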
Re: From DataFrame to LabeledPoint
I'd make sure you're selecting the correct columns. If not that, then your input data might be corrupt. CCing user to keep it on the user list. On Mon, Apr 6, 2015 at 6:53 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi!, I had tried your solution, and I saw that the first row is null. This is important? Can I work with null rows? Some rows have some columns with null values. This is the first row of Dataframe: scala dataDF.take(1) res11: Array[org.apache.spark.sql.Row] = Array([null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null]) This is the RDD[LabeledPoint] created: scala data.take(1) 15/04/06 15:46:31 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 243, 10.101.5.194): java.lang.NullPointerException Thank's for all. Sergio J. 2015-04-03 20:14 GMT+02:00 Joseph Bradley jos...@databricks.com: I'd recommend going through each step, taking 1 RDD element (myDataFrame.take(1)), and examining it to see where this issue is happening. On Fri, Apr 3, 2015 at 9:44 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: This solution its really good. But I was working with feature.toString.toDouble because the feature is the type Any. Now, when I try to work with the LabeledPoint created I have a NullPointerException =/ El 02/04/2015 21:23, Joseph Bradley jos...@databricks.com escribió: Peter's suggestion sounds good, but watch out for the match case since I believe you'll have to match on: case (Row(feature1, feature2, ...), Row(label)) = On Thu, Apr 2, 2015 at 7:57 AM, Peter Rudenko petro.rude...@gmail.com wrote: Hi try next code: val labeledPoints: RDD[LabeledPoint] = features.zip(labels).map{ case Row(feture1, feture2,..., label) = LabeledPoint(label, Vectors.dense(feature1, feature2, ...)) } Thanks, Peter Rudenko On 2015-04-02 17:17, drarse wrote: Hello!, I have a questions since days ago. I am working with DataFrame and with Spark SQL I imported a jsonFile: /val df = sqlContext.jsonFile(file.json)/ In this json I have the label and de features. I selected it: / val features = df.select (feature1,feature2,feature3,...); val labels = df.select (cassification)/ But, now, I don't know create a LabeledPoint for RandomForest. I tried some solutions without success. Can you help me? Thanks for all! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/From-DataFrame-to-LabeledPoint-tp22354.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
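A sketch of the null filtering Joseph is hinting at, done before building the LabeledPoints (column names and numeric types are assumptions; the thread mentions the values come back as Any, so a toString.toDouble conversion may still be needed):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  val data = dataDF
    .select("classification", "feature1", "feature2", "feature3")
    .rdd
    .filter(row => (0 until row.length).forall(i => !row.isNullAt(i)))  // drop rows with any null
    .map { row =>
      LabeledPoint(
        row.getDouble(0),
        Vectors.dense(row.getDouble(1), row.getDouble(2), row.getDouble(3)))
    }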
RE: Spark Avarage
The Dataframe API should be perfectly helpful in this case. https://spark.apache.org/docs/1.3.0/sql-programming-guide.html Some code snippet will like: val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ weathersRDD.toDF.registerTempTable(weathers) val results = sqlContext.sql(SELECT avg(minDeg), avg(maxDeg), avg(meanDeg) FROM weathers GROUP BY dayToMonth(dayOfDate))) results.collect.foreach(println) -Original Message- From: barisak [mailto:baris.akg...@gmail.com] Sent: Monday, April 6, 2015 10:50 PM To: user@spark.apache.org Subject: Spark Avarage Hi I have a class in above desc. case class weatherCond(dayOfdate: String, minDeg: Int, maxDeg: Int, meanDeg: Int) I am reading the data from csv file and I put this data into weatherCond class with this code val weathersRDD = sc.textFile(weather.csv).map { line = val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll(\,).trim.split(,) weatherCond(dayOfdate, minDeg.toInt, maxDeg.toInt, meanDeg.toInt) } the question is ; how can I average the minDeg, maxDeg and meanDeg values for each month ; The data set example day, min, max , mean 2014-03-17,-3,5,5 2014-03-18,6,7,7 2014-03-19,6,14,10 result has to be (2014-03, 3, 8.6 ,7.3) -- (Average for 2014 - 03 ) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Avarage-tp22391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
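The dayToMonth UDF referenced in the SQL above is not defined anywhere in the thread; a sketch of how it could be registered, assuming Spark 1.3 and the weatherCond case class from the original message (whose day field looks like 2014-03-17):

  import sqlContext.implicits._

  sqlContext.udf.register("dayToMonth", (day: String) => day.substring(0, 7))  // "2014-03-17" -> "2014-03"

  weathersRDD.toDF().registerTempTable("weathers")
  val results = sqlContext.sql(
    """SELECT dayToMonth(dayOfdate) AS month,
      |       avg(minDeg) AS avgMin, avg(maxDeg) AS avgMax, avg(meanDeg) AS avgMean
      |FROM weathers
      |GROUP BY dayToMonth(dayOfdate)""".stripMargin)
  results.collect().foreach(println)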
Re: org.apache.spark.ml.recommendation.ALS
Please attach the full stack trace. -Xiangrui On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate) Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a dataframe object from ratings data set that is on hdfs ? 
where as the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase=sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) = Rating(user.toInt, item.toInt, rate.toInt) }) val model = ALS.train(ratings, rank, numIterations, 0.01) Now, for the new ALS fit method, I am trying to use the below code to run, but getting a compilation error: val als = new ALS() .setRank(rank) .setRegParam(regParam) .setImplicitPrefs(implicitPrefs) .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val sc = new SparkContext(conf) val pfile = args(0) val purchase=sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) = Rating(user.toInt, item.toInt, rate.toInt) }) val model = als.fit(ratings.toDF()) I get an error that the method toDF() is not a member of org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]]. Appreciate the help ! Thanks, Jay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
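For reference, a NoSuchMethodError on scala.reflect.api.JavaUniverse.runtimeMirror usually points at mixed Scala versions (note the spark-sql_2.11 jar being passed to a Spark build that may have been compiled for Scala 2.10). Assuming matching Scala/Spark versions and a SQLContext named sqlContext, the toDF() pattern discussed in the rest of the thread looks like:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  val ratings = purchase.map { line =>
    line.split(',') match {
      case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat)
    }
  }.toDF("user", "item", "rate")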
How to restrict foreach on a streaming RDD only once upon receiver completion
I have created a Custom Receiver to fetch records pertaining to a specific query from Elastic Search and have implemented Streaming RDD transformations to process the data generated by the receiver. The final RDD is a sorted list of name value pairs and I want to read the top 20 results programmatically rather than write to an external file. I use foreach on the RDD and take the top 20 values into a list. I see that forEach is processed every time there is a new microbatch from the receiver. However, I want the foreach computation to be done only once when the receiver has finished fetching all the records from Elastic Search and before the streaming context is killed so that I can populate the results into a list and process it in my driver program. Appreciate any guidance in this regard. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark + Kinesis
Cc'ing Chris Fregly, who wrote the Kinesis integration. Maybe he can help. On Mon, Apr 6, 2015 at 9:23 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim ᐧ On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: ᐧ Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python where I followed the example http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim --- Time: 142825409 ms --- 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s) 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms *** 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms) On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, More good news! I was able to utilize mergeStrategy to assembly my Kinesis consumer into an uber jar Here's what I added to* build.sbt:* *mergeStrategy in assembly = (mergeStrategy in assembly) { (old) =* * {* * case PathList(com, esotericsoftware, minlog, xs @ _*) = MergeStrategy.first* * case PathList(com, google, common, base, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, commons, xs @ _*) = MergeStrategy.last* * case PathList(org, apache, hadoop, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, spark, unused, xs @ _*) = MergeStrategy.first* *case x = old(x)* * }* *}* Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following: *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)* Any idea on what might be going on? Thanks, Vadim Here's my consumer code (adapted from the WordCount example): *private object MyConsumer extends Logging { def main(args: Array[String]) {/* Check that all required args were passed in. 
*/ if (args.length 2) { System.err.println( |Usage: KinesisWordCount stream-name endpoint-url |stream-name is the name of the Kinesis stream |endpoint-url is the endpoint of the Kinesis service | (e.g. https://kinesis.us-east-1.amazonaws.com https://kinesis.us-east-1.amazonaws.com).stripMargin) System.exit(1)}/* Populate the appropriate variables from the given args */val Array(streamName, endpointUrl) = args/* Determine the number of shards from the stream */val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpointUrl)val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size()System.out.println(Num shards: + numShards)/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */val numStreams = numShards/* Setup the and SparkConfig and StreamingContext *//* Spark Streaming batch interval */val batchInterval = Milliseconds(2000)val sparkConfig = new SparkConf().setAppName(MyConsumer)val ssc = new StreamingContext(sparkConfig,
Re: WordCount example
There are no workers registered with the Spark Standalone master! That is the crux of the problem. :) Follow the instructions properly - https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts Especially make the conf/slaves file has intended workers listed. TD On Mon, Apr 6, 2015 at 9:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Interesting, I see 0 cores in the UI? - *Cores:* 0 Total, 0 Used On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das t...@databricks.com wrote: What does the Spark Standalone UI at port 8080 say about number of cores? On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com wrote: [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the works allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077 this seems to work. I don't understand why though because when I give spark://ip-10-241-251-232:7077 application seem to bootstrap successfully, just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.*seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream( localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 
15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
Spark SQL Parquet as External table - 1.3.x HiveMetastoreType now hidden
In 1.2.1 of I was persisting a set of parquet files as a table for use by spark-sql cli later on. There was a post here http://apache-spark-user-list.1001560.n3.nabble.com/persist-table-schema-in-spark-sql-tt16297.html#a16311 by Mchael Armbrust that provide a nice little helper method for dealing with this: /** * Sugar for creating a Hive external table from a parquet path. */def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } In migrating to 1.3.x I see that the spark.sql.hive.convertMetastoreParquet is no longer public, so the above no longer works. I can define a helper method that wraps the HiveMetastoreTypes something like: package org.apache.spark.sql.hive import org.apache.spark.sql.types.DataType /** * Helper to expose HiveMetastoreTypes hidden by Spark. It is created in this name space to make it accessible. */ object HiveTypeHelper { def toDataType(metastoreType: String): DataType = HiveMetastoreTypes.toDataType(metastoreType) def toMetastoreType(dataType: DataType): String = HiveMetastoreTypes.toMetastoreType(dataType) } While this will work, is there a better way to achieve this under 1.3.x? TIA for the assistance. -Todd
Task not serializable
In this code, in the foreach I am getting a task not serialized exception.

  @SuppressWarnings("serial")
  public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, final JavaSparkContext jsc) throws IOException {
    log.info("Company matcher started");
    // final JavaSparkContext jsc = getSparkContext();
    matchRdd.foreachAsync(new VoidFunction<VendorRecord>() {
      @Override
      public void call(VendorRecord t) throws Exception {
        if (t != null) {
          try {
            CompanyMatcherHelper.UpdateMatchedRecord(jsc, t);
          } catch (Exception e) {
            log.error("ERROR while running Matcher for company " + t.getCompanyId(), e);
          }
        }
      }
    });
  }
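The usual culprit here is that the anonymous VoidFunction captures the JavaSparkContext (and whatever outer class holds it), and a SparkContext is not serializable. A sketch of a shape that avoids this, written in Scala and using a hypothetical helper that no longer needs the context on the executors:

  // matchRdd: RDD[VendorRecord]; nothing inside the closure may reference the SparkContext
  matchRdd.foreachPartition { records =>
    val helper = new CompanyMatcherHelper()        // assumed constructible on the executor
    records.foreach { record =>
      if (record != null) {
        try {
          helper.updateMatchedRecord(record)       // hypothetical signature without the context
        } catch {
          case e: Exception =>
            log.error("ERROR while running Matcher for company " + record.getCompanyId, e)
        }
      }
    }
  }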
Re: DataFrame groupBy MapType
In HiveQL, you should be able to express this as: SELECT ... FROM table GROUP BY m['SomeKey'] On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a case class like this: case class A( m: Map[Long, Long], ... ) and constructed a DataFrame from Seq[A]. I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF, create a new Column then invoke a groupBy on the new Column. But is it the idiomatic way of doing such operation? Can't find much info about operating MapType on Column in the doc. Thanks ahead! Justin
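A sketch of the SQL route Michael describes (assumes a HiveContext named sqlContext, that the DataFrame built from Seq[A] has been registered as a temp table, and count(*) as a stand-in aggregate):

  df.registerTempTable("a_records")
  val grouped = sqlContext.sql(
    "SELECT m['SomeKey'] AS someKey, count(*) AS cnt FROM a_records GROUP BY m['SomeKey']")
  grouped.show()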
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
Thanks. I’ll look into it. But the JSON string I push via receiver goes through a series of transformations, before it ends up in the final RDD. I need to take care to ensure that this magic value propagates all the way down to the last one that I’m iterating on. Currently, I’m calling “stop from the receiver once its done fetching all the records and have a StreamingListener to act on it via the “onReceiverStopped” hook through which I’m stopping the streamingContext and it seems to be working except that I see this message 2015-04-06 16:41:48,002 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 - ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,XYZ,)): Is this not advised? BTW I’m running in local mode. On Apr 7, 2015, at 1:43 AM, Michael Malak michaelma...@yahoo.com mailto:michaelma...@yahoo.com wrote: You could have your receiver send a magic value when it is done. I discuss this Spark Streaming pattern in my presentation Spark Gotchas and Anti-Patterns. In the PDF version, it's slides 34-36. http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language YouTube version cued to that place: http://www.youtube.com/watch?v=W5Uece_JmNst=23m18s http://www.youtube.com/watch?v=W5Uece_JmNst=23m18s From: Hari Polisetty hpoli...@icloud.com mailto:hpoli...@icloud.com To: Tathagata Das t...@databricks.com mailto:t...@databricks.com Cc: user user@spark.apache.org mailto:user@spark.apache.org Sent: Monday, April 6, 2015 2:02 PM Subject: Re: How to restrict foreach on a streaming RDD only once upon receiver completion Yes, I’m using updateStateByKey and it works. But then I need to perform further computation on this Stateful RDD (see code snippet below). I perform forEach on the final RDD and get the top 10 records. I just don’t want the foreach to be performed every time a new batch is received. Only when the receiver is done fetching all the records. My requirements are to programmatically invoke the E.S query (it varies by usecase) , get all the records and apply certain transformations and get the top 10 results based on certain criteria back into the driver program for further processing. I’m able to apply the transformations on the batches of records fetched from E.S using streaming. So, I don’t need to wait for all the records to be fetched. The RDD transformations are happening all the time and the top k results are getting updated constantly until all the records are fetched by the receiver. Is there any drawback with this approach? Can you give more pointers on what you mean by creating a custom RDD that reads from ElasticSearch? 
Here is the relevant portion of my Spark streaming code: //Create a custom streaming receiver to query for relevant data from E.S JavaReceiverInputDStreamString jsonStrings = ssc.receiverStream( new ElasticSearchResponseReceiver(query…….)); //Apply JSON Paths to extract specific value(s) from each record JavaDStreamString fieldVariations = jsonStrings.flatMap(new FlatMapFunctionString, String() { private static final long serialVersionUID = 465237345751948L; @Override public IterableString call(String jsonString) { ListString r = JsonPath.read(jsonString, attributeDetail.getJsonPath()); return r; } }); //Perform a stateful map reduce on each variation JavaPairDStreamString, Integer fieldVariationCounts = fieldVariations.mapToPair( new PairFunctionString, String, Integer() { private static final long serialVersionUID = -1241276515559408238L; @Override public Tuple2String, Integer call(String s) { return new Tuple2String, Integer(s, 1); } }).updateStateByKey(new Function2ListInteger, OptionalInteger, OptionalInteger () { private static final long serialVersionUID = 7598681835161199865L; public OptionalInteger call(ListInteger nums, OptionalInteger current) { Integer sum = current.or((int) 0L); return (OptionalInteger) Optional.of(sum + nums.size());
Re: DataFrame groupBy MapType
I'll add that I don't think there is a convenient way to do this in the Column API ATM, but would welcome a JIRA for adding it :) On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com wrote: In HiveQL, you should be able to express this as: SELECT ... FROM table GROUP BY m['SomeKey'] On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have a case class like this: case class A( m: Map[Long, Long], ... ) and constructed a DataFrame from Seq[A]. I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF, create a new Column then invoke a groupBy on the new Column. But is it the idiomatic way of doing such operation? Can't find much info about operating MapType on Column in the doc. Thanks ahead! Justin
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.regression.LabeledPoint
Hello Sparkers, I kept getting this error: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.regression.LabeledPoint I have tried the following to convert v._1 to a double: Method 1: (if (v._1 > 0) 1d else 0d) Method 2: def bool2Double(b: Boolean): Double = { if (b) 1.0 else 0.0 } bool2Double(v._1 > 0) Method 3: implicit def bool2Double(b: Boolean): Double = { if (b) 1.0 else 0.0 } None of them works. Any advice would be appreciated. Thanks! J - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
You could have your receiver send a magic value when it is done. I discuss this Spark Streaming pattern in my presentation Spark Gotchas and Anti-Patterns. In the PDF version, it's slides 34-36.http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language YouTube version cued to that place: http://www.youtube.com/watch?v=W5Uece_JmNst=23m18s From: Hari Polisetty hpoli...@icloud.com To: Tathagata Das t...@databricks.com Cc: user user@spark.apache.org Sent: Monday, April 6, 2015 2:02 PM Subject: Re: How to restrict foreach on a streaming RDD only once upon receiver completion Yes, I’m using updateStateByKey and it works. But then I need to perform further computation on this Stateful RDD (see code snippet below). I perform forEach on the final RDD and get the top 10 records. I just don’t want the foreach to be performed every time a new batch is received. Only when the receiver is done fetching all the records. My requirements are to programmatically invoke the E.S query (it varies by usecase) , get all the records and apply certain transformations and get the top 10 results based on certain criteria back into the driver program for further processing. I’m able to apply the transformations on the batches of records fetched from E.S using streaming. So, I don’t need to wait for all the records to be fetched. The RDD transformations are happening all the time and the top k results are getting updated constantly until all the records are fetched by the receiver. Is there any drawback with this approach? Can you give more pointers on what you mean by creating a custom RDD that reads from ElasticSearch? Here is the relevant portion of my Spark streaming code: //Create a custom streaming receiver to query for relevant data from E.S JavaReceiverInputDStreamString jsonStrings = ssc.receiverStream( new ElasticSearchResponseReceiver(query…….)); //Apply JSON Paths to extract specific value(s) from each record JavaDStreamString fieldVariations = jsonStrings.flatMap(new FlatMapFunctionString, String() { private static final long serialVersionUID = 465237345751948L; @Override public IterableString call(String jsonString) { ListString r = JsonPath.read(jsonString, attributeDetail.getJsonPath()); return r; } }); //Perform a stateful map reduce on each variation JavaPairDStreamString, Integer fieldVariationCounts = fieldVariations.mapToPair( new PairFunctionString, String, Integer() { private static final long serialVersionUID = -1241276515559408238L; @Override public Tuple2String, Integer call(String s) { return new Tuple2String, Integer(s, 1); } }).updateStateByKey(new Function2ListInteger, OptionalInteger, OptionalInteger () { private static final long serialVersionUID = 7598681835161199865L; public OptionalInteger call(ListInteger nums, OptionalInteger current) { Integer sum = current.or((int) 0L); return (OptionalInteger) Optional.of(sum + nums.size()); } }).reduceByKey(new Function2Integer, Integer, Integer() { private static final long serialVersionUID = -5906059838295609562L; @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //Swap the Map from Enum String,Int to Int,Enum String. 
This is so that we can sort on frequencies JavaPairDStreamInteger, String swappedPair = fieldVariationCounts.mapToPair(new PairFunctionTuple2String, Integer, Integer, String() { private static final long serialVersionUID = -5889774695187619957L; @Override public Tuple2Integer, String call(Tuple2String, Integer item) throws Exception { return item.swap(); } }); //Sort based on Key i.e, frequency JavaPairDStreamInteger, String sortedCounts = swappedPair.transformToPair( new FunctionJavaPairRDDInteger, String, JavaPairRDDInteger, String() { private static final long serialVersionUID = -4172702039963232779L; public JavaPairRDDInteger, String call(JavaPairRDDInteger, String in) throws Exception { //False to denote sort in descending order return in.sortByKey(false); } }); //Iterate through the RDD and get the top 20 values in the sorted pair and write to results list sortedCounts.foreach( new FunctionJavaPairRDDInteger, String, Void () { private static final long serialVersionUID = 2186144129973051920L; public Void call(JavaPairRDDInteger, String rdd) { resultList.clear(); for (Tuple2Integer, String t: rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) { resultList.add(new Tuple3String,Integer, Double(t._2(), t._1(), (double) (100*t._1())/totalProcessed.value())); } return null; } } ); On Apr 7, 2015, at 1:14 AM, Tathagata Das t...@databricks.com wrote: So you want to sort based on the total count of the all the records received through receiver? In that case, you have to combine all the counts using updateStateByKey (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala) But stepping back, if you want to
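A sketch of the magic-value pattern described above, in Scala (the marker string and stream names are assumptions, and depending on the Spark version, calling stop() from inside a batch may need to happen on a separate thread):

  val DoneMarker = "__ES_DONE__"

  // In the custom receiver, after the last Elasticsearch page has been stored:
  //   store(DoneMarker)

  // In the driver:
  jsonStrings.foreachRDD { rdd =>
    if (rdd.filter(_ == DoneMarker).count() > 0) {
      // all records have arrived: take the top 20 here, then shut down
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }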
Re: Strategy regarding maximum number of executor's failure for log running jobs/ spark streaming jobs
What's the advantage of killing an application for lack of resources? I think the rationale behind killing an app based on executor failures is that, if we see a lot of them in a short span of time, it means there's probably something going wrong in the app or on the cluster. On Wed, Apr 1, 2015 at 7:08 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Thanks Sandy. Another way to look at this is that would we like to have our long running application to die? So let's say, we create a window of around 10 batches, and we are using incremental kind of operations inside our application, as restart here is a relatively more costlier, so should it be the maximum number of executor failure's kind of criteria to fail the application or should we have some parameters around minimum number of executor's availability for some x time? So, if the application is not able to have minimum n number of executors within x period of time, then we should fail the application. Adding time factor here, will allow some window for spark to get more executors allocated if some of them fails. Thoughts please. Thanks, Twinkle On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com wrote: That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span. E.g. a max failures per hour property. -Sandy On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, In spark over YARN, there is a property spark.yarn.max.executor.failures which controls the maximum number of executor's failure an application will survive. If number of executor's failures ( due to any reason like OOM or machine failure etc ), exceeds this value then applications quits. For small duration spark job, this looks fine, but for the long running jobs as this does not take into account the duration, this can lead to same treatment for two different scenarios ( mentioned below) : 1. executors failing with in 5 mins. 2. executors failing sparsely, but at some point even a single executor failure ( which application could have survived ) can make the application quit. Sending it to the community to listen what kind of behaviour / strategy they think will be suitable for long running spark jobs or spark streaming jobs. Thanks and Regards, Twinkle
Re: Spark SQL Parquet as External table - 1.3.x HiveMetastoreType now hidden
Hey Todd, In migrating to 1.3.x I see that the spark.sql.hive.convertMetastoreParquet is no longer public, so the above no longer works. This was probably just a typo, but to be clear, spark.sql.hive.convertMetastoreParquet is still a supported option and should work. You are correct that the HiveMetastoreTypes class is now private (we made a lot of stuff private starting with 1.3 (and the removal of alpha) since we are now promising binary compatibility for public APIs. Your hack seems reasonable, but I'll caution this is not a stable public API so could break with future upgrades. While this will work, is there a better way to achieve this under 1.3.x? If you are only looking for the ability to read this data with Spark SQL (and not Hive) I suggest you look at the Data Sources API syntax for creating tables. You don't need to specify the schema at all for self describing formats like parquet. CREATE TABLE tableName USING parquet OPTIONS ( path '/path/to/file' ) Michael On Mon, Apr 6, 2015 at 11:37 AM, Todd Nist tsind...@gmail.com wrote: In 1.2.1 of I was persisting a set of parquet files as a table for use by spark-sql cli later on. There was a post here http://apache-spark-user-list.1001560.n3.nabble.com/persist-table-schema-in-spark-sql-tt16297.html#a16311 by Mchael Armbrust that provide a nice little helper method for dealing with this: /** * Sugar for creating a Hive external table from a parquet path. */def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } In migrating to 1.3.x I see that the spark.sql.hive.convertMetastoreParquet is no longer public, so the above no longer works. I can define a helper method that wraps the HiveMetastoreTypes something like: package org.apache.spark.sql.hive import org.apache.spark.sql.types.DataType /** * Helper to expose HiveMetastoreTypes hidden by Spark. It is created in this name space to make it accessible. */ object HiveTypeHelper { def toDataType(metastoreType: String): DataType = HiveMetastoreTypes.toDataType(metastoreType) def toMetastoreType(dataType: DataType): String = HiveMetastoreTypes.toMetastoreType(dataType) } While this will work, is there a better way to achieve this under 1.3.x? TIA for the assistance. -Todd
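In other words, the earlier helper collapses to something like this under 1.3 (a sketch; it assumes sqlContext is a HiveContext so the table definition is persisted in the metastore):

  def createParquetTable(name: String, path: String): Unit = {
    sqlContext.sql(
      s"""CREATE TABLE $name
         |USING parquet
         |OPTIONS (path '$path')""".stripMargin)
  }

The same thing can likely also be done programmatically via sqlContext.createExternalTable(name, path, "parquet"), which was added as an experimental API in 1.3.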
Re: org.apache.spark.ml.recommendation.ALS
Here is the command that I have used : spark-submit —class packagename.ALSNew --num-executors 100 --master yarn ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path Btw - I could run the old ALS in mllib package. On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote: So ALSNew.scala is your own application, did you add it with spark-submit or spark-shell? The correct command should like spark-submit --class your.package.name.ALSNew ALSNew.jar [options] Please check the documentation: http://spark.apache.org/docs/latest/submitting-applications.html -Xiangrui On Mon, Apr 6, 2015 at 12:27 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi, Here is the stack trace: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at ALSNew$.main(ALSNew.scala:35) at ALSNew.main(ALSNew.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks, Jay On Apr 6, 2015, at 12:24 PM, Xiangrui Meng men...@gmail.com wrote: Please attach the full stack trace. -Xiangrui On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. 
[INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate) Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to
Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
I hit again same issue This time I tried to return the Object it failed with task not serialized below is the code here vendor record is serializable private static JavaRDDVendorRecord getVendorDataToProcess(JavaSparkContext sc) throws IOException { return sc .newAPIHadoopRDD(getVendorDataRowKeyScannerConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class, Result.class) .map(new FunctionTuple2ImmutableBytesWritable, Result, VendorRecord() { @Override public VendorRecord call(Tuple2ImmutableBytesWritable, Result v1) throws Exception { String rowKey = new String(v1._1.get()); VendorRecord vd=vendorDataDAO.getVendorDataForRowkey(rowKey); return vd; } }); } On 1 April 2015 at 02:07, Ted Yu yuzhih...@gmail.com wrote: Jeetendra: Please extract the information you need from Result and return the extracted portion - instead of returning Result itself. Cheers On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu zhunanmcg...@gmail.com wrote: The example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala might help Best, -- Nan Zhu http://codingcat.me On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote: Yep, it's not serializable: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html You can't return this from a distributed operation since that would mean it has to travel over the network and you haven't supplied any way to convert the thing into bytes. On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote: When I am trying to get the result from Hbase and running mapToPair function of RRD its giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result Here is the code // private static JavaPairRDDInteger, Result getCompanyDataRDD(JavaSparkContext sc) throws IOException { // return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class, // Result.class).mapToPair(new PairFunctionTuple2ImmutableBytesWritable, Result, Integer, Result() { // // public Tuple2Integer, Result call(Tuple2ImmutableBytesWritable, Result t) throws Exception { // System.out.println(In getCompanyDataRDD+t._2); // // String cknid = Bytes.toString(t._1.get()); // System.out.println(processing cknids is:+cknid); // Integer cknidInt = Integer.parseInt(cknid); // Tuple2Integer, Result returnTuple = new Tuple2Integer, Result(cknidInt, t._2); // return returnTuple; // } // }); // } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
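A sketch of Ted's suggestion in Scala (the column family, qualifier, and extracted fields are all hypothetical): pull plain, serializable values out of the Result inside the map, and don't touch driver-side objects such as DAOs or the SparkContext from the closure. Whatever the DAO did per row either needs to come from the Result itself or be done with a connection created inside the closure or per partition.

  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.hadoop.hbase.util.Bytes

  val vendorRows = sc.newAPIHadoopRDD(
      getVendorDataRowKeyScannerConfiguration(),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    .map { case (key, result) =>
      val rowKey = Bytes.toString(key.get())
      // extract only what is needed as plain strings (serializable)
      val companyId = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("companyId")))
      (rowKey, companyId)
    }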
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
Yes, I’m using updateStateByKey and it works. But then I need to perform further computation on this Stateful RDD (see code snippet below). I perform forEach on the final RDD and get the top 10 records. I just don’t want the foreach to be performed every time a new batch is received. Only when the receiver is done fetching all the records. My requirements are to programmatically invoke the E.S query (it varies by usecase) , get all the records and apply certain transformations and get the top 10 results based on certain criteria back into the driver program for further processing. I’m able to apply the transformations on the batches of records fetched from E.S using streaming. So, I don’t need to wait for all the records to be fetched. The RDD transformations are happening all the time and the top k results are getting updated constantly until all the records are fetched by the receiver. Is there any drawback with this approach? Can you give more pointers on what you mean by creating a custom RDD that reads from ElasticSearch? Here is the relevant portion of my Spark streaming code: //Create a custom streaming receiver to query for relevant data from E.S JavaReceiverInputDStreamString jsonStrings = ssc.receiverStream( new ElasticSearchResponseReceiver(query…….)); //Apply JSON Paths to extract specific value(s) from each record JavaDStreamString fieldVariations = jsonStrings.flatMap(new FlatMapFunctionString, String() { private static final long serialVersionUID = 465237345751948L; @Override public IterableString call(String jsonString) { ListString r = JsonPath.read(jsonString, attributeDetail.getJsonPath()); return r; } }); //Perform a stateful map reduce on each variation JavaPairDStreamString, Integer fieldVariationCounts = fieldVariations.mapToPair( new PairFunctionString, String, Integer() { private static final long serialVersionUID = -1241276515559408238L; @Override public Tuple2String, Integer call(String s) { return new Tuple2String, Integer(s, 1); } }).updateStateByKey(new Function2ListInteger, OptionalInteger, OptionalInteger () { private static final long serialVersionUID = 7598681835161199865L; public OptionalInteger call(ListInteger nums, OptionalInteger current) { Integer sum = current.or((int) 0L); return (OptionalInteger) Optional.of(sum + nums.size()); } }).reduceByKey(new Function2Integer, Integer, Integer() { private static final long serialVersionUID = -5906059838295609562L; @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //Swap the Map from Enum String,Int to Int,Enum String. This is so that we can sort on frequencies JavaPairDStreamInteger, String swappedPair = fieldVariationCounts.mapToPair(new PairFunctionTuple2String, Integer, Integer, String() { private static final long serialVersionUID = -5889774695187619957L; @Override public Tuple2Integer, String call(Tuple2String, Integer item) throws Exception { return item.swap(); } }); //Sort based on Key i.e, frequency JavaPairDStreamInteger, String sortedCounts = swappedPair.transformToPair( new FunctionJavaPairRDDInteger, String, JavaPairRDDInteger, String() { private static final long serialVersionUID = -4172702039963232779L; public JavaPairRDDInteger, String call(JavaPairRDDInteger, String in) throws Exception { //False to denote sort in descending order return in.sortByKey(false); }
Re: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.regression.LabeledPoint
Did you try to treat RDD[(Double, Vector)] as RDD[LabeledPoint]? If that is the case, you need to cast them explicitly: rdd.map { case (label, features) = LabeledPoint(label, features) } -Xiangrui On Mon, Apr 6, 2015 at 11:59 AM, Joanne Contact joannenetw...@gmail.com wrote: Hello Sparkers, I kept getting this error: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.regression.LabeledPoint I have tried the following to convert v._1 to double: Method 1: (if(v._10) 1d else 0d) Method 2: def bool2Double(b:Boolean): Double = { if (b) 1.0 else 0.0 } bool2Double(v._10) Method 3: implicit def bool2Double(b:Boolean): Double = { if (b) 1.0 else 0.0 } None of them works. Any advice would be appreciated. Thanks! J - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
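A sketch of that mapping, combined with the boolean-to-double conversion from the original question (the exact element types of the tuple are assumptions):

  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // rdd: RDD[(Double, Vector)] -- build LabeledPoint explicitly instead of casting
  val labeled: RDD[LabeledPoint] = rdd.map { case (label, features) =>
    LabeledPoint(if (label > 0) 1.0 else 0.0, features)
  }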
org.apache.spark.ml.recommendation.ALS
Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate) Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a dataframe object from ratings data set that is on hdfs ? 
where as the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase=sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) = Rating(user.toInt, item.toInt, rate.toInt) }) val model = ALS.train(ratings, rank, numIterations, 0.01) Now, for the new ALS fit method, I am trying to use the below code to run, but getting a compilation error: val als = new ALS() .setRank(rank) .setRegParam(regParam) .setImplicitPrefs(implicitPrefs) .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val sc = new SparkContext(conf) val pfile = args(0) val purchase=sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) = Rating(user.toInt, item.toInt, rate.toInt) }) val model = als.fit(ratings.toDF()) I get an error that the method toDF() is not a member of org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]]. Appreciate the help ! Thanks, Jay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
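Pulling the thread together, a minimal sketch of the toDF() route for the new ml ALS, assuming Spark 1.3, an existing SparkContext sc, and a CSV of user,item,rate lines; the key step is importing sqlContext.implicits._ before calling toDF():

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings toDF() into scope for RDDs of case classes

val purchase = sc.textFile("purchases.csv")   // hypothetical path
val ratings = purchase.map { line =>
  val Array(user, item, rate) = line.split(',')
  Rating(user.toInt, item.toInt, rate.toFloat)
}.toDF()

val als = new ALS().setRank(10).setMaxIter(10).setRegParam(0.01)
val model = als.fit(ratings)

Without the implicits import, toDF() is simply not a member of the RDD, which matches the compile errors quoted above.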
Re: DataFrame -- help with encoding factor variables
Before OneHotEncoder or LabelIndexer is merged, you can define an UDF to do the mapping. val labelToIndex = udf { ... } featureDF.withColumn(f3_dummy, labelToIndex(col(f3))) See instructions here http://spark.apache.org/docs/latest/sql-programming-guide.html#udf-registration-moved-to-sqlcontextudf-java--scala -Xiangrui On Mon, Apr 6, 2015 at 7:31 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, currently have a DF that has a factor variable -- say gender. I am hoping to use the RandomForest algorithm on this data an it appears that this needs to be converted to RDD[LabeledPoint] first -- i.e. all features need to be double-encoded. I see https://issues.apache.org/jira/browse/SPARK-5888 is still open but was wondering what is the recommended way to add a column? I can think of featuresDF.map { case Row(f1,f2,f3) =(f1,f2,if (f3=='male') 0 else 1,if (f3=='female') 0 else 1) }.toDF(f1,f2,f3_dummy,f3_dummy2) but that isn't ideal as I already have 80+ features in that dataframe so the matching itself is a pain -- thinking there's got to be a better way to append |levels| number of columns and select all columns but f3? I see a withColumn method but no constructor to create a column...should I be creating the dummy features in a new dataframe and then select them out of there to get a Column? Any pointers are appreciated -- I'm sure I'm not the first person to attempt this, just unsure of the least painful way to achieve. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
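A small sketch of the UDF approach Xiangrui suggests, assuming Spark 1.3's org.apache.spark.sql.functions and a DataFrame named featureDF with a string column f3:

import org.apache.spark.sql.functions.{col, udf}

// Map the factor level to a numeric dummy value.
val labelToIndex = udf { (gender: String) => if (gender == "male") 0.0 else 1.0 }

// withColumn appends the encoded column without touching the existing 80+ features.
val encoded = featureDF.withColumn("f3_dummy", labelToIndex(col("f3")))

Repeating withColumn once per level yields the |levels| dummy columns, and the original f3 can then be left out with a select over the remaining columns.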
Re: org.apache.spark.ml.recommendation.ALS
Hi, Here is the stack trace: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at ALSNew$.main(ALSNew.scala:35) at ALSNew.main(ALSNew.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks, Jay On Apr 6, 2015, at 12:24 PM, Xiangrui Meng men...@gmail.com wrote: Please attach the full stack trace. -Xiangrui On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! 
Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate) Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a dataframe object from ratings data set that is on hdfs ? where as the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase=sc.textFile(pfile) val ratings =
Re: How to work with sparse data in Python?
We support sparse vectors in MLlib, which recognizes MLlib's sparse vector and SciPy's csc_matrix with a single column. You can create RDD of sparse vectors for your data and save/load them to/from parquet format using dataframes. Sparse matrix supported will be added in 1.4. -Xiangrui On Mon, Apr 6, 2015 at 7:58 AM, SecondDatke lovejay-lovemu...@outlook.com wrote: I'm trying to apply Spark to a NLP problem that I'm working around. I have near 4 million tweets text and I have converted them into word vectors. It's pretty sparse because each message just has dozens of words but the vocabulary has tens of thousand words. These vectors should be loaded each time my program handles the data. I stack these vectors to a 50k(size of voca.)x4M(count of msg.) sparse matrix with scipy.sparse to persist it on my disk for two reasons: 1) It just costs 400MB of disk space 2) Loading and parsing it is really fast. (I convert it to csr_matrix and index each row for the messages) This works good on my local machine, with common Python and scipy/numpy. However, It seems Spark does not support scipy.sparse directly. Again, I used a csr_matrix, and I can extract a specific row and convert to a numpy array efficiently. But when I parallelize it Spark errored: sparse matrix length is ambiguous; use getnnz() or shape[0]. csr_matrix does not support len(), so Spark cannot partition it. Now I use this matrix as a broadcast variable (it's relatively small for the memory), and parallelize a xrange(0, matrix.shape[0]) list to index the matrix in map function. It's there a better solution? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: org.apache.spark.ml.recommendation.ALS
So ALSNew.scala is your own application, did you add it with spark-submit or spark-shell? The correct command should like spark-submit --class your.package.name.ALSNew ALSNew.jar [options] Please check the documentation: http://spark.apache.org/docs/latest/submitting-applications.html -Xiangrui On Mon, Apr 6, 2015 at 12:27 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi, Here is the stack trace: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at ALSNew$.main(ALSNew.scala:35) at ALSNew.main(ALSNew.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks, Jay On Apr 6, 2015, at 12:24 PM, Xiangrui Meng men...@gmail.com wrote: Please attach the full stack trace. -Xiangrui On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? 
[INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate) Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a dataframe object from ratings data set that is on hdfs ? where as the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda:
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
So you want to sort based on the total count of all the records received through the receiver? In that case, you have to combine all the counts using updateStateByKey ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala ) But stepping back, if you want to get the final results at the end of receiving all the data (as opposed to continuously), why are you even using streaming? You could create a custom RDD that reads from ElasticSearch and then use it in a Spark program. I think that's more natural, as your application is more batch-like than streaming-like since you are not using the results in real time. TD On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty hpoli...@icloud.com wrote: I have created a Custom Receiver to fetch records pertaining to a specific query from Elastic Search and have implemented Streaming RDD transformations to process the data generated by the receiver. The final RDD is a sorted list of name-value pairs and I want to read the top 20 results programmatically rather than write to an external file. I use foreach on the RDD and take the top 20 values into a list. I see that foreach is processed every time there is a new microbatch from the receiver. However, I want the foreach computation to be done only once, when the receiver has finished fetching all the records from Elastic Search and before the streaming context is killed, so that I can populate the results into a list and process it in my driver program. Appreciate any guidance in this regard. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
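To make the batch-style alternative concrete, a rough sketch of the end-to-end top-10 computation; fetchAllJsonStrings and extractField are hypothetical stand-ins for the Elasticsearch query and the JsonPath extraction used in the streaming version:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-ins for the ES query and per-record field extraction.
def fetchAllJsonStrings(query: String): Seq[String] = Seq.empty
def extractField(json: String): Seq[String] = Seq.empty

val sc = new SparkContext(new SparkConf().setAppName("TopFieldVariations"))

val top10 = sc.parallelize(fetchAllJsonStrings("..."))
  .flatMap(extractField)               // field variations, possibly several per record
  .map(v => (v, 1))
  .reduceByKey(_ + _)                  // total count per variation
  .map(_.swap)                         // (count, variation) so we can sort by frequency
  .sortByKey(ascending = false)
  .take(10)                            // lands on the driver as an Array[(Int, String)]

Because everything runs as a single batch job, the take(10) happens exactly once, after all records have been processed, which is the behavior the per-microbatch foreach was fighting against.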
Re: task not serialize
Thanks a lot.That means Spark does not support the nested RDD? if I pass the javaSparkContext that also wont work. I mean passing SparkContext not possible since its not serializable i have a requirement where I will get JavaRDDVendorRecord matchRdd and I need to return the postential matches for this record from Hbase. so for each field of VendorRecord I have to do following 1. query Hbase to get the list of potential record in RDD 2. run logistic regression on RDD return from steps 1 and each element of the passed matchRdd. On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code in foreach I am getting task not serialized exception @SuppressWarnings(serial) public static void matchAndMerge(JavaRDDVendorRecord matchRdd, final JavaSparkContext jsc) throws IOException{ log.info(Company matcher started); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunctionVendorRecord(){ @Override public void call(VendorRecord t) throws Exception { if(t !=null){ try{ CompanyMatcherHelper.UpdateMatchedRecord(jsc,t); } catch (Exception e) { log.error(ERROR while running Matcher for company + t.getCompanyId(), e); } } } }); }
Re: task not serialize
On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot.That means Spark does not support the nested RDD? if I pass the javaSparkContext that also wont work. I mean passing SparkContext not possible since its not serializable That's right. RDD don't nest and SparkContexts aren't serializable. i have a requirement where I will get JavaRDDVendorRecord matchRdd and I need to return the postential matches for this record from Hbase. so for each field of VendorRecord I have to do following 1. query Hbase to get the list of potential record in RDD 2. run logistic regression on RDD return from steps 1 and each element of the passed matchRdd. If I understand you correctly, each VectorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? If so, you could use the RDD flatMap method, which takes a function a that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code in foreach I am getting task not serialized exception @SuppressWarnings(serial) public static void matchAndMerge(JavaRDDVendorRecord matchRdd, final JavaSparkContext jsc) throws IOException{ log.info(Company matcher started); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunctionVendorRecord(){ @Override public void call(VendorRecord t) throws Exception { if(t !=null){ try{ CompanyMatcherHelper.UpdateMatchedRecord(jsc,t); } catch (Exception e) { log.error(ERROR while running Matcher for company + t.getCompanyId(), e); } } } }); }
Super slow caching in 1.3?
Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Druid integration
You could certainly build a connector, but it seems like you would want support for pushing down aggregations to get the benefits of Druid. There are only experimental interfaces for doing so today, but it sounds like a pretty cool project. On Mon, Apr 6, 2015 at 2:23 PM, Paolo Platter paolo.plat...@agilelab.it wrote: Hi, Do you think it is possible to build an integration beetween druid and spark, using Datasource API ? Is someone investigating this kind of solution ? I think that Spark SQL could fill the lack of a complete SQL Layer of Druid. It could be a great OLAP solution. WDYT ? Paolo Platter AgileLab CTO
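For anyone exploring this, a bare-bones sketch of what a Data Source API connector looks like in Spark 1.3; the two-column schema and runDruidQuery are made-up placeholders, and note that without the experimental interfaces mentioned above, aggregations would still run in Spark rather than being pushed down to Druid:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Read-only relation: Spark SQL asks for the schema and a full scan as an RDD[Row].
class DruidRelation(val sqlContext: SQLContext, parameters: Map[String, String])
  extends BaseRelation with TableScan {

  override def schema: StructType = StructType(Seq(
    StructField("timestamp", StringType),
    StructField("metric", DoubleType)))

  // Hypothetical placeholder; a real connector would issue the Druid query here.
  private def runDruidQuery(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]

  override def buildScan(): RDD[Row] = runDruidQuery()
}

// Picked up by CREATE TEMPORARY TABLE ... USING <package containing DefaultSource>.
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new DruidRelation(sqlContext, parameters)
}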
Processing Large Images in Spark?
Hi all, I'm new to Spark and wondering if it's appropriate to use for some image processing tasks on pretty sizable (~1 GB) images. Here is an example use case. Amazon recently put the entire Landsat8 archive in S3: http://aws.amazon.com/public-data-sets/landsat/ I have a bunch of GDAL based (a C library for geospatial raster I/O) Python scripts that take a collection of Landsat images and mash them into a single mosaic. This works great for little mosaics, but if I wanted to do the entire world, I need more horse power! The scripts do the following: 1. Copy the selected rasters down from S3 to the local file system 2. Read each image into memory as numpy arrays (a big 3D array), do some image processing using various Python libs, and write the result out to the local file system 3. Blast all the processed imagery back to S3, and hooks up MapServer for viewing Step 2 takes a long time; this is what I'd like to leverage Spark for. Each image, if you stack all the bands together, can be ~1 GB in size. So here are a couple of questions: 1. If I have a large image/array, what's a good way of getting it into an RDD? I've seen some stuff about folks tiling up imagery into little chunks and storing it in HBase. I imagine I would want an image chunk in each partition of the RDD. If I wanted to apply something like a gaussian filter I'd need each chunk to to overlap a bit. 2. In a similar vain, does anyone have any thoughts on storing a really large raster in HDFS? Seems like if I just dump the image into HDFS as it, it'll get stored in blocks all across the system and when I go to read it, there will be a ton of network traffic from all the blocks to the reading node! 3. How is the numpy's ndarray support in Spark? For instance, if I do a map on my theoretical chunked image RDD, can I easily realize the image chunk as a numpy array inside the function? Most of the Python algorithms I use take in and return a numpy array. I saw some discussion in the past on image processing: These threads talk about processing lots of little images, but this isn't really my situation as I've got one very large image: http://apache-spark-user-list.1001560.n3.nabble.com/Better-way-to-process-large-image-data-set-td14533.html http://apache-spark-user-list.1001560.n3.nabble.com/Processing-audio-video-images-td6752.html Further, I'd like to have the imagery in HDFS rather than on the file system to avoid I/O bottlenecks if possible! Thanks for any ideas and advice! -Patrick
Re: Spark SQL code generation
Thanks for the info, Michael. Is there a reason to do so, as opposed to shipping out the bytecode and loading it via the classloader? Is it more complex? I can imagine caching to be effective for repeated queries, but not when the subsequent queries are different. On Mon, Apr 6, 2015 at 2:41 PM, Michael Armbrust mich...@databricks.com wrote: It is generated and cached on each of the executors. On Mon, Apr 6, 2015 at 2:32 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using Scala reflection toolbox. However, it's unclear to me whether the actual byte code is generated on the master or on each of the executors. If it generated on the master, how is the byte code shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
SparkSQL + Parquet performance
Hi all, is there anyone using SparkSQL + Parquet that has made a benchmark about storing parquet files on HDFS or on CFS ( Cassandra File System )? What storage can improve performance of SparkSQL+ Parquet ? Thanks Paolo
Spark SQL code generation
Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using Scala reflection toolbox. However, it's unclear to me whether the actual byte code is generated on the master or on each of the executors. If it generated on the master, how is the byte code shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
Re: task not serialize
On 7 April 2015 at 04:03, Dean Wampler deanwamp...@gmail.com wrote: On Mon, Apr 6, 2015 at 6:20 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks a lot.That means Spark does not support the nested RDD? if I pass the javaSparkContext that also wont work. I mean passing SparkContext not possible since its not serializable That's right. RDD don't nest and SparkContexts aren't serializable. i have a requirement where I will get JavaRDDVendorRecord matchRdd and I need to return the postential matches for this record from Hbase. so for each field of VendorRecord I have to do following 1. query Hbase to get the list of potential record in RDD 2. run logistic regression on RDD return from steps 1 and each element of the passed matchRdd. If I understand you correctly, each VectorRecord could correspond to 0-to-N records in HBase, which you need to fetch, true? yes thats correct each Vendorrecord corresponds to 0 to N matches If so, you could use the RDD flatMap method, which takes a function a that accepts each record, then returns a sequence of 0-to-N new records of some other type, like your HBase records. However, running an HBase query for each VendorRecord could be expensive. If you can turn this into a range query or something like that, it would help. I haven't used HBase much, so I don't have good advice on optimizing this, if necessary. Alternatively, can you do some sort of join on the VendorRecord RDD and an RDD of query results from HBase? Join will give too big result RDD of query result is returning around 1 for each record and i have 2 millions to process so it will be huge to have this. 2 m*1 big number For #2, it sounds like you need flatMap to return records that combine the input VendorRecords and fields pulled from HBase. Whatever you can do to make this work like table scans and joins will probably be most efficient. dean On 7 April 2015 at 03:33, Dean Wampler deanwamp...@gmail.com wrote: The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code in foreach I am getting task not serialized exception @SuppressWarnings(serial) public static void matchAndMerge(JavaRDDVendorRecord matchRdd, final JavaSparkContext jsc) throws IOException{ log.info(Company matcher started); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunctionVendorRecord(){ @Override public void call(VendorRecord t) throws Exception { if(t !=null){ try{ CompanyMatcherHelper.UpdateMatchedRecord(jsc,t); } catch (Exception e) { log.error(ERROR while running Matcher for company + t.getCompanyId(), e); } } } }); }
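A small Scala sketch of the flatMap shape Dean describes; VendorRecord and findCandidatesInHbase are hypothetical stand-ins for the poster's record class and HBase lookup:

import org.apache.spark.rdd.RDD

// Hypothetical stand-ins for the poster's classes and the HBase query.
case class VendorRecord(companyId: Long, name: String)
def findCandidatesInHbase(v: VendorRecord): Seq[String] = Seq.empty

// Each input record yields 0-to-N candidates, each kept alongside the vendor
// record that produced it so the logistic-regression step can see both.
def candidateMatches(matchRdd: RDD[VendorRecord]): RDD[(VendorRecord, String)] =
  matchRdd.flatMap { vendor =>
    findCandidatesInHbase(vendor).map(candidate => (vendor, candidate))
  }

If issuing one HBase query per record proves too expensive, mapPartitions is the usual next step, so that a single HBase connection is opened and reused for a whole partition.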
Re: Spark SQL code generation
The compilation happens in parallel on all of the machines, so its not really clear that there is a win to generating it on the driver and shipping it from a latency perspective. However, really I just took the easiest path that didn't require more bytecode extracting / shipping machinery. On Mon, Apr 6, 2015 at 3:07 PM, Akshat Aranya aara...@gmail.com wrote: Thanks for the info, Michael. Is there a reason to do so, as opposed to shipping out the bytecode and loading it via the classloader? Is it more complex? I can imagine caching to be effective for repeated queries, but when the subsequent queries are different. On Mon, Apr 6, 2015 at 2:41 PM, Michael Armbrust mich...@databricks.com wrote: It is generated and cached on each of the executors. On Mon, Apr 6, 2015 at 2:32 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using Scala reflection toolbox. However, it's unclear to me whether the actual byte code is generated on the master or on each of the executors. If it generated on the master, how is the byte code shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
Spark Druid integration
Hi, Do you think it is possible to build an integration beetween druid and spark, using Datasource API ? Is someone investigating this kind of solution ? I think that Spark SQL could fill the lack of a complete SQL Layer of Druid. It could be a great OLAP solution. WDYT ? Paolo Platter AgileLab CTO
Re: Spark SQL code generation
It is generated and cached on each of the executors. On Mon, Apr 6, 2015 at 2:32 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm curious as to how Spark does code generation for SQL queries. Following through the code, I saw that an expression is parsed and compiled into a class using Scala reflection toolbox. However, it's unclear to me whether the actual byte code is generated on the master or on each of the executors. If it generated on the master, how is the byte code shipped out to the executors? Thanks, Akshat https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
Re: task not serialize
The log instance won't be serializable, because it will have a file handle to write to. Try defining another static method outside matchAndMerge that encapsulates the call to log.error. CompanyMatcherHelper might not be serializable either, but you didn't provide it. If it holds a database connection, same problem. You can't suppress the warning because it's actually an error. The VoidFunction can't be serialized to send it over the cluster's network. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Apr 6, 2015 at 4:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: In this code in foreach I am getting task not serialized exception @SuppressWarnings(serial) public static void matchAndMerge(JavaRDDVendorRecord matchRdd, final JavaSparkContext jsc) throws IOException{ log.info(Company matcher started); //final JavaSparkContext jsc = getSparkContext(); matchRdd.foreachAsync(new VoidFunctionVendorRecord(){ @Override public void call(VendorRecord t) throws Exception { if(t !=null){ try{ CompanyMatcherHelper.UpdateMatchedRecord(jsc,t); } catch (Exception e) { log.error(ERROR while running Matcher for company + t.getCompanyId(), e); } } } }); }
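One way to sidestep the error, sketched in Scala under the assumption that the per-record work can be expressed without the SparkContext (which, as the rest of the thread notes, cannot be shipped to executors); VendorRecord and updateMatchedRecord are hypothetical stand-ins:

import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory

// Hypothetical stand-ins for the poster's record class and matching logic.
case class VendorRecord(companyId: Long)
def updateMatchedRecord(record: VendorRecord): Unit = ()

def matchAndMerge(matchRdd: RDD[VendorRecord]): Unit =
  matchRdd.foreachPartition { records =>
    // Created on the executor, once per partition, so nothing non-serializable
    // (logger, connections, SparkContext) has to be captured by the closure.
    val log = LoggerFactory.getLogger("CompanyMatcher")
    records.foreach { record =>
      try updateMatchedRecord(record)
      catch {
        case e: Exception =>
          log.error("ERROR while running matcher for company " + record.companyId, e)
      }
    }
  }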
Re: Super slow caching in 1.3?
Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (its just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Tableau + Spark SQL Thrift Server + Cassandra
Sure, will do. I may not be able to get to it until next week, but will let you know if I am able to the crack the code. Mohammed From: Todd Nist [mailto:tsind...@gmail.com] Sent: Friday, April 3, 2015 5:52 PM To: Mohammed Guller Cc: pawan kumar; user@spark.apache.org Subject: Re: Tableau + Spark SQL Thrift Server + Cassandra Thanks Mohammed, I was aware of Calliope, but haven't used it since with since the spark-cassandra-connector project got released. I was not aware of the CalliopeServer2; cool thanks for sharing that one. I would appreciate it if you could lmk how you decide to proceed with this; I can see this coming up on my radar in the next few months; thanks. -Todd On Fri, Apr 3, 2015 at 5:53 PM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Thanks, Todd. It is an interesting idea; worth trying. I think the cash project is old. The tuplejump guy has created another project called CalliopeServer2, which works like a charm with BI tools that use JDBC, but unfortunately Tableau throws an error when it connects to it. Mohammed From: Todd Nist [mailto:tsind...@gmail.commailto:tsind...@gmail.com] Sent: Friday, April 3, 2015 11:39 AM To: pawan kumar Cc: Mohammed Guller; user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Tableau + Spark SQL Thrift Server + Cassandra Hi Mohammed, Not sure if you have tried this or not. You could try using the below api to start the thriftserver with an existing context. https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L42 The one thing that Michael Ambrust @ databrick recommended was this: You can start a JDBC server with an existing context. See my answer here: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html So something like this based on example from Cheng Lian: Server import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.catalyst.types._ val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ makeRDD((1,hello) :: (2,world) ::Nil).toSchemaRDD.cache().registerTempTable(t) // replace the above with the C* + spark-casandra-connectore to generate SchemaRDD and registerTempTable import org.apache.spark.sql.hive.thriftserver._ HiveThriftServer2.startWithContext(sqlContext) Then Startup ./bin/beeline -u jdbc:hive2://localhost:1/default 0: jdbc:hive2://localhost:1/default select * from t; I have not tried this yet from Tableau. My understanding is that the tempTable is only valid as long as the sqlContext is, so if one terminates the code representing the Server, and then restarts the standard thrift server, sbin/start-thriftserver ..., the table won't be available. Another possibility is to perhaps use the tuplejump cash project, https://github.com/tuplejump/cash. HTH. -Todd On Fri, Apr 3, 2015 at 11:11 AM, pawan kumar pkv...@gmail.commailto:pkv...@gmail.com wrote: Thanks mohammed. Will give it a try today. We would also need the sparksSQL piece as we are migrating our data store from oracle to C* and it would be easier to maintain all the reports rather recreating each one from scratch. Thanks, Pawan Venugopal. On Apr 3, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Hi Todd, We are using Apache C* 2.1.3, not DSE. We got Tableau to work directly with C* using the ODBC driver, but now would like to add Spark SQL to the mix. 
I haven’t been able to find any documentation for how to make this combination work. We are using the Spark-Cassandra-Connector in our applications, but haven’t been able to figure out how to get the Spark SQL Thrift Server to use it and connect to C*. That is the missing piece. Once we solve that piece of the puzzle then Tableau should be able to see the tables in C*. Hi Pawan, Tableau + C* is pretty straight forward, especially if you are using DSE. Create a new DSN in Tableau using the ODBC driver that comes with DSE. Once you connect, Tableau allows to use C* keyspace as schema and column families as tables. Mohammed From: pawan kumar [mailto:pkv...@gmail.commailto:pkv...@gmail.com] Sent: Friday, April 3, 2015 7:41 AM To: Todd Nist Cc: user@spark.apache.orgmailto:user@spark.apache.org; Mohammed Guller Subject: Re: Tableau + Spark SQL Thrift Server + Cassandra Hi Todd, Thanks for the link. I would be interested in this solution. I am using DSE for cassandra. Would you provide me with info on connecting with DSE either through Tableau or zeppelin. The goal here is query cassandra through spark sql so that I could perform joins and groupby on my queries. Are you able to perform spark sql queries with tableau? Thanks, Pawan Venugopal On Apr 3, 2015 5:03 AM, Todd Nist tsind...@gmail.commailto:tsind...@gmail.com wrote: What version of Cassandra are
Seeing message about receiver not being de-registered on invoking Streaming context stop
My application is running Spark in local mode and I have a Spark Streaming Listener as well as a Custom Receiver. When the receiver is done fetching all documents, it invokes “stop” on itself. I see the StreamingListener getting a callback on “onReceiverStopped” where I stop the streaming context. However, I see the following message in my logs: 2015-04-06 16:41:51,193 WARN [Thread-66] com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop - Stopped receiver 2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: AlHURLEY 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] org.apache.spark.Logging$class.logWarning - Stopped executor without error 2015-04-06 16:41:51,203 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 - ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY)) What am I missing or doing wrong?
Re: A problem with Spark 1.3 artifacts
My hunch is that this behavior was introduced by a patch to start shading Jetty in Spark 1.3: https://issues.apache.org/jira/browse/SPARK-3996. Note that Spark's *MetricsSystem* class is marked as *private[spark]* and thus isn't intended to be interacted with directly by users. It's not super likely that this API would break, but it's excluded from our MiMa checks and thus is liable to change in incompatible ways across releases. If you add these Jetty classes as a compile-only dependency but don't add them to the runtime classpath, do you get runtime errors? If the metrics system is usable at runtime and we only have errors when attempting to compile user code against non-public APIs, then I'm not sure that this is a high-priority issue to fix since. If the metrics system doesn't work at runtime, on the other hand, then that's definitely a bug that should be fixed. If you'd like to continue debugging this issue, I think we should move this discussion over to JIRA so it's easier to track and reference. Hope this helps, Josh On Thu, Apr 2, 2015 at 7:34 AM, Jacek Lewandowski jacek.lewandow...@datastax.com wrote: A very simple example which works well with Spark 1.2, and fail to compile with Spark 1.3: build.sbt: name := untitled version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.3.0 Test.scala: package org.apache.spark.metrics import org.apache.spark.SparkEnv class Test { SparkEnv.get.metricsSystem.report() } Produces: Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term eclipse in package org which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. Error:scalac: bad symbolic reference. A signature in MetricsSystem.class refers to term jetty in value org.eclipse which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling MetricsSystem.class. This looks like something wrong with shading jetty. MetricsSystem references MetricsServlet which references some classes from Jetty, in the original package instead of shaded one. I'm not sure, but adding the following dependencies solves the problem: libraryDependencies += org.eclipse.jetty % jetty-server % 8.1.14.v20131031 libraryDependencies += org.eclipse.jetty % jetty-servlet % 8.1.14.v20131031 Is it intended or is it a bug? Thanks ! Jacek
Microsoft SQL jdbc support from spark sql
Hi, I am trying to pull data from ms-sql server. I have tried using the spark.sql.jdbc CREATE TEMPORARY TABLE c USING org.apache.spark.sql.jdbc OPTIONS ( url jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;, dbtable Customer ); But it shows java.sql.SQLException: No suitable driver found for jdbc:sqlserver I have jdbc drivers for mssql but i am not sure how to use them I provide the jars to the sql shell and then tried the following: CREATE TEMPORARY TABLE c USING com.microsoft.sqlserver.jdbc.SQLServerDriver OPTIONS ( url jdbc:sqlserver://10.1.0.12:1433\;databaseName=dbname\;, dbtable Customer ); But this gives ERROR CliDriver: scala.MatchError: SQLServerDriver:4 (of class com.microsoft.sqlserver.jdbc.SQLServerDriver) Can anyone tell what is the proper way to connect to ms-sql server. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
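One approach that may work: keep org.apache.spark.sql.jdbc as the data source (the driver class itself is not a data source, which is why the second attempt fails with a MatchError), put the Microsoft JDBC jar on both the driver and executor classpaths (for example via --jars and --driver-class-path on spark-submit or the SQL shell), and tell the jdbc source which driver class to load. A hedged Scala sketch, assuming the jdbc source honors a driver option in this release:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val customers = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:sqlserver://10.1.0.12:1433;databaseName=dbname;user=...;password=...",  // poster's host/db, credentials elided
  "dbtable" -> "Customer",
  "driver"  -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"  // assumption: the driver option is honored here
))

customers.registerTempTable("Customer")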
graphx running time
Hi im currently using graphx for some analysis and have come into a bit of a hurdle. If use my test dataset of 20 nodes and about 30 links it runs really quickly. I have two other data sets i use one of 10million links and one of 20 million. When i create my graphs seems to work okay and i can get a count of the vertices in around 10 minutes (12 node cluster with 192 cores and 256Gb RAM on each node). Code below for this part: import org.apache.spark.graphx._ import java.util.HashMap import scala.collection.JavaConversions._ import org.apache.spark.rdd.RDD //create the source graph from hdfs case class Entity(Active: String, WA_Ind: String, Ent_Typ: String, Mkt_Seg: String, Status: String) //read the vertex information from HDFS val vRDD: RDD[(Long,(Entity))] = sc.parallelize(sc.textFile(hdfs://Data/entities_2011.csv,1000). map(x=x.split(,)). map(x=(x(0).toLong,new Entity(x(1),x(2),x(3),x(4),A))). collect) //read the edge information from hdfs val eRDD = sc.parallelize((sc.textFile(hdfs://Data/links_2011.csv,1000). map(x=(Edge(x.split(,)(0).toInt,x.split(,)(1).toInt,1))).collect)) val sourceGraph: Graph[(Entity),Int] = Graph(vRDD,eRDD).cache() type DistanceMap = HashMap[(VertexId,String), Int] //create new class and place holders for information case class Node(Entity: Entity, Parent: Int, inDeg: Int, outDeg: Int,Distance:DistanceMap) //val sourceGraph2: Graph [Node,Int] = sourceGraph.mapVertices{case (id,(entity)) = Node(entity,0,0,0,new DistanceMap)} val sourceGraph2 = sourceGraph.mapVertices{case (id,(entity)) = Node(entity,0,0,0,new DistanceMap)} the problem is though when i use pregel to do some work in the graph it never seems to finish. I can do the really small set (20 vertices) fine, the 10 million works but the 20 million never finishes. Code below. 
Any assistance appreciated //create new class and place holders for information case class Node(Entity: Entity, Parent: Int, inDeg: Int, outDeg: Int,Distance:DistanceMap) //val sourceGraph2: Graph [Node,Int] = sourceGraph.mapVertices{case (id,(entity)) = Node(entity,0,0,0,new DistanceMap)} val sourceGraph2 = sourceGraph.mapVertices{case (id,(entity)) = Node(entity,0,0,0,new DistanceMap)} // distance map to hold network root and node level // updated the pregel functions to include nodes market structure // distance map was HashMap[VertexId, Int] // and changed // initMap.put((vid), 0) // to // initMap.put((vid,prevAttr.Entity.Mkt_Seg), 0) type DistanceMap = HashMap[(VertexId,String), Int] val initDists: Graph[DistanceMap, Int] = sourceGraph2.outerJoinVertices(sourceGraph2.inDegrees) { (vid, prevAttr, inDeg) = val initMap = new DistanceMap if (inDeg.getOrElse(0) == 0) { initMap.put((vid,prevAttr.Entity.Mkt_Seg), 0) } initMap }.cache() def sendMsg(edge: EdgeTriplet[DistanceMap, Int]): Iterator[(VertexId, DistanceMap)] = { val updatedDists = new DistanceMap edge.srcAttr.foreach { case (source, dist) = if (!edge.dstAttr.containsKey(source) || edge.dstAttr.get(source) dist + 1) { updatedDists.put(source, dist + 1) } } if (!updatedDists.isEmpty) { Iterator((edge.dstId, updatedDists)) } else { Iterator.empty } } def mergeMsg(a: DistanceMap, b: DistanceMap): DistanceMap = { val merged = new DistanceMap(a) b.foreach { case (source, dist) = if (merged.containsKey(source)) { merged.put(source, math.min(merged.get(source), dist)) } else { merged.put(source, dist) } } merged } def vprog(vid: VertexId, curDists: DistanceMap, newDists: DistanceMap): DistanceMap = { mergeMsg(curDists, newDists) } val dists = initDists.pregel[DistanceMap](new DistanceMap)(vprog, sendMsg, mergeMsg) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/graphx-running-time-tp22398.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.2.0 with Play/Activator
Hello Manish, you can take a look at the spark-notebook build, it's a bit tricky to get rid of some clashes but at least you can refer to this build to have ideas. LSS, I have stripped out akka from play deps. ref: https://github.com/andypetrella/spark-notebook/blob/master/build.sbt https://github.com/andypetrella/spark-notebook/blob/master/project/Dependencies.scala https://github.com/andypetrella/spark-notebook/blob/master/project/Shared.scala HTH, cheers andy Le mar 7 avr. 2015 07:26, Manish Gupta 8 mgupt...@sapient.com a écrit : Hi, We are trying to build a Play framework based web application integrated with Apache Spark. We are running *Apache Spark 1.2.0 CDH 5.3.0*. But struggling with akka version conflicts (errors like java.lang.NoSuchMethodError in akka). We have tried Play 2.2.6 as well as Activator 1.3.2. If anyone has successfully integrated Spark 1.2.0 with Play/Activator, please share the version we should use and akka dependencies we should add in Build.sbt. Thanks, Manish
Re: Sending RDD object over the network
Are you expecting to receive 1 to 100 values in your second program? RDD is just an abstraction, you would need to do like: num.foreach(x = send(x)) Thanks Best Regards On Mon, Apr 6, 2015 at 1:56 AM, raggy raghav0110...@gmail.com wrote: For a class project, I am trying to utilize 2 spark Applications communicate with each other by passing an RDD object that was created from one application to another Spark application. The first application is developed in Scala and creates an RDD and sends it to the 2nd application over the network as follows: val logFile = ../../spark-1.3.0/README.md // Should be some file on your system val conf = new SparkConf(); conf.setAppName(Simple Application).setMaster(local[2]) val sc = new SparkContext(conf) val nums = sc.parallelize(1 to 100, 2).toJavaRDD(); val s = new Socket(127.0.0.1, 8000); val objectOutput = new ObjectOutputStream(s.getOutputStream()); objectOutput.writeObject(nums); s.close(); The second Spark application is a Java application, which tries to receive the RDD object and then perform some operations on it. At the moment, I am trying to see if I have properly obtained the object. ServerSocket listener = null; Socket client; try{ listener = new ServerSocket(8000); }catch(Exception e){ e.printStackTrace(); } System.out.println(Listening); try{ client = listener.accept(); ObjectInputStream objectInput = new ObjectInputStream(client.getInputStream()); Object object =(JavaRDD) objectInput.readObject(); JavaRDD tmp = (JavaRDD) object; if(tmp != null){ System.out.println(tmp.getStorageLevel().toString()); ListPartition p = tmp.partitions(); } else{ System.out.println(variable is null); } }catch(Exception e){ e.printStackTrace(); } The output I get is: StorageLevel(false, false, false, false, 1) java.lang.NullPointerException at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at SimpleApp.main(SimpleApp.java:35) So, System.out.println(tmp.getStorageLevel().toString()); prints out properly. But, ListPartition p = tmp.partitions(); throws the NullPointerException. I can't seem to figure out why this is happening. In a nutshell, I am basically trying to create an RDD object in one Spark application and then send the object to another application. After receiving the object I try to make sure I received it properly by accessing its methods. Invoking the partitions() method in the original Spark application does not throw any errors either. I would greatly appreciate any suggestion on how I can solve my problem, or an alternative solution for what I am trying to accomplish. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sending-RDD-object-over-the-network-tp22382.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sending RDD object over the network
Hey Akhil, Thanks for your response! No, I am not expecting to receive the values themselves. I am just trying to receive the RDD object in my second Spark application. However, I get an NPE when I try to use the object within my second program. Would you know how I can properly send the RDD object to my second program? Thanks, Raghav On Mon, Apr 6, 2015 at 3:08 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you expecting to receive 1 to 100 values in your second program? RDD is just an abstraction, you would need to do something like: nums.foreach(x => send(x)) Thanks Best Regards
Re: Low resource when upgrading from 1.1.0 to 1.3.0
I am also hitting the same problem. I deploy and run Spark (version 1.3.0) in local mode. When I run a simple app that counts the lines of a file, the console prints TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources . I don't think my example app needs 512M of memory (I start the worker with 512M). omidb, if you have solved this problem, please tell me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-resource-when-upgrading-from-1-1-0-to-1-3-0-tp22379p22387.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
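That warning usually means no registered worker can satisfy the resources the application is asking for: either no workers have registered with the master, or each executor request exceeds what a worker offers. One hedged thing to try when the worker only has 512M is to shrink the request explicitly; the master URL and input file below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("LineCount")
  .setMaster("spark://localhost:7077")   // hypothetical standalone master URL
  .set("spark.executor.memory", "256m")  // must fit inside the 512M worker
  .set("spark.cores.max", "1")
val sc = new SparkContext(conf)
println(sc.textFile("README.md").count())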
Re: Write to Parquet File in Python
Thank you so much for your reply. We would like to provide a tool that lets the user convert a binary file to a file in Avro/Parquet format on his own computer. The tool will parse the binary file in Python and convert the data to Parquet. (BTW, can we append to a Parquet file?) The issue is that we do not want the user to install Spark on his machine. Our converter is in Python. How can we access Spark? Is it possible to include it as a jar library and access it from our Python code? Any input will be useful, Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Write-to-Parquet-File-in-Python-tp22186p22388.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Learning Spark
We had few sessions at Sigmoid, you could go through the meetup page for details: http://www.meetup.com/Real-Time-Data-Processing-and-Cloud-Computing/ On 6 Apr 2015 18:01, Abhideep Chakravarty abhideep.chakrava...@mindtree.com wrote: Hi all, We are here planning to setup a Spark learning session series. I need all of your input to create a TOC for this program i.e. what all to cover if we need to start from basics and upto what we should go to cover all the aspects of Spark in details. Also, I need to know on what all databases, Spark can work (other than Cassandra) ? Input from you will be very helpful. Thanks in advance for your time and effort. Regards, Abhideep -- http://www.mindtree.com/email/disclaimer.html
Re: (send this email to subscribe)
Please send email to user-subscr...@spark.apache.org On Mon, Apr 6, 2015 at 6:52 AM, 林晨 bewit...@gmail.com wrote:
RDD generated on every query
Hi, In our Spark web application, the RDD is regenerated every time a client sends a query request. Is there any way to build the RDD once and then run queries against it again and again on an active SparkContext? Thanks, Siddharth Ubale, Synchronized Communications #43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100 Tel : +91 80 3202 4060 Web: www.syncoms.com London|Bangalore|Orlando we innovate, plan, execute, and transform the business
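As long as the SparkContext stays alive inside the web application, the RDD can be built and cached once and then reused by every incoming query; only the first action pays the construction cost. A rough sketch with a hypothetical input path and a hypothetical query shape:

// Built once at application startup against the long-lived SparkContext.
val baseData = sc.textFile("hdfs:///data/events").cache()
baseData.count()   // optional: force materialization up front

// Called per client request; reuses the cached RDD instead of rebuilding it.
def handleQuery(keyword: String): Long =
  baseData.filter(_.contains(keyword)).count()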
Re: Learning Spark
bq. I need to know on what all databases You can access HBase using Spark. Cheers On Mon, Apr 6, 2015 at 5:59 AM, Akhil Das ak...@sigmoidanalytics.com wrote: We had few sessions at Sigmoid, you could go through the meetup page for details: http://www.meetup.com/Real-Time-Data-Processing-and-Cloud-Computing/ On 6 Apr 2015 18:01, Abhideep Chakravarty abhideep.chakrava...@mindtree.com wrote: Hi all, We are here planning to setup a Spark learning session series. I need all of your input to create a TOC for this program i.e. what all to cover if we need to start from basics and upto what we should go to cover all the aspects of Spark in details. Also, I need to know on what all databases, Spark can work (other than Cassandra) ? Input from you will be very helpful. Thanks in advance for your time and effort. Regards, Abhideep -- http://www.mindtree.com/email/disclaimer.html
Spark 1.3.0: Running Pi example on YARN fails
I have `Hadoop 2.6.0.2.2.0.0-2041` with `Hive 0.14.0.2.2.0.0-2041 ` After building Spark with command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package I try to run Pi example on YARN with the following command: export HADOOP_CONF_DIR=/etc/hadoop/conf /var/home2/test/spark/bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --executor-memory 3G \ --num-executors 50 \ hdfs:///user/test/jars/spark-examples-1.3.0-hadoop2.4.0.jar \ 1000 I get exceptions: `application_1427875242006_0029 failed 2 times due to AM Container for appattempt_1427875242006_0029_02 exited with exitCode: 1` Which in fact is `Diagnostics: Exception from container-launch.`(please see log below). Application tracking url reveals the following messages: java.lang.Exception: Unknown container. Container either has not started or has already completed or doesn't belong to this node at all and also: Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster I have Hadoop working fine on 4 nodes and completly at a loss how to make Spark work on YARN. Please advise where to look for, any ideas would be of great help, thank you! Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/04/06 10:53:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/04/06 10:53:42 INFO impl.TimelineClientImpl: Timeline service address: http://etl-hdp-yarn.foo.bar.com:8188/ws/v1/timeline/ 15/04/06 10:53:42 INFO client.RMProxy: Connecting to ResourceManager at etl-hdp-yarn.foo.bar.com/192.168.0.16:8050 15/04/06 10:53:42 INFO yarn.Client: Requesting a new application from cluster with 4 NodeManagers 15/04/06 10:53:42 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container) 15/04/06 10:53:42 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/04/06 10:53:42 INFO yarn.Client: Setting up container launch context for our AM 15/04/06 10:53:42 INFO yarn.Client: Preparing resources for our AM container 15/04/06 10:53:43 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 15/04/06 10:53:43 INFO yarn.Client: Uploading resource file:/var/home2/test/spark-1.3.0/assembly/target/scala-2.10/spark-assembly-1.3.0-hadoop2.6.0.jar - hdfs:// etl-hdp-nn1.foo.bar.com:8020/user/test/.sparkStaging/application_1427875242006_0029/spark-assembly-1.3.0-hadoop2.6.0.jar 15/04/06 10:53:44 INFO yarn.Client: Source and destination file systems are the same. 
Not copying hdfs:/user/test/jars/spark-examples-1.3.0-hadoop2.4.0.jar 15/04/06 10:53:44 INFO yarn.Client: Setting up the launch environment for our AM container 15/04/06 10:53:44 INFO spark.SecurityManager: Changing view acls to: test 15/04/06 10:53:44 INFO spark.SecurityManager: Changing modify acls to: test 15/04/06 10:53:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test); users with modify permissions: Set(test) 15/04/06 10:53:44 INFO yarn.Client: Submitting application 29 to ResourceManager 15/04/06 10:53:44 INFO impl.YarnClientImpl: Submitted application application_1427875242006_0029 15/04/06 10:53:45 INFO yarn.Client: Application report for application_1427875242006_0029 (state: ACCEPTED) 15/04/06 10:53:45 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1428317623905 final status: UNDEFINED tracking URL: http://etl-hdp-yarn.foo.bar.com:8088/proxy/application_1427875242006_0029/ user: test 15/04/06 10:53:46 INFO yarn.Client: Application report for application_1427875242006_0029 (state: ACCEPTED) 15/04/06 10:53:47 INFO yarn.Client: Application report for application_1427875242006_0029 (state: ACCEPTED) 15/04/06 10:53:48 INFO yarn.Client: Application report for application_1427875242006_0029 (state: ACCEPTED) 15/04/06 10:53:49 INFO yarn.Client: Application report for application_1427875242006_0029 (state: FAILED) 15/04/06 10:53:49 INFO yarn.Client: client token: N/A diagnostics: Application application_1427875242006_0029 failed 2 times due to AM Container for appattempt_1427875242006_0029_02 exited with exitCode: 1 For more detailed output, check application tracking page: http://etl-hdp-yarn.foo.bar.com:8088/proxy/application_1427875242006_0029/Then, click on links to logs of each attempt. Diagnostics: Exception from container-launch. Container id: container_1427875242006_0029_02_01
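The "Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster" message usually means the Spark assembly jar never made it onto the ApplicationMaster's classpath. One hedged thing to check is pointing spark.yarn.jar explicitly at an assembly that is known to be on HDFS; the HDFS path below is illustrative and should match wherever the assembly built above actually lives:

export HADOOP_CONF_DIR=/etc/hadoop/conf
/var/home2/test/spark/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --conf spark.yarn.jar=hdfs:///user/test/jars/spark-assembly-1.3.0-hadoop2.6.0.jar \
  --executor-memory 3G \
  --num-executors 50 \
  hdfs:///user/test/jars/spark-examples-1.3.0-hadoop2.4.0.jar \
  1000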
What happened to the Row class in 1.3.0?
I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Cannot build learning spark project
Hi, I am trying to build this project https://github.com/databricks/learning-spark with mvn package.This should work out of the box but unfortunately it doesn't. In fact, I get the following error: mvn pachage -X Apache Maven 3.0.5 Maven home: /usr/share/maven Java version: 1.7.0_76, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-7-oracle/jre Default locale: en_US, platform encoding: UTF-8 OS name: linux, version: 3.13.0-45-generic, arch: amd64, family: unix [INFO] Error stacktraces are turned on. [DEBUG] Reading global settings from /usr/share/maven/conf/settings.xml [DEBUG] Reading user settings from /home/adam/.m2/settings.xml [DEBUG] Using local repository at /home/adam/.m2/repository [DEBUG] Using manager EnhancedLocalRepositoryManager with priority 10 for /home/adam/.m2/repository [INFO] Scanning for projects... [DEBUG] Extension realms for project com.oreilly.learningsparkexamples:java:jar:0.0.2: (none) [DEBUG] Looking up lifecyle mappings for packaging jar from ClassRealm[plexus.core, parent: null] [ERROR] The build could not read 1 project - [Help 1] org.apache.maven.project.ProjectBuildingException: Some problems were encountered while processing the POMs: [ERROR] 'dependencies.dependency.artifactId' for org.scalatest:scalatest_${scala.binary.version}:jar with value 'scalatest_${scala.binary.version}' does not match a valid id pattern. @ line 101, column 19 at org.apache.maven.project.DefaultProjectBuilder.build(DefaultProjectBuilder.java:363) at org.apache.maven.DefaultMaven.collectProjects(DefaultMaven.java:636) at org.apache.maven.DefaultMaven.getProjectsForMavenReactor(DefaultMaven.java:585) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:234) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196) at org.apache.maven.cli.MavenCli.main(MavenCli.java:141) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) [ERROR] [ERROR] The project com.oreilly.learningsparkexamples:java:0.0.2 (/home/adam/learning-spark/learning-spark-master/pom.xml) has 1 error [ERROR] 'dependencies.dependency.artifactId' for org.scalatest:scalatest_${scala.binary.version}:jar with value 'scalatest_${scala.binary.version}' does not match a valid id pattern. @ line 101, column 19 [ERROR] [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException As a further step I would like to know how to build it against DataStax Enterprise 4.6.2 Any help is appreciated! *// Adamantios*
Re: Cannot build learning spark project
(This mailing list concerns Spark itself rather than the book about Spark. Your question is about building code that isn't part of Spark, so the right place to ask is https://github.com/databricks/learning-spark You have a typo in pachage but I assume that's just your typo in this email.)
Using DIMSUM with ids
The example below illustrates how to use the DIMSUM algorithm to calculate the similarity between each pair of rows and output the row pairs whose cosine similarity is not less than a threshold. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala But what if I want to keep an id for each row, which means the input file is: id1 vector1 id2 vector2 id3 vector3 ... And we hope to output id1 id2 sim(id1, id2) id1 id3 sim(id1, id3) ... Alcaid
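One way to carry ids through the computation (a sketch, not part of the linked example) is to keep an array of ids that lines up, in order, with the dimension that columnSimilarities() compares, and translate the MatrixEntry indices back to ids at the end; mat and ids below are hypothetical names:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// mat: a RowMatrix whose columns correspond, in order, to ids: Array[String]
val threshold = 0.5
val sims = mat.columnSimilarities(threshold).entries   // RDD[MatrixEntry]
sims.map(e => (ids(e.i.toInt), ids(e.j.toInt), e.value))
    .collect()
    .foreach { case (a, b, s) => println(s"$a $b $s") }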
Re: What happened to the Row class in 1.3.0?
From scaladoc of sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala : * To create a new Row, use [[RowFactory.create()]] in Java or [[Row.apply()]] in Scala. * Cheers On Mon, Apr 6, 2015 at 7:23 AM, ARose ashley.r...@telarix.com wrote: I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: What happened to the Row class in 1.3.0?
I searched code base but didn't find RowFactory class. Pardon me. On Mon, Apr 6, 2015 at 7:39 AM, Ted Yu yuzhih...@gmail.com wrote: From scaladoc of sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala : * To create a new Row, use [[RowFactory.create()]] in Java or [[Row.apply()]] in Scala. * Cheers On Mon, Apr 6, 2015 at 7:23 AM, ARose ashley.r...@telarix.com wrote: I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: What happened to the Row class in 1.3.0?
The Row class was mistakenly left undocumented in 1.3.0; you can check the 1.3.1 API doc http://people.apache.org/~pwendell/spark-1.3.1-rc1-docs/api/scala/index.html#org.apache.spark.sql.Row Best, -- Nan Zhu http://codingcat.me On Monday, April 6, 2015 at 10:23 AM, ARose wrote: I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: What happened to the Row class in 1.3.0?
Hi, Ted It’s here: https://github.com/apache/spark/blob/61b427d4b1c4934bd70ed4da844b64f0e9a377aa/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java Best, -- Nan Zhu http://codingcat.me On Monday, April 6, 2015 at 10:44 AM, Ted Yu wrote: I searched code base but didn't find RowFactory class. Pardon me. On Mon, Apr 6, 2015 at 7:39 AM, Ted Yu yuzhih...@gmail.com (mailto:yuzhih...@gmail.com) wrote: From scaladoc of sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala : * To create a new Row, use [[RowFactory.create()]] in Java or [[Row.apply()]] in Scala. * Cheers On Mon, Apr 6, 2015 at 7:23 AM, ARose ashley.r...@telarix.com (mailto:ashley.r...@telarix.com) wrote: I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com (http://Nabble.com). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org)
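For anyone hitting the same thing, a minimal 1.3-style sketch of the programmatic-schema pattern on the Scala side, where Row(...) (i.e. Row.apply) plays the role RowFactory.create plays in Java; the column names and data are illustrative, and sc is assumed to be an existing SparkContext:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Row(...) is Row.apply, the Scala counterpart of Java's RowFactory.create(...)
val rowRDD = sc.parallelize(Seq(Row("alice", 30), Row("bob", 25)))
val people = sqlContext.createDataFrame(rowRDD, schema)
people.registerTempTable("people")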
Spark Avarage
Hi I have a class in above desc. case class weatherCond(dayOfdate: String, minDeg: Int, maxDeg: Int, meanDeg: Int) I am reading the data from csv file and I put this data into weatherCond class with this code val weathersRDD = sc.textFile(weather.csv).map { line = val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll(\,).trim.split(,) weatherCond(dayOfdate, minDeg.toInt, maxDeg.toInt, meanDeg.toInt) } the question is ; how can I average the minDeg, maxDeg and meanDeg values for each month ; The data set example day, min, max , mean 2014-03-17,-3,5,5 2014-03-18,6,7,7 2014-03-19,6,14,10 result has to be (2014-03, 3, 8.6 ,7.3) -- (Average for 2014 - 03 ) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Avarage-tp22391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to work with sparse data in Python?
I'm trying to apply Spark to an NLP problem that I'm working on. I have nearly 4 million tweet texts and I have converted them into word vectors. They are pretty sparse because each message has just dozens of words while the vocabulary has tens of thousands of words. These vectors have to be loaded every time my program handles the data. I stack the vectors into a 50k (size of vocabulary) x 4M (count of messages) sparse matrix with scipy.sparse and persist it on disk for two reasons: 1) it only costs 400MB of disk space and 2) loading and parsing it is really fast (I convert it to csr_matrix and index each row for the messages). This works well on my local machine with plain Python and scipy/numpy. However, it seems Spark does not support scipy.sparse directly. Again, I used a csr_matrix, and I can extract a specific row and convert it to a numpy array efficiently. But when I tried to parallelize it, Spark errored: sparse matrix length is ambiguous; use getnnz() or shape[0]. csr_matrix does not support len(), so Spark cannot partition it. Now I use this matrix as a broadcast variable (it's relatively small for memory) and parallelize an xrange(0, matrix.shape[0]) list to index the matrix in the map function. Is there a better solution? Thanks.
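One Spark-native alternative to shipping a single scipy csr_matrix around is to give each message its own sparse vector and parallelize those; MLlib has a sparse vector type in both the Scala and Python APIs (pyspark.mllib.linalg.Vectors.sparse on the Python side). A sketch in Scala, to match the rest of this digest, with made-up indices and counts:

import org.apache.spark.mllib.linalg.Vectors

val vocabSize = 50000
// One record per message: (messageId, indices of the words present, their counts).
val docs = Seq(
  (0L, Array(3, 17, 42), Array(1.0, 2.0, 1.0)),
  (1L, Array(5, 17),     Array(1.0, 1.0)))

val sparseDocs = sc.parallelize(docs).map { case (id, idx, vals) =>
  (id, Vectors.sparse(vocabSize, idx, vals))
}
sparseDocs.take(2).foreach(println)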
DataFrame -- help with encoding factor variables
Hi folks, I currently have a DF that has a factor variable -- say gender. I am hoping to use the RandomForest algorithm on this data, and it appears that this needs to be converted to RDD[LabeledPoint] first -- i.e. all features need to be double-encoded. I see https://issues.apache.org/jira/browse/SPARK-5888 is still open but was wondering what is the recommended way to add a column? I can think of featuresDF.map { case Row(f1, f2, f3) => (f1, f2, if (f3 == "male") 0 else 1, if (f3 == "female") 0 else 1) }.toDF("f1", "f2", "f3_dummy", "f3_dummy2") but that isn't ideal as I already have 80+ features in that dataframe, so the matching itself is a pain -- I'm thinking there's got to be a better way to append |levels| number of columns and select all columns but f3? I see a withColumn method but no constructor to create a column... should I be creating the dummy features in a new dataframe and then selecting them out of there to get a Column? Any pointers are appreciated -- I'm sure I'm not the first person to attempt this, just unsure of the least painful way to achieve it.
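One way to avoid re-listing all 80+ fields (a sketch against the 1.3 DataFrame API, with the column names assumed rather than taken from the actual data) is to append the dummy columns with withColumn and a small UDF, and then select everything except the original factor column; the result can afterwards be mapped to RDD[LabeledPoint] for RandomForest:

import org.apache.spark.sql.functions.udf

val isMale   = udf((g: String) => if (g == "male") 1.0 else 0.0)
val isFemale = udf((g: String) => if (g == "female") 1.0 else 0.0)

val encoded = featuresDF
  .withColumn("gender_male", isMale(featuresDF("gender")))
  .withColumn("gender_female", isFemale(featuresDF("gender")))

// Keep every column except the original string-valued factor.
val keep = encoded.columns.filterNot(_ == "gender").map(encoded.col)
val finalDF = encoded.select(keep: _*)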
Re: What happened to the Row class in 1.3.0?
Thanks Nan. I was searching for RowFactory.scala Cheers On Mon, Apr 6, 2015 at 7:52 AM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Ted It’s here: https://github.com/apache/spark/blob/61b427d4b1c4934bd70ed4da844b64f0e9a377aa/sql/catalyst/src/main/java/org/apache/spark/sql/RowFactory.java Best, -- Nan Zhu http://codingcat.me On Monday, April 6, 2015 at 10:44 AM, Ted Yu wrote: I searched code base but didn't find RowFactory class. Pardon me. On Mon, Apr 6, 2015 at 7:39 AM, Ted Yu yuzhih...@gmail.com wrote: From scaladoc of sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala : * To create a new Row, use [[RowFactory.create()]] in Java or [[Row.apply()]] in Scala. * Cheers On Mon, Apr 6, 2015 at 7:23 AM, ARose ashley.r...@telarix.com wrote: I am trying to call Row.create(object[]) similarly to what's shown in this programming guide https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema , but the create() method is no longer recognized. I tried to look up the documentation for the Row api, but it does not seem to exist: http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/scala/index.html#org.apache.spark.sql.api.java.Row Is there a new equivalent for doing this programmatic specification of schema in 1.3.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-happened-to-the-Row-class-in-1-3-0-tp22389.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark + Kinesis
Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim ᐧ On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: ᐧ Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python where I followed the example http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help. Thanks, Vadim --- Time: 142825409 ms --- 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s) 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms *** 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms) On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, More good news! I was able to utilize mergeStrategy to assembly my Kinesis consumer into an uber jar Here's what I added to* build.sbt:* *mergeStrategy in assembly = (mergeStrategy in assembly) { (old) =* * {* * case PathList(com, esotericsoftware, minlog, xs @ _*) = MergeStrategy.first* * case PathList(com, google, common, base, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, commons, xs @ _*) = MergeStrategy.last* * case PathList(org, apache, hadoop, xs @ _*) = MergeStrategy.first* * case PathList(org, apache, spark, unused, xs @ _*) = MergeStrategy.first* *case x = old(x)* * }* *}* Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following: *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)* Any idea on what might be going on? Thanks, Vadim Here's my consumer code (adapted from the WordCount example): *private object MyConsumer extends Logging { def main(args: Array[String]) {/* Check that all required args were passed in. */ if (args.length 2) { System.err.println( |Usage: KinesisWordCount stream-name endpoint-url |stream-name is the name of the Kinesis stream |endpoint-url is the endpoint of the Kinesis service | (e.g. 
https://kinesis.us-east-1.amazonaws.com https://kinesis.us-east-1.amazonaws.com).stripMargin) System.exit(1)}/* Populate the appropriate variables from the given args */val Array(streamName, endpointUrl) = args/* Determine the number of shards from the stream */val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) kinesisClient.setEndpoint(endpointUrl)val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() .size()System.out.println(Num shards: + numShards)/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */val numStreams = numShards/* Setup the and SparkConfig and StreamingContext *//* Spark Streaming batch interval */val batchInterval = Milliseconds(2000)val sparkConfig = new SparkConf().setAppName(MyConsumer)val ssc = new StreamingContext(sparkConfig, batchInterval)/* Kinesis checkpoint interval. Same as batchInterval for this example. */val kinesisCheckpointInterval = batchInterval/* Create the same
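The tail of that consumer is cut off above; its remaining shape, following the KinesisWordCount example it was adapted from, is roughly: create one receiver per shard, union them, and attach an output operation so something actually gets printed. A hedged sketch against the 1.3-era KinesisUtils API, reusing the vals defined in the quoted code (the exact createStream signature varies between Spark versions):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val kinesisStreams = (0 until numStreams).map { _ =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
    kinesisCheckpointInterval, InitialPositionInStream.LATEST,
    StorageLevel.MEMORY_AND_DISK_2)
}
val unionStream = ssc.union(kinesisStreams)
unionStream.map(bytes => new String(bytes)).print()   // records arrive as Array[Byte]

ssc.start()
ssc.awaitTermination()

One common reason for seeing only housekeeping log lines and no records is that each Kinesis receiver occupies a core, so the application needs more cores than it has receivers before the batch processing can run at all.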
Re: Spark Streaming 1.3 Kafka Direct Streams
Somewhat agree on subclassing and its issues. It looks like the alternative in spark 1.3.0 to create a custom build. Is there an enhancement filed for this? If not, I'll file one. Thanks! -neelesh On Wed, Apr 1, 2015 at 12:46 PM, Tathagata Das t...@databricks.com wrote: The challenge of opening up these internal classes to public (even with Developer API tag) is that it prevents us from making non-trivial changes without breaking API compatibility for all those who had subclassed. Its a tradeoff that is hard to optimize. That's why we favor exposing more optional parameters in the stable API (KafkaUtils) so that we can maintain binary compatibility with user code as well as allowing us to make non-trivial changes internally. That said, it may be worthwhile to actually take an optional compute function as a parameter through the KafkaUtils, as Cody suggested ( (Time, current offsets, kafka metadata, etc) = Option[KafkaRDD]). Worth thinking about its implications in the context of the driver restarts, etc (as those function will get called again on restart, and different return value from before can screw up semantics). TD On Wed, Apr 1, 2015 at 12:28 PM, Neelesh neele...@gmail.com wrote: +1 for subclassing. its more flexible if we can subclass the implementation classes. On Apr 1, 2015 12:19 PM, Cody Koeninger c...@koeninger.org wrote: As I said in the original ticket, I think the implementation classes should be exposed so that people can subclass and override compute() to suit their needs. Just adding a function from Time = Set[TopicAndPartition] wouldn't be sufficient for some of my current production use cases. compute() isn't really a function from Time = Option[KafkaRDD], it's a function from (Time, current offsets, kafka metadata, etc) = Option[KafkaRDD] I think it's more straightforward to give access to that additional state via subclassing than it is to add in more callbacks for every possible use case. On Wed, Apr 1, 2015 at 2:01 PM, Tathagata Das t...@databricks.com wrote: We should be able to support that use case in the direct API. It may be as simple as allowing the users to pass on a function that returns the set of topic+partitions to read from. That is function (Time) = Set[TopicAndPartition] This gets called every batch interval before the offsets are decided. This would allow users to add topics, delete topics, modify partitions on the fly. What do you think Cody? On Wed, Apr 1, 2015 at 11:57 AM, Neelesh neele...@gmail.com wrote: Thanks Cody! On Wed, Apr 1, 2015 at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you cant subclass it without building your own spark. On Wed, Apr 1, 2015 at 1:09 PM, Neelesh neele...@gmail.com wrote: Thanks Cody, that was really helpful. I have a much better understanding now. One last question - Kafka topics are initialized once in the driver, is there an easy way of adding/removing topics on the fly? KafkaRDD#getPartitions() seems to be computed only once, and no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. 
On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the workers. However, with the new kafka direct input streams, its hard for me to understand where the code that's reading from kafka brokers runs. Does it run on the driver (I hope not), or does it run on workers? Any help appreciated thanks! -neelesh
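For readers landing on this thread later, the stable entry point being discussed is KafkaUtils.createDirectStream. A minimal 1.3 sketch, assuming ssc is an existing StreamingContext; the broker list and topic name are placeholders, and, as noted above, the topic set is fixed for the lifetime of the stream:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// No receivers: offset ranges are computed per batch on the driver and the
// executors read the corresponding ranges directly from the Kafka brokers.
stream.map(_._2).print()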
Re: Spark Avarage
If you're going to do it this way, I would ouput dayOfdate.substring(0,7), i.e. the month part, and instead of weatherCond, you can use (month,(minDeg,maxDeg,meanDeg)) --i.e. PairRDD. So weathersRDD: RDD[(String,(Double,Double,Double))]. Then use a reduceByKey as shown in multiple Spark examples..You'd end up with the sum for each metric and in the end divide by the count to get the avg of each column. If you want to use Algebird you can output (month,(Avg(minDeg),Avg(maxDeg),Avg(meanDeg))) and then all your reduce operations would be _+_. With that said, if you're using spark 1.3 check out https://github.com/databricks/spark-csv (you should likely use the CSV package anyway, even with a lower version of Spark) and https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.DataFrame (esp. the example at the top of the file). You'd just need .groupByand .agg if you setup your dataframe column that you're grouping by to contain just the -MM portion of your date string. On Mon, Apr 6, 2015 at 10:50 AM, barisak baris.akg...@gmail.com wrote: Hi I have a class in above desc. case class weatherCond(dayOfdate: String, minDeg: Int, maxDeg: Int, meanDeg: Int) I am reading the data from csv file and I put this data into weatherCond class with this code val weathersRDD = sc.textFile(weather.csv).map { line = val Array(dayOfdate, minDeg, maxDeg, meanDeg) = line.replaceAll(\,).trim.split(,) weatherCond(dayOfdate, minDeg.toInt, maxDeg.toInt, meanDeg.toInt) } the question is ; how can I average the minDeg, maxDeg and meanDeg values for each month ; The data set example day, min, max , mean 2014-03-17,-3,5,5 2014-03-18,6,7,7 2014-03-19,6,14,10 result has to be (2014-03, 3, 8.6 ,7.3) -- (Average for 2014 - 03 ) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Avarage-tp22391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
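A sketch of the pair-RDD approach described above, with the stripped quotes restored and the month taken as the first seven characters of the date; it assumes the header line has been removed from weather.csv and carries a count alongside the running sums so the final division gives the averages:

val weathers = sc.textFile("weather.csv").map { line =>
  val Array(day, minDeg, maxDeg, meanDeg) = line.replaceAll("\"", "").trim.split(",")
  (day.substring(0, 7), (minDeg.toDouble, maxDeg.toDouble, meanDeg.toDouble, 1))
}

val monthlyAvg = weathers
  .reduceByKey { case ((aMin, aMax, aMean, aN), (bMin, bMax, bMean, bN)) =>
    (aMin + bMin, aMax + bMax, aMean + bMean, aN + bN)
  }
  .mapValues { case (sMin, sMax, sMean, n) => (sMin / n, sMax / n, sMean / n) }

monthlyAvg.collect().foreach(println)   // e.g. (2014-03,(avgMin, avgMax, avgMean))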
Re: WordCount example
Interesting, I see 0 cores in the UI? - *Cores:* 0 Total, 0 Used On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das t...@databricks.com wrote: What does the Spark Standalone UI at port 8080 say about number of cores? On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com wrote: [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the works allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077 this seems to work. I don't understand why though because when I give spark://ip-10-241-251-232:7077 application seem to bootstrap successfully, just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.*seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream( localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 
15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: