Re: Spark MLlib: Collaborative Filtering
You could use the StringIndexer to convert your string user IDs and product IDs to numeric values: http://spark.apache.org/docs/latest/ml-features.html#stringindexer

Thanking You
Praveen Devarao
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: glen
To: "Devi P.V"
Cc: "user@spark.apache.org"
Date: 24/08/2016 02:10 pm
Subject: Re: Spark MLlib: Collaborative Filtering

Hash it to int.

On 2016-08-24 16:28, Devi P.V wrote:
Hi all, I am a newbie in collaborative filtering. I want to implement a collaborative filtering algorithm (I need to find the top 10 recommended products) using Spark and Scala. I have a rating dataset where userID and productID are of String type:

UserID        ProductID          Rating
b3a68043-c1   p1-160ff5fDS-f74   1
b3a68043-c2   p5-160ff5fDS-f74   1
b3a68043-c0   p9-160ff5fDS-f74   1

I tried the ALS algorithm in Spark MLlib, but it supports only Integer-typed userID and productID in its ratings. How can I solve this problem? Thanks in advance.
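A minimal sketch of the StringIndexer-then-ALS route, assuming a DataFrame named ratings with string columns userID and productID and a numeric rating column (the column names come from the question above; the rank, iteration count and lambda are illustrative, not recommendations):

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Index the string IDs into contiguous doubles (0.0, 1.0, ...) that ALS can consume.
val userIndexer = new StringIndexer().setInputCol("userID").setOutputCol("userIdx")
val prodIndexer = new StringIndexer().setInputCol("productID").setOutputCol("productIdx")

val withUsers = userIndexer.fit(ratings).transform(ratings)
val indexed   = prodIndexer.fit(withUsers).transform(withUsers)

// MLlib's ALS wants Rating(user: Int, product: Int, rating: Double).
val ratingRDD = indexed
  .selectExpr("userIdx", "productIdx", "cast(rating as double) as rating")
  .rdd
  .map(row => Rating(row.getDouble(0).toInt, row.getDouble(1).toInt, row.getDouble(2)))

val model = ALS.train(ratingRDD, 10, 10, 0.01) // rank, iterations, lambda
val top10ForUser0 = model.recommendProducts(0, 10) // products come back as indices

To report product IDs rather than indices, keep the fitted product indexer model around and map the indices back through its labels array.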
Re: removing header from csv file
Hi Ashutosh, Could you give more details as to what you are wanting to do and which feature of Spark you want to use? Yes, spark-csv is a connector for the Spark SQL module, hence it works with SQLContext only.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Ashutosh Kumar
To: "user @spark"
Date: 27/04/2016 10:55 am
Subject: removing header from csv file

I see there is a library spark-csv which can be used for removing the header and processing CSV files, but it seems it works with SQLContext only. Is there a way to remove the header from CSV files without SQLContext? Thanks, Ashutosh
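A common RDD-only way to drop a header, as a sketch assuming a single CSV file whose first line is the header (the file name is illustrative):

val raw = sc.textFile("data.csv")

// Simple: drop every line equal to the first line. Fine as long as no data
// row can look exactly like the header.
val header = raw.first()
val rows = raw.filter(_ != header)

// More precise: drop only the first line of the first partition, which is
// where the header of a single input file lives.
val rowsExact = raw.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}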
Re: Splitting spark dstream into separate fields
Given that you are not specifying any key explicitly [usually people will use the Producer API and insert a key-value pair], the key will be null, so your tuples would look like below:

(null, "ID TIMESTAMP PRICE")
(null, "40,20160426-080924, 67.55738301621814598514")

For the values, the positions are 0-indexed, hence (referring to your invocations) words1 will return the value of TIMESTAMP and words2 will return the value of PRICE.

>> I assume this is an array that can be handled as elements of an array as well? <<

These are all still under your DStream; you will need to invoke an action on the DStream to use them, for instance words.foreachRDD(...). It should be easy for you to just run the streaming program and call print on each resulting DStream to understand what data it contains and decide how to make use of it.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Mich Talebzadeh
To: Praveen Devarao/India/IBM@IBMIN, "user @spark"
Date: 26/04/2016 04:03 pm
Subject: Re: Splitting spark dstream into separate fields

Thanks Praveen. With regard to the key/value pair: my Kafka takes the following rows as input

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list rhes564:9092 --topic newtopic

That ${IN_FILE} is the source of prices (1,000 rows), as follows:

ID, TIMESTAMP, PRICE
40, 20160426-080924, 67.55738301621814598514

So tuples would be like below?

(1, "ID")
(2, "TIMESTAMP")
(3, "PRICE")

For values:

val words1 = lines.map(_.split(',').view(1))
val words2 = lines.map(_.split(',').view(2))
val words3 = lines.map(_.split(',').view(3))

So words1 will return the value of ID, words2 will return the value of TIMESTAMP and words3 will return the value of PRICE? I assume this is an array that can be handled as elements of an array as well?

Regards
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 26 April 2016 at 11:11, Praveen Devarao wrote:
Hi Mich,

>> val lines = dstream.map(_._2) This maps the record into components? Is that the correct understanding of it <<

Not sure what you refer to when you say record into components. The above function is basically giving you the tuple (key/value pair) that you would have inserted into Kafka. Say my Kafka producer puts data as 1 => "abc", 2 => "def". Then the above map would give you tuples as below:

(1, "abc")
(2, "def")

>> The following splits the line into comma separated fields. val words = lines.map(_.split(',').view(2)) <<

Right, basically the value portion of your Kafka data is being handled here.

>> val words = lines.map(_.split(',').view(2)) I am interested in column three. So view(2) returns the value. I have also seen other ways like val words = lines.map(_.split(',')).map(line => (line(0), (line(1), line(2) ... <<

The split operation returns an array of String [an immutable StringLike collection]; calling the view method creates an IndexedSeqView on the iterable, whereas in the second way you are iterating through it, accessing the elements directly via the index position [line(0), line(1), ...]. You would have to decide what is best for your use case based on whether evaluation should be lazy or immediate [see references below].
References:
http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqLike
http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqView

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
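Putting the "just run it and print" suggestion into a runnable shape, a sketch of the direct-stream pipeline with print() on each derived DStream (ssc, kafkaParams and topics are assumed to be set up as elsewhere in the thread):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

dstream.print() // with the console producer: (null, "40,20160426-080924, 67.557...")

val lines = dstream.map(_._2)
lines.map(_.split(',').view(1)).print() // TIMESTAMP (fields are 0-indexed)
lines.map(_.split(',').view(2)).print() // PRICE

ssc.start()
ssc.awaitTermination()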
Re: Splitting spark dstream into separate fields
Hi Mich,

>> val lines = dstream.map(_._2) This maps the record into components? Is that the correct understanding of it <<

Not sure what you refer to when you say record into components. The above function is basically giving you the tuple (key/value pair) that you would have inserted into Kafka. Say my Kafka producer puts data as 1 => "abc", 2 => "def". Then the above map would give you tuples as below:

(1, "abc")
(2, "def")

>> The following splits the line into comma separated fields. val words = lines.map(_.split(',').view(2)) <<

Right, basically the value portion of your Kafka data is being handled here.

>> val words = lines.map(_.split(',').view(2)) I am interested in column three. So view(2) returns the value. I have also seen other ways like val words = lines.map(_.split(',')).map(line => (line(0), (line(1), line(2) ... <<

The split operation returns an array of String [an immutable StringLike collection]; calling the view method creates an IndexedSeqView on the iterable, whereas in the second way you are iterating through it, accessing the elements directly via the index position [line(0), line(1), ...]. You would have to decide what is best for your use case based on whether evaluation should be lazy or immediate [see references below].

References:
http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqLike
http://www.scala-lang.org/files/archive/api/2.10.6/index.html#scala.collection.mutable.IndexedSeqView

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Mich Talebzadeh
To: "user @spark"
Date: 26/04/2016 12:58 pm
Subject: Splitting spark dstream into separate fields

Hi, Is there any optimum way of splitting a dstream into components? I am doing Spark streaming and this is the dstream I get:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

That dstream consists of price lines arriving every second like below:

ID, TIMESTAMP, PRICE
31,20160426-080924,93.53608929178084896656

The columns are separated by commas. Now a couple of questions:

val lines = dstream.map(_._2)

This maps the record into components? Is that the correct understanding of it?

The following splits the line into comma separated fields. I am interested in column three:

val words = lines.map(_.split(',').view(2))

So view(2) returns the value. I have also seen other ways like:

val words = lines.map(_.split(',')).map(line => (line(0), (line(1), line(2) ...

line(0), line(1) refer to the position of the fields? Which one is the adopted one or the correct one?

Thanks
Dr Mich Talebzadeh
LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
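To make the lazy/immediate distinction concrete, a small sketch you can paste into a Scala REPL (the sample line is taken from the thread; everything else is illustrative):

val line = "40,20160426-080924,67.55738301621814598514"
val fields: Array[String] = line.split(',')

// Immediate: index the array directly; positions are 0-based.
val price: String = fields(2) // "67.55738301621814598514"

// Lazy: .view wraps the array without copying; fields.view(2) resolves to
// fields.view.apply(2), i.e. element 2 fetched through the view.
val priceViaView: String = fields.view(2)

println(price == priceViaView) // true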
Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Cool!! Thanks for the clarification, Mike.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Michael Armbrust
To: Praveen Devarao/India/IBM@IBMIN
Cc: Reynold Xin, "d...@spark.apache.org", user
Date: 25/04/2016 10:59 pm
Subject: Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

Spark SQL's query planner has always delayed building the RDD, so it has never needed to eagerly calculate the range boundaries (since Spark 1.0).

On Mon, Apr 25, 2016 at 2:04 AM, Praveen Devarao wrote:
Thanks Reynold for the reason as to why sortByKey invokes a Job. When you say "DataFrame/Dataset does not have this issue", is it right to assume you are referring to Spark 2.0, or does the Spark 1.6 DataFrame already have this built in?

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Reynold Xin
To: Praveen Devarao/India/IBM@IBMIN
Cc: "d...@spark.apache.org", user <user@spark.apache.org>
Date: 25/04/2016 11:26 am
Subject: Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

Usually no - but sortByKey does because it needs the range boundaries to be built in order to have the RDD. It is a long-standing problem that's unfortunately very difficult to solve without breaking the RDD API. In DataFrame/Dataset we don't have this issue though.

On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao wrote:
Hi, I have a streaming program with the block as below [ref: https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala]:

1   val lines = messages.map(_._2)
2   val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))
3   val topCounts60 = hashTags.map((_, 1)).reduceByKey(_ + _)
3a    .map { case (topic, count) => (count, topic) }
3b    .transform(_.sortByKey(false))
4a  topCounts60.foreachRDD( rdd => {
4b    val topList = rdd.take(10)
    })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at line 4b (rdd.take). I agree that there is a Job triggered on line 4b, as take() is an action on an RDD, whereas on line 3b sortByKey is just a transformation function which, as per the docs, is lazily evaluated... but I see that this line uses a RangePartitioner, and RangePartitioner on initialization invokes a method called sketch() that invokes collect(), triggering a Job. My question: is it expected that sortByKey invokes a Job? If yes, why is sortByKey listed as a transformation and not an action? Are there any other functions like this that invoke a Job, though they are transformations and not actions? I am on Spark 1.6.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
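A small illustration of Michael's point, as a sketch assuming a Spark 1.6-style SQLContext named sqlContext (data and column names are illustrative): the DataFrame sort builds only a logical plan, and nothing runs until an action.

val df = sqlContext.createDataFrame(Seq(("b", 2), ("a", 1), ("c", 3))).toDF("k", "v")

// Unlike RDD.sortByKey, this triggers no job: only a logical plan is built.
val sorted = df.orderBy("k")

// Range boundaries are sampled as part of executing the action, not eagerly.
sorted.collect()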
Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Thanks Reynold for the reason as to why sortByKey invokes a Job. When you say "DataFrame/Dataset does not have this issue", is it right to assume you are referring to Spark 2.0, or does the Spark 1.6 DataFrame already have this built in?

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Reynold Xin
To: Praveen Devarao/India/IBM@IBMIN
Cc: "d...@spark.apache.org", user
Date: 25/04/2016 11:26 am
Subject: Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

Usually no - but sortByKey does because it needs the range boundaries to be built in order to have the RDD. It is a long-standing problem that's unfortunately very difficult to solve without breaking the RDD API. In DataFrame/Dataset we don't have this issue though.

On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao wrote:
Hi, I have a streaming program with the block as below [ref: https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala]:

1   val lines = messages.map(_._2)
2   val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))
3   val topCounts60 = hashTags.map((_, 1)).reduceByKey(_ + _)
3a    .map { case (topic, count) => (count, topic) }
3b    .transform(_.sortByKey(false))
4a  topCounts60.foreachRDD( rdd => {
4b    val topList = rdd.take(10)
    })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at line 4b (rdd.take). I agree that there is a Job triggered on line 4b, as take() is an action on an RDD, whereas on line 3b sortByKey is just a transformation function which, as per the docs, is lazily evaluated... but I see that this line uses a RangePartitioner, and RangePartitioner on initialization invokes a method called sketch() that invokes collect(), triggering a Job. My question: is it expected that sortByKey invokes a Job? If yes, why is sortByKey listed as a transformation and not an action? Are there any other functions like this that invoke a Job, though they are transformations and not actions? I am on Spark 1.6.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
Do transformation functions on RDD invoke a Job [sc.runJob]?
Hi, I have a streaming program with the block as below [ref: https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala]:

1   val lines = messages.map(_._2)
2   val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))
3   val topCounts60 = hashTags.map((_, 1)).reduceByKey(_ + _)
3a    .map { case (topic, count) => (count, topic) }
3b    .transform(_.sortByKey(false))
4a  topCounts60.foreachRDD( rdd => {
4b    val topList = rdd.take(10)
    })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and the other at line 4b (rdd.take). I agree that there is a Job triggered on line 4b, as take() is an action on an RDD, whereas on line 3b sortByKey is just a transformation function which, as per the docs, is lazily evaluated... but I see that this line uses a RangePartitioner, and RangePartitioner on initialization invokes a method called sketch() that invokes collect(), triggering a Job. My question: is it expected that sortByKey invokes a Job? If yes, why is sortByKey listed as a transformation and not an action? Are there any other functions like this that invoke a Job, though they are transformations and not actions? I am on Spark 1.6.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
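To see the behaviour directly, a minimal, self-contained sketch (all names illustrative) that counts jobs via a SparkListener; on Spark 1.x the RangePartitioner sampling job fires at the sortByKey call itself, before any action:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object SortByKeyJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sortByKey-demo"))
    val jobs = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobs.incrementAndGet()
    })

    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
    val sorted = pairs.sortByKey() // RangePartitioner samples the RDD here: one job
    Thread.sleep(2000)             // crude wait for the asynchronous listener bus
    println(s"jobs after sortByKey, before any action: ${jobs.get}") // expect 1

    sorted.collect()               // the "real" job
    Thread.sleep(2000)
    println(s"jobs after collect: ${jobs.get}")
    sc.stop()
  }
}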
Re: How to know whether I'm in the first batch of spark streaming
Thanks Yu for sharing the use case.

>> If our system has some problem, such as an HDFS issue, and the "first batch" and "second batch" were both queued: when the issue is gone, these two batches will start together. Then, will onBatchStarted be called concurrently for these two batches? <<

Not sure... I have not dug into that detail or faced such a situation. I see a method onBatchSubmitted in the listener, and the comment for that method reads "/** Called when a batch of jobs has been submitted for processing. */". Given that we have an event for batch submission too, I think the case you mention is a possible scenario, so you can probably use this method in combination with the other two. As all three methods take BatchInfo as their argument, and the BatchInfo class has the needed batchTime details, you should be able to achieve your task.

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Yu Xie
To: Praveen Devarao/India/IBM@IBMIN
Cc: user@spark.apache.org
Date: 21/04/2016 01:40 pm
Subject: Re: How to know whether I'm in the first batch of spark streaming

Thank you Praveen. In our Spark streaming application we write the data down to an HDFS directory, using the MMDDHHHmm00 format of the batch time as the directory name. So, when we stop the streaming and start it again (we do not use checkpointing), in the init of the first batch we write down the empty directories for the interval between the stop and the start. If the second batch runs faster than the first batch, it will have the chance to run the "init". In this case, the directory that the "first batch" will output to will be reset to an empty directory by the "second batch", which will make the data a mess.

I have a question about the StreamingListener. If our system has some problem, such as an HDFS issue, and the "first batch" and "second batch" were both queued: when the issue is gone, these two batches will start together. Then, will onBatchStarted be called concurrently for these two batches? Thank you

On Thu, Apr 21, 2016 at 3:11 PM, Praveen Devarao wrote:
Hi Yu, Could you provide more details on what and how you are trying to initialize? Are you having this initialization as part of the code block in an action of the DStream? Say the second batch finishes before the first batch: wouldn't your results be affected, as init would not have taken place (since you want it on the first batch itself)? One way we could think of knowing the first batch is by implementing the StreamingListener trait, which has the methods onBatchStarted and onBatchCompleted... These methods should help you determine the first batch (the first batch will definitely start first, though the order of ending is not guaranteed with concurrentJobs set to more than 1)... Would be interesting to know your use case... could you share, if possible?

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Yu Xie
To: user@spark.apache.org
Date: 19/04/2016 01:24 pm
Subject: How to know whether I'm in the first batch of spark streaming

hi spark users, I'm running a spark streaming application with concurrentJobs > 1, so more than one batch could run together. Now I would like to do some init work in the first batch, based on the "time" of the first batch.
So even if the second batch runs faster than the first batch, I still need to init in the literal "first batch". Then is there a way that I can know that? Thank you
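Building on the suggestion above, a sketch of such a listener (the class name and init hook are illustrative); it keys off onBatchSubmitted, which fires in submission order, so the first invocation corresponds to the literal first batch even when batches later run concurrently:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchSubmitted}

class FirstBatchListener(init: Long => Unit) extends StreamingListener {
  private val seenFirst = new AtomicBoolean(false)

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    // Batches are submitted in batch-time order, so compareAndSet runs the
    // init exactly once, for the first batch.
    if (seenFirst.compareAndSet(false, true)) {
      init(batchSubmitted.batchInfo.batchTime.milliseconds)
    }
  }
}

// register before ssc.start():
// ssc.addStreamingListener(new FirstBatchListener(firstBatchMs => { /* one-time init */ }))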
Re: How to know whether I'm in the first batch of spark streaming
Hi Yu, Could you provide more details on what and how you are trying to initialize? Are you having this initialization as part of the code block in an action of the DStream? Say the second batch finishes before the first batch: wouldn't your results be affected, as init would not have taken place (since you want it on the first batch itself)?

One way we could think of knowing the first batch is by implementing the StreamingListener trait, which has the methods onBatchStarted and onBatchCompleted... These methods should help you determine the first batch (the first batch will definitely start first, though the order of ending is not guaranteed with concurrentJobs set to more than 1)... Would be interesting to know your use case... could you share, if possible?

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Yu Xie
To: user@spark.apache.org
Date: 19/04/2016 01:24 pm
Subject: How to know whether I'm in the first batch of spark streaming

hi spark users, I'm running a spark streaming application with concurrentJobs > 1, so more than one batch could run together. Now I would like to do some init work in the first batch, based on the "time" of the first batch. So even if the second batch runs faster than the first batch, I still need to init in the literal "first batch". Then is there a way that I can know that? Thank you
Re: Spark structured streaming
Thanks Jacek for the pointer. Any idea which package can be used in .format()? The test cases seem to work off the DefaultSource class defined within the DataFrameReaderWriterSuite [org.apache.spark.sql.streaming.test.DefaultSource].

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Jacek Laskowski
To: Praveen Devarao/India/IBM@IBMIN
Cc: user, dev
Date: 08/03/2016 04:17 pm
Subject: Re: Spark structured streaming

Hi Praveen, I've spent a few hours on the changes related to streaming dataframes (included in SPARK-8360) and concluded that it's currently only possible to read.stream(), but not write.stream(), since there are no streaming Sinks yet.

Pozdrawiam,
Jacek Laskowski
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Mar 8, 2016 at 10:38 AM, Praveen Devarao wrote:
> Hi,
> I would like to get my hands on the structured streaming feature coming out in Spark 2.0. I have tried looking around for code samples to get started but am not able to find any. The only things I could look into are the test cases that have been committed under the JIRA umbrella https://issues.apache.org/jira/browse/SPARK-8360, but the test cases don't lead to building example code as they seem to be working off internal classes.
> Could anyone point me to some resources or pointers in code that I can start with to understand structured streaming from a consumability angle.
> Thanking You
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
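For anyone reaching this thread after the release: the snapshot read.stream()/write.stream() API discussed here was replaced before Spark 2.0 shipped. A minimal sketch against the API that did ship (readStream/writeStream; the socket host and port are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[2]")
  .appName("structured-streaming-sketch")
  .getOrCreate()

// Stream lines from a socket, count by value, print each trigger to the console.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val counts = lines.groupBy("value").count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()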
Spark structured streaming
Hi, I would like to get my hands on the structured streaming feature coming out in Spark 2.0. I have tried looking around for code samples to get started but am not able to find any. The only things I could look into are the test cases that have been committed under the JIRA umbrella https://issues.apache.org/jira/browse/SPARK-8360, but the test cases don't lead to building example code as they seem to be working off internal classes. Could anyone point me to some resources or pointers in code that I can start with to understand structured streaming from a consumability angle?

Thanking You
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
Re: Guidelines for writing SPARK packages
Thanks David. I am looking at extending the Spark SQL library with a custom package, hence I was looking for details on any specific classes to extend or interfaces to implement so that calls are redirected to my module (when using .format()). If you have any info on these lines do share with me... else debugging through would be the way :-)

Thanking You
Praveen Devarao

From: David Russell
To: Praveen Devarao/India/IBM@IBMIN
Cc: user
Date: 01/02/2016 07:03 pm
Subject: Re: Guidelines for writing SPARK packages
Sent by: marchoffo...@gmail.com

Hi Praveen, The basic requirements for releasing a Spark package on spark-packages.org are as follows:

1. The package content must be hosted by GitHub in a public repo under the owner's account.
2. The repo name must match the package name.
3. The master branch of the repo must contain "README.md" and "LICENSE".

Per the doc on the spark-packages.org site, an example package that meets those requirements can be found at https://github.com/databricks/spark-avro. My own recently released SAMBA package also meets these requirements: https://github.com/onetapbeyond/lambda-spark-executor. As you can see, there is nothing in this list of requirements that demands the implementation of specific interfaces. What you'll need to implement will depend entirely on what you want to accomplish. If you want to register a release for your package you will also need to push the artifacts for your package to Maven Central. David

On Mon, Feb 1, 2016 at 7:03 AM, Praveen Devarao wrote:
> Hi,
> Are there any guidelines or specs for writing a Spark package? I would like to implement a Spark package and would like to know the way it needs to be structured (implement some interfaces etc.) so that it can plug into Spark for extended functionality.
> Could anyone point me to docs or links on the above?
> Thanking You
> Praveen Devarao

--
"All that is gold does not glitter, Not all those who wander are lost."
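On the specific question of what to implement so that .format() resolves to your module: in the Spark 1.x data source API, Spark looks for a class named DefaultSource in the package name passed to .format(). A sketch under that assumption (the package and relation names are illustrative, not from the thread):

package com.example.myformat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Resolved when callers do: sqlContext.read.format("com.example.myformat").load()
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new MyRelation(sqlContext)
}

// A trivially small relation: one string column, two hard-coded rows.
class MyRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(StructField("value", StringType)))
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("hello"), Row("world")))
}

Richer sources would add PrunedScan/PrunedFilteredScan for column pruning and filter pushdown, or CreatableRelationProvider for write support.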
Guidelines for writing SPARK packages
Hi, Are there any guidelines or specs for writing a Spark package? I would like to implement a Spark package and would like to know the way it needs to be structured (implement some interfaces etc.) so that it can plug into Spark for extended functionality. Could anyone point me to docs or links on the above?

Thanking You
Praveen Devarao