RE: How to set Spark executor memory?
Hi Xi Shen,

You could set spark.executor.memory in the code itself:

new SparkConf().set("spark.executor.memory", "2g")

Or you can try passing --conf spark.executor.memory=2g (or the --executor-memory 2g shorthand) while submitting the jar.

Regards
Jishnu Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, March 16, 2015 2:06 PM
To: Xi Shen
Cc: user@spark.apache.org
Subject: Re: How to set Spark executor memory?

By default spark.executor.memory is set to 512m. I'm assuming that since you are submitting the job using spark-submit in local mode, it is not able to override the value. Can you try it without spark-submit, as a standalone project?

Thanks
Best Regards

On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen <davidshe...@gmail.com> wrote:

I set it in code, not by configuration. I submit my jar file to local. I am working in my developer environment.

On Mon, 16 Mar 2015 18:28 Akhil Das <ak...@sigmoidanalytics.com> wrote:

How are you setting it? And how are you submitting the job?

Thanks
Best Regards

On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen <davidshe...@gmail.com> wrote:

Hi,

I have set spark.executor.memory to 2048m, and in the UI "Environment" page I can see this value has been set correctly. But in the "Executors" page I see there's only 1 executor, and its memory is 265.4MB. Very strange value. Why not 256MB, or just what I set? What am I missing here?

Thanks,
David
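[Editor's note] A side note on the puzzling 265.4MB figure: in Spark 1.x the "Executors" page reports the memory available for caching (roughly spark.storage.memoryFraction times a safety fraction of the heap), not the full heap, so it will never match the raw spark.executor.memory value exactly. Below is a minimal sketch of the two approaches discussed above; the app name and the 2g figure are placeholders, and note that in local mode the executor lives inside the driver JVM, so the driver heap is what actually matters:

import org.apache.spark.{SparkConf, SparkContext}

// In code, before the SparkContext is created:
val conf = new SparkConf()
  .setAppName("MyApp") // placeholder name
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)

// Or on the command line:
//   spark-submit --executor-memory 2g --class MyApp myapp.jar
//   spark-submit --conf spark.executor.memory=2g --class MyApp myapp.jar
// In local mode the executor runs inside the driver JVM, so set the
// driver heap instead:
//   spark-submit --driver-memory 2g --class MyApp myapp.jar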
RE: Spark SQL Stackoverflow error
import com.google.gson.{GsonBuilder, JsonParser}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Examines the collected tweets and trains a model based on them.
 */
object ExamineAndTrain {
  val jsonParser = new JsonParser()
  val gson = new GsonBuilder().setPrettyPrinting().create()

  def main(args: Array[String]) {
    val outputModelDir = "C:\\outputmode111"
    val tweetInput = "C:\\test"
    val numClusters = 10
    val numIterations = 20

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[4]")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    val tweets = sc.textFile(tweetInput)
    val vectors = tweets.map(Utils.featurize).cache()
    vectors.count() // Calls an action on the RDD to populate the vectors cache.

    val model = KMeans.train(vectors, numClusters, numIterations)
    sc.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(outputModelDir)

    val some_tweets = tweets.take(2)
    println("Example tweets from the clusters")
    for (i <- 0 until numClusters) {
      println(s"\nCLUSTER $i:")
      some_tweets.foreach { t =>
        if (model.predict(Utils.featurize(t)) == i) {
          println(t)
        }
      }
    }
  }
}

From: lovelylavs [via Apache Spark User List]
Sent: Sunday, March 08, 2015 2:34 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Subject: Re: Spark SQL Stackoverflow error

Thank you so much for your reply. If it is possible, can you please provide me with the code? Thank you so much.
Lavanya.

From: Jishnu Prathap [via Apache Spark User List]
Sent: Sunday, March 1, 2015 3:03 AM
To: Nadikuda, Lavanya
Subject: RE: Spark SQL Stackoverflow error

Hi,
The issue was not fixed. I removed the SQL layer in between and created the features directly from the file.

Regards
Jishnu Prathap

From: lovelylavs [via Apache Spark User List]
Sent: Sunday, March 01, 2015 4:44 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Subject: Re: Spark SQL Stackoverflow error

Hi, how was this issue fixed?
RE: Error KafkaStream
Hi,
If your message is a String you will have to change the Encoder and Decoder to StringEncoder, StringDecoder. If your message is byte[] you can use DefaultEncoder and DefaultDecoder. Also don't forget to add the import statements for your encoder and decoder:

import kafka.serializer.StringEncoder;
import kafka.serializer.StringDecoder;

Regards
Jishnu Prathap

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Friday, February 06, 2015 6:41 AM
To: Eduardo Costa Alfaia; Sean Owen
Cc: user@spark.apache.org
Subject: RE: Error KafkaStream

Hi,

I think you should change the `DefaultDecoder` of your type parameter into `StringDecoder`; it seems you want to decode the message into String. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here.

Thanks
Jerry

-----Original Message-----
From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it]
Sent: Friday, February 6, 2015 12:04 AM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: Error KafkaStream

I don't think so, Sean.

On Feb 5, 2015, at 16:57, Sean Owen <so...@cloudera.com> wrote:

Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue?

On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:

Hi Guys,
I'm getting this error in KafkaWordCount:

TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7

Any idea what it could be? Below is the piece of code:

val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")
  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys
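[Editor's note] For clarity, a minimal sketch of what the fixed stream creation might look like once the decoders match the String type parameters; ssc, kafkaParams and topicpMap are assumed to be defined as in the snippet above:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// StringDecoder matches the [String, String, ...] type parameters, so
// .map(_._2) now yields message values as String instead of Array[Byte],
// and the ClassCastException goes away.
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)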
RE: How to integrate Spark with OpenCV?
Hi Akhil,
Thanks for the response. Our use case is object detection in multiple videos. It's a kind of image search: checking whether an image is present in a video by matching the image against all the frames of the video. I am able to do it in plain Java code using the OpenCV library now, but I don't think it scales to the point where we could run it over thousands of large videos. So I thought we could leverage the distributed computing and performance of Spark, if possible. I can see Jaonary Rabarisoa has tried to use OpenCV with Spark (http://apache-spark-user-list.1001560.n3.nabble.com/Getting-started-using-spark-for-computer-vision-and-video-analytics-td1551.html), but I don't have any code reference for how to do it with OpenCV. In case any image/video processing library works better with Spark, please let me know. Any help would be really appreciated.

Thanks & Regards
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, January 14, 2015 12:35 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: user@spark.apache.org
Subject: Re: How to integrate Spark with OpenCV?

I haven't played with OpenCV yet, but I was just wondering about your use case. What exactly are you trying to do?

Thanks
Best Regards

Jishnu Prathap <jishnu.prat...@wipro.com> wrote:

Hi,
Can someone suggest any video/image processing library which works well with Spark? Currently I am trying to integrate OpenCV with Spark. I am relatively new to both Spark and OpenCV. It would really help me if someone could share some sample code showing how to use Mat, IplImage and Spark RDDs together. Any help would be really appreciated. Thanks in advance!!
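[Editor's note] Since no sample code surfaced in this thread, here is a rough sketch of one way to marry the two, assuming the OpenCV 2.4.x Java bindings and frames already extracted to image files. The paths, the query image and the 0.9 match threshold are all made-up placeholders, and the OpenCV native library must be installed on every worker; this is an illustration, not the thread's confirmed solution:

import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, Mat}
import org.opencv.highgui.Highgui
import org.opencv.imgproc.Imgproc

object FrameMatch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FrameMatch").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Hypothetical input: a text file listing paths of pre-extracted frame images.
    val framePaths = sc.textFile("C:\\frames\\paths.txt")

    val hits = framePaths.mapPartitions { paths =>
      // OpenCV objects (Mat etc.) are not serializable, so everything
      // native must be created inside the closure, on the worker.
      System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
      val template = Highgui.imread("C:\\query.jpg") // the image to search for
      paths.filter { p =>
        val frame = Highgui.imread(p)
        val result = new Mat()
        Imgproc.matchTemplate(frame, template, result, Imgproc.TM_CCOEFF_NORMED)
        Core.minMaxLoc(result).maxVal > 0.9 // made-up match threshold
      }
    }
    hits.collect().foreach(println)
  }
}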
Stack overflow Error while executing spark SQL
Hi,
I am getting a stack overflow error:

Exception in thread "main" java.lang.StackOverflowError
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

while executing the following code:

sqlContext.sql("SELECT text FROM tweetTable LIMIT 10").collect().foreach(println)

The complete code is from GitHub: https://github.com/databricks/reference-apps/blob/master/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/ExamineAndTrain.scala

import com.google.gson.{GsonBuilder, JsonParser}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Examines the collected tweets and trains a model based on them.
 */
object ExamineAndTrain {
  val jsonParser = new JsonParser()
  val gson = new GsonBuilder().setPrettyPrinting().create()

  def main(args: Array[String]) {
    // Process program arguments and set properties
    /*if (args.length < 3) {
      System.err.println("Usage: " + this.getClass.getSimpleName +
        " tweetInput outputModelDir numClusters numIterations")
      System.exit(1)
    }*/
    val outputModelDir = "C:\\MLModel"
    val tweetInput = "C:\\MLInput"
    val numClusters = 10
    val numIterations = 20
    //val Array(tweetInput, outputModelDir, Utils.IntParam(numClusters), Utils.IntParam(numIterations)) = args

    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Pretty print some of the tweets.
    val tweets = sc.textFile(tweetInput)
    println("------------Sample JSON Tweets-------")
    for (tweet <- tweets.take(5)) {
      println(gson.toJson(jsonParser.parse(tweet)))
    }

    val tweetTable = sqlContext.jsonFile(tweetInput).cache()
    tweetTable.registerTempTable("tweetTable")

    println("------Tweet table Schema---")
    tweetTable.printSchema()

    println("----Sample Tweet Text-----")
    sqlContext.sql("SELECT text FROM tweetTable LIMIT 10").collect().foreach(println)

    println("------Sample Lang, Name, text---")
    sqlContext.sql("SELECT user.lang, user.name, text FROM tweetTable LIMIT 1000").collect().foreach(println)

    println("------Total count by languages Lang, count(*)---")
    sqlContext.sql("SELECT user.lang, COUNT(*) as cnt FROM tweetTable GROUP BY user.lang ORDER BY cnt DESC LIMIT 25").collect.foreach(println)

    println("--- Training the model and persist it")
    val texts = sqlContext.sql("SELECT text from tweetTable").map(_.head.toString)
    // Cache the vectors RDD since it will be used for all the KMeans iterations.
    val vectors = texts.map(Utils.featurize).cache()
    vectors.count() // Calls an action on the RDD to populate the vectors cache.

    val model = KMeans.train(vectors, numClusters, numIterations)
    sc.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(outputModelDir)

    val some_tweets = texts.take(100)
    println("Example tweets from the clusters")
    for (i <- 0 until numClusters) {
      println(s"\nCLUSTER $i:")
      some_tweets.foreach { t =>
        if (model.predict(Utils.featurize(t)) == i) {
          println(t)
        }
      }
    }
  }
}

Thanks & Regards
Jishnu Menath Prathap
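[Editor's note] A hedged footnote on the error itself: a java.lang.StackOverflowError inside scala.util.parsing.combinator means very deep parser recursion (the Spark 1.x SQL parser is built on parser combinators), and one workaround sometimes suggested for parser-combinator overflows is simply a larger driver thread stack. This is an unverified guess for this particular case -- the thread above reports the issue was ultimately sidestepped, not fixed -- and the jar path and stack size are placeholders:

spark-submit --driver-java-options "-Xss4m" --class ExamineAndTrain target/app.jar

When running from an IDE, the equivalent is adding -Xss4m to the run configuration's JVM options.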
RE: Persist streams to text files
Hi,
Thank you Akhil, it worked like a charm ☺. I had used the FileWriter outside rdd.foreach; that might be the reason for the non-serializable exception.

Thanks & Regards
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 1:15 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

Here's a quick version to store (append) on your local machine:

val tweets = TwitterUtils.createStream(ssc, None)
val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.foreachRDD(rdds => {
  rdds.foreach(rdd => {
    val fw = new FileWriter("/home/akhld/tags.txt", true)
    println("HashTag => " + rdd)
    fw.write(rdd + "\n")
    fw.close()
  })
})

Thanks
Best Regards

On Fri, Nov 21, 2014 at 12:12 PM, <jishnu.prat...@wipro.com> wrote:

Hi Akhil,
Thanks for the reply, but it creates different directories. I tried using a FileWriter but it shows a non-serializable error:

val stream = TwitterUtils.createStream(ssc, None) //, filters)
val statuses = stream.map(status => sentimentAnalyzer.findSentiment({
  status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
}))
val line = statuses.foreachRDD(rdd => {
  rdd.foreach(tweetWithSentiment => {
    if (!tweetWithSentiment.getLine().isEmpty())
      println(tweetWithSentiment.getCssClass() + " for line := " + tweetWithSentiment.getLine())
      // For now I print to the console, but I need to append it to a file on the local machine.
  })
})

Thanks & Regards
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

To have a single text file output for each batch you can repartition the stream to 1 and then call saveAsTextFiles:

stream.repartition(1).saveAsTextFiles(location)

On 21 Nov 2014 11:28, <jishnu.prat...@wipro.com> wrote:

Hi, I am also having a similar problem. Any fix suggested?

Originally posted by GaganBM:

Hi,
I am trying to persist the DStreams to text files. When I use the built-in API 'saveAsTextFiles' as:

stream.saveAsTextFiles(resultDirectory)

this creates a number of subdirectories, one for each batch, and within each subdirectory it creates a bunch of text files for each RDD (I assume). I am wondering if I can have a single text file for each batch. Is there any API for that? Or else, a single output file for the entire stream? I tried to manually write each RDD of the stream to a text file as:

stream.foreachRDD(rdd => {
  rdd.foreach(element => {
    fileWriter.write(element)
  })
})

where 'fileWriter' simply makes use of a Java BufferedWriter to write strings to a file. However, this fails with an exception:

DStreamCheckpointData.writeObject used java.io.BufferedWriter
java.io.NotSerializableException: java.io.BufferedWriter
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

Any help on how to proceed with this?
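[Editor's note] A caveat worth noting on the accepted snippet: rdd.foreach runs on the executors, so on a real cluster the FileWriter would append to files on whichever worker happens to run each task. A hedged alternative that keeps the file on the driver machine (only sensible when each batch is small enough to collect; the output path is a placeholder):

import java.io.{BufferedWriter, FileWriter}

hashTags.foreachRDD { rdd =>
  // The body of foreachRDD executes on the driver, so the writer is never
  // serialized and the output file lives on the driver's local disk.
  val batch = rdd.collect()
  if (batch.nonEmpty) {
    val out = new BufferedWriter(new FileWriter("/tmp/tags.txt", true)) // placeholder path
    batch.foreach(tag => out.write(tag + "\n"))
    out.close()
  }
}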
RE: Persist streams to text files
Hi Akhil,
Thanks for the reply, but it creates different directories. I tried using a FileWriter but it shows a non-serializable error:

val stream = TwitterUtils.createStream(ssc, None) //, filters)
val statuses = stream.map(status => sentimentAnalyzer.findSentiment({
  status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
}))
val line = statuses.foreachRDD(rdd => {
  rdd.foreach(tweetWithSentiment => {
    if (!tweetWithSentiment.getLine().isEmpty())
      println(tweetWithSentiment.getCssClass() + " for line := " + tweetWithSentiment.getLine())
      // For now I print to the console, but I need to append it to a file on the local machine.
  })
})

Thanks & Regards
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

To have a single text file output for each batch you can repartition the stream to 1 and then call saveAsTextFiles:

stream.repartition(1).saveAsTextFiles(location)

On 21 Nov 2014 11:28, <jishnu.prat...@wipro.com> wrote:

[Quoted: the same "Persist streams to text files" question from GaganBM, reproduced in full in the previous message of this thread.]
basic twitter stream program not working.
Hi,
I am trying to run a basic Twitter stream program but getting blank output. Please correct me if I am missing something.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.Seconds
import org.apache.log4j.LogManager
import org.apache.log4j.Level

object Sparktwiter1 {
  def main(args: Array[String]) {
    LogManager.getRootLogger().setLevel(Level.ERROR)
    System.setProperty("http.proxyHost", "proxy4.wipro.com")
    System.setProperty("http.proxyPort", "8080")
    System.setProperty("twitter4j.oauth.consumerKey", "")      // keys masked
    System.setProperty("twitter4j.oauth.consumerSecret", "")
    System.setProperty("twitter4j.oauth.accessToken", "")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "")
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local").set("spark.eventLog.enabled", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None) //, filters)
    stream.print
    val s1 = stream.flatMap(status => status.getText)
    s1.print
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    hashTags.print
    ssc.start()
    ssc.awaitTermination()
  }
}

Output:

-------------------------------------------
Time: 1415869348000 ms
-------------------------------------------

-------------------------------------------
Time: 1415869348000 ms
-------------------------------------------

-------------------------------------------
Time: 1415869348000 ms
-------------------------------------------
run-example TwitterPopularTags showing Class Not found error
Hi,
I am getting the following error while running the TwitterPopularTags example. I am using spark-1.1.0-bin-hadoop2.4.

jishnu@getafix:~/spark/bin$ run-example TwitterPopularTags *** ** ** *** **
Spark assembly has been built with Hive, including Datanucleus jars on classpath
java.lang.ClassNotFoundException: org.apache.spark.examples.org.apache.spark.streaming.examples.TwitterPopularTags
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:318)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I tried executing on three different machines but all showed the same error. I am able to run other examples like SparkPi.

Thanks & Regards
Jishnu Menath Prathap
BAS EBI (Open Source)
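[Editor's note] A hedged observation on the error: the doubled package name (org.apache.spark.examples.org.apache.spark.streaming.examples...) suggests run-example is prepending org.apache.spark.examples. to an already old-style class name, and in Spark 1.1.0 the streaming examples live under org.apache.spark.examples.streaming. If that is right, spelling out the subpackage may help; the masked arguments stand in for the four Twitter OAuth keys:

./bin/run-example streaming.TwitterPopularTags <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>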
RE: basic twitter stream program not working.
Hi,
Thanks Akhil, you saved the day. It's working perfectly.

Regards
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, November 13, 2014 3:25 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: Akhil [via Apache Spark User List]; user@spark.apache.org
Subject: Re: basic twitter stream program not working.

Change this line

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local").set("spark.eventLog.enabled", "true")

to

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[4]").set("spark.eventLog.enabled", "true")

Thanks
Best Regards

On Thu, Nov 13, 2014 at 2:58 PM, <jishnu.prat...@wipro.com> wrote:

[Quoted: the original Sparktwiter1 program and the empty batch output, reproduced in full in the previous message of this thread.]
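[Editor's note] A brief note on why the fix works: a Spark Streaming receiver permanently pins one core, so with master "local" (a single thread) nothing is left to process the received batches and every interval prints empty. Any master with at least two threads avoids this, for example:

val sparkConf = new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("local[4]") // 1 thread for the Twitter receiver + 3 for processing
  .set("spark.eventLog.enabled", "true")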
Re: java.lang.NumberFormatException while starting spark-worker
Hi,
I am getting this weird error while starting the Worker:

-bash-4.1$ spark-class org.apache.spark.deploy.worker.Worker spark://osebi-UServer:59468
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/09/24 16:22:04 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
Exception in thread "main" java.lang.NumberFormatException: For input string: "61608"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:492)
at java.lang.Integer.parseInt(Integer.java:527)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at org.apache.spark.deploy.worker.WorkerArguments.<init>(WorkerArguments.scala:38)
at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:376)
at org.apache.spark.deploy.worker.Worker.main(Worker.scala)
Re: java.lang.NumberFormatException while starting spark-worker
No, I am not passing any argument. I am getting this error while starting the Master. The same Spark binary I am able to run on another machine with Ubuntu installed.
RE: java.lang.NumberFormatException while starting spark-worker
Hi,
Sorry for the repeated mails. My post was not accepted by the mailing list due to some problem with postmas...@wipro.com, so I had to send it manually. It was still not visible after half an hour, so I retried; later all the posts became visible. I deleted them from the page, but they had already been delivered to the mailing list. Sorry for the repeated mails.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, September 24, 2014 6:17 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Subject: Re: java.lang.NumberFormatException while starting spark-worker

Please stop emailing the same message repeatedly every half hour.

On Wed, Sep 24, 2014 at 12:21 PM, <jishnu.prat...@wipro.com> wrote:

[Quoted: the same spark-class command and NumberFormatException stack trace shown in the first message of this thread.]