Re: Persist streams to text files
Hi, you can use the FileUtil.copyMerge API and point it at the folder where saveAsTextFile writes the part files. Suppose your directory is /a/b/c/; call FileUtil.copyMerge(FileSystem of source, /a/b/c, FileSystem of destination, path to the merged file such as /a/b/c.txt, true (to delete the original dir), null). Thanks.

On Fri, Nov 21, 2014 at 11:31 AM, Jishnu Prathap [via Apache Spark User List] wrote:

Hi, I am also having a similar problem. Any fix suggested?

Originally posted by GaganBM:

Hi, I am trying to persist the DStreams to text files. When I use the built-in API 'saveAsTextFiles' as:

stream.saveAsTextFiles(resultDirectory)

this creates a number of subdirectories, one for each batch, and within each subdirectory it creates a bunch of text files for each RDD (I assume). I am wondering if I can have a single text file for each batch. Is there any API for that? Or else, a single output file for the entire stream?

I tried to manually write from each RDD in the stream to a text file as:

stream.foreachRDD(rdd => {
  rdd.foreach(element => {
    fileWriter.write(element)
  })
})

where 'fileWriter' simply makes use of a Java BufferedWriter to write strings to a file. However, this fails with the exception:

DStreamCheckpointData.writeObject used java.io.BufferedWriter
java.io.NotSerializableException: java.io.BufferedWriter
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  ...

Any help on how to proceed with this?
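For reference, a minimal sketch of the copyMerge call described above (the paths and Hadoop Configuration are illustrative, not taken from the thread; copyMerge with this signature exists in Hadoop 2.x):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
// Merge the part-* files written by saveAsTextFile into a single file;
// the fifth argument deletes the source directory after a successful merge.
FileUtil.copyMerge(fs, new Path("/a/b/c"), fs, new Path("/a/b/c.txt"), true, hadoopConf, null)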
Re: beeline via spark thrift doesn't retain cache
1) Make sure your beeline client is connected to the HiveServer2 of Spark SQL. You can find the execution logs of HiveServer2 in the environment where start-thriftserver.sh was launched. 2) What is the scale of your data? If you cache a small amount of data, it can take more time just to schedule the workload across different executors. Also look at the configuration of the Spark execution environment: if there is not enough memory for RDD storage, it will take some time to serialize/deserialize data between memory and disk.

2014-11-21 11:06 GMT+08:00 Judy Nash judyn...@exchange.microsoft.com:

Hi friends, I have successfully set up the thrift server and can run beeline on top of it. Beeline can handle select queries just fine, but it cannot seem to do any kind of caching/RDD operations, i.e.:

1) The command "cache table" doesn't work. See error:

Error: Error while processing statement: FAILED: ParseException line 1:0 cannot recognize input near 'cache' 'table' 'hivesampletable' (state=42000,code=4)

2) Re-running SQL commands does not show any performance improvement.

By comparison, the Spark SQL shell can execute the "cache table" command, and rerunning a SQL command gives a huge performance boost. Am I missing something, or is this expected when executing through the Spark thrift server? Thanks! Judy
RE: Persist streams to text files
Hi, thank you ☺ Akhil, it worked like a charm. I had used the file writer outside rdd.foreach; that might have been the reason for the non-serializable exception. Thanks & Regards, Jishnu Menath Prathap

From: Akhil Das [ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 1:15 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

Here's a quick version to store (append) on your local machine:

val tweets = TwitterUtils.createStream(ssc, None)
val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.foreachRDD(rdds => {
  rdds.foreach(rdd => {
    val fw = new FileWriter("/home/akhld/tags.txt", true)
    println("HashTag => " + rdd)
    fw.write(rdd + "\n")
    fw.close()
  })
})

Thanks, Best Regards

On Fri, Nov 21, 2014 at 12:12 PM, jishnu.prat...@wipro.com wrote:

Hi Akhil, thanks for the reply, but it creates different directories. I tried using a FileWriter but it shows a non-serializable error.

val stream = TwitterUtils.createStream(ssc, None) //, filters)
val statuses = stream.map(status =>
  sentimentAnalyzer.findSentiment({
    status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
  })
)
val line = statuses.foreachRDD(rdd => {
  rdd.foreach(tweetWithSentiment => {
    if (!tweetWithSentiment.getLine().isEmpty())
      // Currently I print to the console, but I need to write this to a file on the local machine
      println(tweetWithSentiment.getCssClass() + " for line := " + tweetWithSentiment.getLine())
  })
})

Thanks & Regards, Jishnu Menath Prathap

From: Akhil Das [ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

To have a single text file output for each batch you can repartition the stream to 1 and then call saveAsTextFiles:

stream.repartition(1).saveAsTextFiles(location)

On 21 Nov 2014 11:28, jishnu.prat...@wipro.com wrote:

Hi, I am also having a similar problem. Any fix suggested?
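A side note on the NotSerializableException discussed in this thread: one way to avoid it is to create the writer inside the closure, once per partition, so nothing non-serializable has to be shipped from the driver. A minimal sketch (the output path is illustrative; concurrent appends from multiple partitions to the same local file are not coordinated here):

stream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // The writer is constructed on the executor, so it is never serialized.
    val writer = new java.io.PrintWriter(new java.io.FileWriter("/tmp/output.txt", true))
    partition.foreach(element => writer.println(element))
    writer.close()
  })
})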
Re: processing files
Hi Simon, no, I don't need to run the tasks on multiple machines for now. I will therefore stick to a Makefile plus shell or Java programs, as Spark appears not to be the right tool for the tasks I am trying to accomplish. Thank you for your input. Philippe

----- Original Message -----
From: Simon Hafner reactorm...@gmail.com
To: Philippe de Rochambeau phi...@free.fr
Sent: Friday, 21 November 2014 09:47:25
Subject: Re: processing files

2014-11-21 1:46 GMT-06:00 Philippe de Rochambeau phi...@free.fr:

- reads xml files in thousands of directories, two levels down, from year x to year y

You could try sc.parallelize(new File(dirWithXML)).flatMap(sc.wholeTextFiles(_)) ... not guaranteed to work.

- extracts data from image tags in those files and stores them in a SQL or NoSQL database

From what I understand, Spark expects no side effects from the functions you pass to map(). So that's probably not that good of an idea if you don't want duplicated records.

- generates ImageMagick commands based on the extracted data to generate images

Data transformation, easy. collect() and save.

- generates curl commands to index the image files with Solr

Same as ImageMagick.

Does Spark provide any tools/features to facilitate and automate (batchify) the above tasks?

Sure, but I wouldn't run the commands with Spark. They might be run twice or more.

I can do all of the above with one or several Java programs, but I wondered if using Spark would be of any use in such an endeavour.

Personally, I'd use a Makefile, xmlstarlet for the xml parsing, and store the image paths in plain text instead of a database, and get parallelization via -j X. You could also run the ImageMagick and curl commands from there. But that naturally doesn't scale to multiple machines. Do you have more than one machine available to run this on? Do you need to run it on more than one machine, because it takes too long on just one? That's what Spark excels at.
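For what it's worth, a minimal sketch of reading many XML files with Spark while keeping each document intact (the directory layout, glob and element name are illustrative assumptions; scala-xml must be on the classpath):

// wholeTextFiles yields (path, content) pairs, one per file, so each XML
// document arrives whole; the glob covers two directory levels of years.
val xmlFiles = sc.wholeTextFiles("/data/xml/*/*/*.xml")
val imageTags = xmlFiles.flatMap { case (path, content) =>
  (scala.xml.XML.loadString(content) \\ "image").map(node => (path, node.text))
}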
Re: Spark serialization issues with third-party libraries
You are probably inadvertently sending UIMA objects from the driver to executors in a closure. You'll have to design your program so that you do not need to ship these objects to or from the remote task workers.

On Fri, Nov 21, 2014 at 8:39 AM, jatinpreet jatinpr...@gmail.com wrote:

Hi, I am planning to use the UIMA library to process data in my RDDs. I have had bad experiences using third-party libraries inside worker tasks: the system gets plagued with serialization issues. Since UIMA classes are not necessarily Serializable, I am not sure if it will work. Please explain which classes need to be Serializable and which of them can be left as they are? A clear understanding will help me a lot. Thanks, Jatin

- Novice Big Data Programmer
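A common pattern for non-serializable third-party objects is to construct them inside the task rather than on the driver, for example once per partition with mapPartitions. A minimal sketch (the analyzer class and its process method are hypothetical placeholders, not actual UIMA API):

val results = rdd.mapPartitions { records =>
  // Built on the executor, once per partition, so it is never serialized
  // or shipped from the driver.
  val analyzer = new SomeNonSerializableAnalyzer()      // hypothetical helper class
  records.map(record => analyzer.process(record))       // hypothetical method
}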
How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)
Is there any way to get the yarn application_id inside the program?
spark code style
Hi all. Here are two code snippets, and they will produce the same result:

1. rdd.map(function)
2. rdd.map(function1).map(function2).map(function3)

What are the pros and cons of these two approaches? Regards, Kevin
Is there a way to turn on spark eventLog on the worker node?
Hi, I'm going to debug some Spark applications on our testing platform, and it would be helpful if we could see the event log on the *worker* node. I've tried turning on *spark.eventLog.enabled* and setting the *spark.eventLog.dir* parameter on the worker node, but it doesn't work. I do have event logs on my driver node, and I know how to turn that on; however, the same settings don't work on the worker node. Can anyone help me clarify whether the event log is only available on the driver node?
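For reference, the event log settings are applied by the driver (the application), not by individual workers, so they are normally set in the application's SparkConf or in spark-defaults.conf on the machine the driver is launched from. A minimal sketch with an illustrative log directory:

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-event-logs")   // illustrative path
val sc = new SparkContext(conf)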
Re: Determine number of running executors
You can get a parameter such as spark.executor.memory, but you cannot get the executor or core counts, because the number of executors and cores are parameters of the Spark deploy environment, not of the Spark context.

val conf = new SparkConf().set("spark.executor.memory", "2G")
val sc = new SparkContext(conf)
sc.getConf.get("spark.executor.memory")
conf.get("spark.executor.memory")

2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer t...@preferred.jp:

Hi, when running on YARN, is there a way for the Spark driver to know how many executors, cores per executor, etc. there are? I want to know this so I can repartition to a good number. Thanks, Tobias
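As a rough workaround, the driver can at least count the block managers that have registered with it, which includes the executors. A sketch (getExecutorMemoryStatus is a developer API; subtracting one for the driver's own entry is an assumption that may not hold in every deployment mode):

// One map entry per block manager (executors plus the driver itself).
val approximateExecutorCount = sc.getExecutorMemoryStatus.size - 1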
short-circuit local reads cannot be used
Hi, every time I start the spark-shell I encounter this message:

14/11/18 00:27:43 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

Any idea how to overcome it? The short-circuit feature is a big performance boost I don't want to lose. Thanks, Daniel
Re: spark code style
I suppose that here function(x) == function3(function2(function1(x))). In that case, the difference will be the modularity and readability of your program. If function{1,2,3} are logically different steps and potentially reusable somewhere else, I'd keep them separate. A sequence of map transformations will be pipelined by Spark with little overhead. -kr, Gerard.

On Fri, Nov 21, 2014 at 10:20 AM, Kevin Jung itsjb.j...@samsung.com wrote:

Hi all. Here are two code snippets, and they will produce the same result:

1. rdd.map(function)
2. rdd.map(function1).map(function2).map(function3)

What are the pros and cons of these two approaches? Regards, Kevin
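To make the equivalence concrete, a small sketch assuming function1, function2 and function3 are plain Scala function values (Function1):

// Three chained maps: each step stays named and reusable; Spark pipelines
// them inside one stage, so no intermediate RDD is materialized.
val out1 = rdd.map(function1).map(function2).map(function3)

// A single map over the composed function: same result, one closure.
val composed = function1 andThen function2 andThen function3
val out2 = rdd.map(composed)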
Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)
Finally, I've found two ways: 1. Search the driver output for something like "Submitted application application_1416319392519_0115". 2. Use a specific app name; we can then query YARN for the ApplicationId by that name.
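For completeness, newer Spark versions also expose the application id directly on the SparkContext; whether this is available depends on your Spark version, so treat it as a sketch:

// On YARN this returns the YARN application id,
// e.g. application_1416319392519_0115
val appId = sc.applicationId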
Re: Another accumulator question
This sounds more like a use case for reduce, or fold? It sounds like you're kind of cobbling together the same function on accumulators, when reduce/fold are simpler and have the behavior you suggest.

On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

I think I understand what is going on here, but I was hoping someone could confirm (or explain reality if I don't) what I'm seeing. We are collecting data using a rather sizable accumulator - essentially, an array of tens of thousands of entries. All told, about 1.3M of data.

If I understand things correctly, it looks to me like, when our job is done, a copy of this array is retrieved from each individual task, all at once, for combination on the client - which means, with 400 tasks in the job, each collection uses up half a gig of memory on the client. Is this true?

If so, does anyone know a way to get accumulators to accumulate as results are collected, rather than all at once at the end, so we only have to hold a few in memory at a time, rather than all 400?

Thanks, -Nathan

-- Nathan Kronenfeld, Senior Visualization Developer, Oculus Info Inc, 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5, Phone: +1-416-203-3003 x 238, Email: nkronenf...@oculusinfo.com
How to deal with BigInt in my case class for RDD = SchemaRDD convertion
Hi, I got an error during rdd.registerTempTable(...) saying scala.MatchError: scala.BigInt Looks like BigInt cannot be used in SchemaRDD, is that correct? So what would you recommend to deal with it? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Spark Streaming Metrics
Looks like metrics are not a hot topic to discuss - yet so important to sleep well when jobs are running in production. I've created Spark-4537 https://issues.apache.org/jira/browse/SPARK-4537 to track this issue. -kr, Gerard. On Thu, Nov 20, 2014 at 9:25 PM, Gerard Maas gerard.m...@gmail.com wrote: As the Spark Streaming tuning guide indicates, the key indicators of a healthy streaming job are: - Processing Time - Total Delay The Spark UI page for the Streaming job [1] shows these two indicators but the metrics source for Spark Streaming (StreamingSource.scala) [2] does not. Any reasons for that? I would like to monitor job performance through an external monitor (Ganglia in our case) and I've connected already the currently published metrics. -kr, Gerard. [1] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127 [2] https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
Re: Why is ALS class serializable ?
It makes sense. Thx. =)
Spark: Simple local test failed depending on memory settings
Dear all, we encountered problems with failed jobs on huge amounts of data. A simple local test was prepared for this question at https://gist.github.com/copy-of-rezo/6a137e13a1e4f841e7eb It generates 2 sets of key-value pairs, joins them, selects distinct values and finally counts the data.

object Spill {
  def generate = {
    for {
      j <- 1 to 10
      i <- 1 to 200
    } yield (j, i)
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    conf.set("spark.shuffle.spill", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    println(generate)
    val dataA = sc.parallelize(generate)
    val dataB = sc.parallelize(generate)
    val dst = dataA.join(dataB).distinct().count()
    println(dst)
  }
}

We compiled it locally and ran it 3 times with different memory settings:

1) --executor-memory 10M --driver-memory 10M --num-executors 1 --executor-cores 1
It fails with java.lang.OutOfMemoryError: GC overhead limit exceeded at ... org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

2) --executor-memory 20M --driver-memory 20M --num-executors 1 --executor-cores 1
It works OK.

3) --executor-memory 10M --driver-memory 10M --num-executors 1 --executor-cores 1
But let's make less data: for i from 200 down to 100. This reduces the input data by a factor of 2 and the joined data by a factor of 4:

def generate = {
  for {
    j <- 1 to 10
    i <- 1 to 100 // previous value was 200
  } yield (j, i)
}

This code works OK. We don't understand why 10M is not enough for such a simple operation on approximately 32000 bytes of ints (2 * 10 * 200 * 2 * 4). 10M of RAM does work if we cut the data volume in half (2000 records of (int, int)). Why doesn't spilling to disk cover this case?
Re: Cores on Master
Hi, you can also set the cores in the Spark application itself: http://spark.apache.org/docs/1.0.1/spark-standalone.html

On Wed, Nov 19, 2014 at 6:11 AM, Pat Ferrel-2 [via Apache Spark User List] wrote:

OK, hacking start-slave.sh did it.

On Nov 18, 2014, at 4:12 PM, Pat Ferrel wrote:

This seems to work only on a 'worker', not the master? So I'm back to having no way to control cores on the master?

On Nov 18, 2014, at 3:24 PM, Pat Ferrel wrote:

Looks like I can do this by not using start-all.sh but starting each worker separately, passing '--cores n' to the master? No config/env way?

On Nov 18, 2014, at 3:14 PM, Pat Ferrel wrote:

I see the default and max cores settings, but these seem to control total cores per cluster. My cobbled-together home cluster needs the Master to not use all its cores or it may lock up (it does other things). Is there a way to control the max cores used for a particular cluster machine in standalone mode?
Re: Slow performance in spark streaming
Hi, Spark in local mode runs at a lower speed than in a cluster. Cluster machines usually have higher-spec hardware, and the tasks are distributed across workers in order to get a faster result, so you will always find a difference in speed between running locally and running on a cluster. Try running the same job on a cluster and evaluate the speed there. Thanks

On Thu, Nov 20, 2014 at 6:52 PM, Blackeye [via Apache Spark User List] wrote:

I am using Spark Streaming 1.1.0 locally (not in a cluster). I created a simple app that parses the data (about 10,000 entries), stores it in a stream and then makes some transformations on it. Here is the code:

def main(args: Array[String]) {
  val master = "local[8]"
  val conf = new SparkConf().setAppName("Tester").setMaster(master)
  val sc = new StreamingContext(conf, Milliseconds(110000))
  val stream = sc.receiverStream(new MyReceiver("localhost", ...))
  val parsedStream = parse(stream)
  parsedStream.foreachRDD(rdd => println(rdd.first() + "\nRULE STARTS " + System.currentTimeMillis()))
  val result1 = parsedStream.filter(entry =>
    entry.symbol.contains("walking") && entry.symbol.contains("true") && entry.symbol.contains("id0")).map(_.time)
  val result2 = parsedStream.filter(entry =>
    entry.symbol == "disappear" && entry.symbol.contains("id0")).map(_.time)
  val result3 = result1.transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2))
  result3.foreachRDD(rdd => println(rdd.first() + "\nRULE ENDS " + System.currentTimeMillis()))
  sc.start()
  sc.awaitTermination()
}

def parse(stream: DStream[String]) = {
  stream.flatMap { line =>
    val entries = line.split("assert").filter(entry => !entry.isEmpty)
    entries.map { tuple =>
      val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r
      tuple match {
        case pattern(symbol, time) => new Data(symbol, time.toInt)
      }
    }
  }
}

case class Data(symbol: String, time: Int)

I have a batch duration of 110,000 milliseconds in order to receive all the data in one batch. I believed that, even locally, Spark is very fast. In this case, it takes about 3.5 sec to execute the rule (between RULE STARTS and RULE ENDS). Am I doing something wrong, or is this the expected time? Any advice?
Re: Parsing a large XML file using Spark
Hi, parallel processing of XML files may be an issue because of the tags in the file. The XML document has to stay intact: while parsing, the parser matches start and end entities, and if the file is distributed in parts to workers it may or may not find matching start and end tags within the same worker, which will give an exception. Thanks.

On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] wrote:

If there is one big XML file (e.g., the Wikipedia dump, 44GB, or the larger dump that has all revision information as well) that is stored in HDFS, is it possible to parse it in parallel/faster using Spark? Or do we have to use something like a PullParser or Iteratee? My current solution is to read the single XML file in a first pass, write it out to HDFS as small files, and then read the small files in parallel on the Spark workers. Thanks -Soumya
Re: How can I read this avro file using spark scala?
Thanks for the pointer Michael. I've downloaded Spark 1.2.0 from https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and cloned and built the spark-avro repo you linked to. When I run it against the example avro file linked to in the documentation it works. However, when I try to load my avro file (linked to in my original question) I receive the following error:

java.lang.RuntimeException: Unsupported type LONG
  at scala.sys.package$.error(package.scala:27)
  at com.databricks.spark.avro.AvroRelation.com$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
  at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
  at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  ...

If it is useful, I'm happy to try loading the various different avro files I have to help battle-test spark-avro. Thanks

On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust mich...@databricks.com wrote:

One option (starting with Spark 1.2, which is currently in preview) is to use the Avro library for Spark SQL. This is very new, but we would love to get feedback: https://github.com/databricks/spark-avro

On Thu, Nov 20, 2014 at 10:19 AM, al b beanb...@googlemail.com wrote:

I've read several posts from people struggling to read avro in Spark, and the examples I've tried don't work. When I try this solution (https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files) I get errors:

spark java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper

How can I read the following sample file in Spark using Scala? http://www.4shared.com/file/SxnYcdgJce/sample.html

Thomas
Re: How can I read this avro file using spark scala?
I've been able to load a different avro file based on GenericRecord with:

val person = sqlContext.avroFile("/tmp/person.avro")

When I try to call first() on it, I get NotSerializableException exceptions again:

person.first()
...
14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 20)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
  at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  ...

Apart from this, I want to transform the records into pairs of (user_id, record). I can do this by specifying the offset of the user_id column with something like this:

person.map(r => (r.getInt(2), r)).take(4)

Is there any way to specify the column name (user_id) instead of needing to know/calculate the offset somehow? Thanks again
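One way to avoid hard-coding the positional offset is to look the field up in the SchemaRDD's schema by name. A sketch (it assumes the loaded data really has a user_id column, as in the question):

// Find the position of user_id from the schema, then key the rows by it.
val idx = person.schema.fields.map(_.name).indexOf("user_id")
val pairs = person.map(r => (r.getInt(idx), r))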
RE: tableau spark sql cassandra
Hi! Sure, I'll post the info I grabbed once the Cassandra table values appear in Tableau. Best, Jerome
Setup Remote HDFS for Spark
Hi, is there any way I can set up a remote HDFS for Spark (more specifically, for Spark Streaming checkpoints)? The reason I'm asking is that our Spark and HDFS do not run on the same machines. I've looked around but have no clue so far. Thanks, EH
Re: Execute Spark programs from local machine on Yarn-hadoop cluster
Hi Naveen, I don't think this is possible. If you are setting the master with your cluster details, you cannot execute the job from your local machine; you have to execute jobs inside your YARN machine so that SparkConf is able to connect with all the provided details. If this is not the case, please give a detailed explanation of what exactly you are trying to do :) Thanks.

On Fri, Nov 21, 2014 at 8:11 PM, Naveen Kumar Pokala [via Apache Spark User List] wrote:

Hi, I am executing my Spark jobs on a YARN cluster by forming the conf object in the following way:

SparkConf conf = new SparkConf().setAppName("NewJob").setMaster("yarn-cluster");

Now I want to execute Spark jobs from my local machine. How do I do that? What I mean is: is there a way to give the IP address, port and all the other details needed to connect to a master (YARN) on some other network from my local Spark program?

-Naveen
Re: Setup Remote HDFS for Spark
Unfortunately, whether it is possible to have both Spark and HDFS running on the same machines is not under our control. :( Right now we have Spark and HDFS running on different machines. In this case, is it still possible to hook up a remote HDFS with Spark so that we can use Spark Streaming checkpoints? Thank you for your help. Best, EH
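For reference, Spark Streaming checkpointing only needs a filesystem URI that is reachable from the driver and the executors; HDFS does not have to be co-located with Spark. A minimal sketch with an illustrative namenode host and port:

// Point the streaming checkpoint directory at the remote HDFS cluster.
ssc.checkpoint("hdfs://namenode.example.com:8020/spark/checkpoints")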
Re: RDD data checkpoint cleaning
I have seen the same behaviour while testing the latest Spark 1.2.0 snapshot. I'm trying the ReliableKafkaReceiver and it works quite well, but the checkpoints folder is always increasing in size. The receivedMetaData folder remains almost constant in size, but the receivedData folder is always increasing in size even if I set spark.cleaner.ttl to 300 seconds. Regards, Luis

2014-09-23 22:47 GMT+01:00 RodrigoB rodrigo.boav...@aspect.com:

Just a follow-up. Just to make sure about the RDDs not being cleaned up, I just replayed the app both on the windows remote laptop and then on the linux machine, and at the same time was observing the RDD folders in HDFS. Confirming the observed behavior: running on the laptop I could see the RDDs continuously increasing. When I ran on linux, only two RDD folders were there and continuously being recycled. Metadata checkpoints were being cleaned in both scenarios. tnks, Rod
Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion
Hello Jianshi, the reason for that error is that we do not have a Spark SQL data type for Scala BigInt. You can use Decimal for your case. Thanks, Yin

On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Hi, I got an error during rdd.registerTempTable(...) saying scala.MatchError: scala.BigInt. Looks like BigInt cannot be used in SchemaRDD, is that correct? So what would you recommend to deal with it? Thanks,

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
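For example, the large integer field can be carried as scala.math.BigDecimal, which Spark SQL maps to its decimal type. A minimal sketch in the Spark 1.1/1.2 style (the case class and values are illustrative):

// Illustrative record type: hold the big integer as BigDecimal instead of BigInt.
case class Record(id: String, amount: BigDecimal)

import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion
val rows = sc.parallelize(Seq(Record("a", BigDecimal("12345678901234567890"))))
rows.registerTempTable("records")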
Lots of small input files
I have a job that searches for input recursively and creates a string of pathnames to treat as one input. The files are part-xxxxx files and they are fairly small. The job seems to take a long time to complete considering the size of the total data (150m), and it only runs on the master machine. The job only does rdd.map-type operations. 1) Why doesn't it use the other workers in the cluster? 2) Is there a downside to using a lot of small part files? Should I coalesce them into one input file?
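For what it's worth, two knobs that often matter with many small part files are the minimum number of partitions requested from textFile (so the work actually spreads across the cluster) and a coalesce afterwards (so downstream stages do not run thousands of tiny tasks). A sketch with illustrative names and numbers:

// pathString is the comma-separated list of part files from the recursive search.
val input = sc.textFile(pathString, 64)    // ask for at least 64 partitions (illustrative)
val compacted = input.coalesce(16)         // fewer, larger partitions downstream (illustrative)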
Re: How can I read this avro file using spark scala?
I have also been struggling with reading avro. Very glad to hear that there is a new avro library coming in Spark 1.2 (which, by the way, seems to have a lot of other very useful improvements). In the meanwhile, I have been able to piece together several snippets/tips that I found from various sources and I am now able to read/write avro correctly. From my understanding, you basically need 3 pieces:

1. Use the Kryo serializer.
2. Register your avro classes. I have done this using twitter chill 0.4.0.
3. Read/write avro with a snippet of code like the one you posted.

Here is the relevant code (hopefully all of it):

// All of the following are needed in order to read/write AVRO files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic AVRO, I am using specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }

object MyApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")
    val sc = new SparkContext(conf)

    // Read
    val readJob = new Job()
    AvroJob.setInputKeySchema(readJob, schema)
    val rdd = sc.newAPIHadoopFile(inputPath,
        classOf[AvroKeyInputFormat[MyAvroClass]],
        classOf[AvroKey[MyAvroClass]],
        classOf[NullWritable],
        readJob.getConfiguration)
      .map { t => t._1.datum }

    // Write
    val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
    val writeJob = new Job()
    AvroJob.setOutputKeySchema(writeJob, schema)
    writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
    rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
      classOf[AvroKey[MyAvroClass]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[MyAvroClass]],
      writeJob.getConfiguration)
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Put a line like the following for each of your Avro classes if you use specific Avro
    // If you use generic Avro, chill also has a function for that: GenericRecordSerializer
    kryo.register(classOf[MyAvroClass], AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
  }
}

Simone Franzini, PhD
http://www.linkedin.com/in/simonefranzini
Re: Nightly releases
Great - what can we do to make this happen? Should I file a JIRA to track it? Thanks, Arun

On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash and...@andrewash.com wrote:

I can see this being valuable for users wanting to live on the cutting edge without building CI infrastructure themselves, myself included. I think Patrick's recent work on the build scripts for 1.2.0 will make delivering nightly builds to a public maven repo easier.

On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja aahuj...@gmail.com wrote:

Of course we can run this ourselves to get the latest, but the build is fairly long and this seems like a resource many would need.

On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja aahuj...@gmail.com wrote:

Are nightly releases posted anywhere? There are quite a few vital bugfixes and performance improvements being committed to Spark, and using the latest commits is useful (or even necessary for some jobs). Is there a place to post them? It doesn't seem like it would be difficult to run make-dist nightly and place it somewhere. Is it possible to extract this from the Jenkins builds? Thanks, Arun
Re: Another accumulator question
We've done this with reduce - that definitely works. I reworked the logic to use accumulators because, when it works, it's 5-10x faster.

On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote:

This sounds more like a use case for reduce? or fold? It sounds like you're kind of cobbling together the same function on accumulators, when reduce/fold are simpler and have the behavior you suggest.

-- Nathan Kronenfeld, Senior Visualization Developer, Oculus Info Inc, 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5, Phone: +1-416-203-3003 x 238, Email: nkronenf...@oculusinfo.com
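To make the reduce-based alternative concrete, a minimal sketch of merging large per-task arrays without an accumulator (assumes an RDD[Int]; the array size and update rule are illustrative):

val merged = rdd
  .mapPartitions { iter =>
    // Each task builds its own partial array locally.
    val partial = new Array[Long](10000)                   // illustrative size
    iter.foreach(i => partial(i % partial.length) += 1L)   // illustrative update
    Iterator(partial)
  }
  // reduce merges partial results on the driver as tasks finish, so it never
  // has to hold all of the per-task arrays in memory at the same time.
  .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })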
Many retries for Python job
I'm running a Python script with spark-submit on top of YARN on an EMR cluster with 30 nodes. The script reads in approximately 3.9 TB of data from S3, and then does some transformations and filtering, followed by some aggregate counts. During Stage 2 of the job, everything looks to complete just fine with no executor failures or resubmissions, but when Stage 3 starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors. Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can successfully start. The whole application eventually completes, but there is an addition of about 1+ hour overhead for all of the retries. I'm trying to determine why there were FetchFailure exceptions, since anything computed in the job that could not fit in the available memory cache should be by default spilled to disk for further retrieval. I also see some "java.net.ConnectException: Connection refused" and "java.io.IOException: sendMessageReliably failed without being ACK'd" errors in the logs after a CancelledKeyException followed by a ClosedChannelException, but I have no idea why the nodes in the EMR cluster would suddenly stop being able to communicate. If anyone has ideas as to why the data needs to be rerun several times in this job, please let me know as I am fairly bewildered about this behavior.
Re: Nightly releases
Great - posted here: https://issues.apache.org/jira/browse/SPARK-4542

On Fri, Nov 21, 2014 at 1:03 PM, Andrew Ash and...@andrewash.com wrote:

Yes, you should file a JIRA and echo it out here so others can follow and comment on it. Thanks Arun!
SparkSQL Timestamp query failure
Hi all, I put some log files into SQL tables through Spark and my schema looks like this:

|-- timestamp: timestamp (nullable = true)
|-- c_ip: string (nullable = true)
|-- cs_username: string (nullable = true)
|-- s_ip: string (nullable = true)
|-- s_port: string (nullable = true)
|-- cs_method: string (nullable = true)
|-- cs_uri_stem: string (nullable = true)
|-- cs_query: string (nullable = true)
|-- sc_status: integer (nullable = false)
|-- sc_bytes: integer (nullable = false)
|-- cs_bytes: integer (nullable = false)
|-- time_taken: integer (nullable = false)
|-- User_Agent: string (nullable = true)
|-- Referrer: string (nullable = true)

As you can notice, I created a timestamp field, which I read is supported by Spark (Date wouldn't work as far as I understood). I would love to use it for queries like "where l.timestamp = 2012-10-08 16:10:36.0", but when I run them I keep getting errors. I tried the two following syntax forms; for the second one I parse a string, so I'm sure I'm actually passing it in a timestamp format. I use 2 functions: parse and date2timestamp. Any hint on how I should handle timestamp values? Thanks, Alessandro

1) scala> sqlContext.sql("SELECT * FROM Logs as l where l.timestamp=(2012-10-08 16:10:36.0)").collect

java.lang.RuntimeException: [1.55] failure: ``)'' expected but 16 found
SELECT * FROM Logs as l where l.timestamp=(2012-10-08 16:10:36.0)
                                                      ^
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
  at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
  ...

2) sqlContext.sql("SELECT * FROM Logs as l where l.timestamp=" + date2timestamp(formatTime3.parse("2012-10-08 16:10:36.0"))).collect

java.lang.RuntimeException: [1.54] failure: ``UNION'' expected but 16 found
SELECT * FROM Logs as l where l.timestamp=2012-10-08 16:10:36.0
                                                     ^
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
  at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
  ...
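For reference, the parse failures above come from the unquoted timestamp literal; quoting it as a string at least gets past the SQL parser, and an explicit CAST makes the intended type clear (whether the comparison then behaves as expected depends on the Spark SQL version, so treat these as sketches):

sqlContext.sql("SELECT * FROM Logs l WHERE l.timestamp = '2012-10-08 16:10:36.0'").collect()

sqlContext.sql(
  "SELECT * FROM Logs l WHERE l.timestamp >= CAST('2012-10-08 16:10:36' AS timestamp)"
).collect()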
Re: Many retries for Python job
Hi Brett, Are you noticing executors dying? Are you able to check the YARN NodeManager logs and see whether YARN is killing them for exceeding memory limits? -Sandy On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer brett.me...@crowdstrike.com wrote: I’m running a Python script with spark-submit on top of YARN on an EMR cluster with 30 nodes. The script reads in approximately 3.9 TB of data from S3, and then does some transformations and filtering, followed by some aggregate counts. During Stage 2 of the job, everything looks to complete just fine with no executor failures or resubmissions, but when Stage 3 starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors. Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can successfully start. The whole application eventually completes, but there is an addition of about 1+ hour overhead for all of the retries. I’m trying to determine why there were FetchFailure exceptions, since anything computed in the job that could not fit in the available memory cache should be by default spilled to disk for further retrieval. I also see some java.net.ConnectException: Connection refused” and java.io.IOException: sendMessageReliably failed without being ACK’d errors in the logs after a CancelledKeyException followed by a ClosedChannelException, but I have no idea why the nodes in the EMR cluster would suddenly stop being able to communicate. If anyone has ideas as to why the data needs to be rerun several times in this job, please let me know as I am fairly bewildered about this behavior.
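If the NodeManager logs do show containers being killed for running beyond physical memory limits, a common mitigation in this situation (a sketch only; the script name is a placeholder, and the other submit options stay as before) is to reserve more off-heap headroom per executor:

spark-submit --master yarn-cluster \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  my_etl_job.py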
Re: Parsing a large XML file using Spark
Unfortunately, unless you impose restrictions on the XML file (e.g., where namespaces are declared, whether entity replacement is used, etc.), you really can't parse only a piece of it even if you have start/end elements grouped together. If you want to deal effectively (and scalably) with large XML files consisting of many records, the right thing to do is to write them as one XML document per line just like the one JSON document per line, at which point the data can be split effectively. Something like Woodstox and a little custom code should make an effective pre-processor. Once you have the line-delimited XML, you can shred it however you want: JAXB, Jackson XML, etc. — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Fri, Nov 21, 2014 at 3:38 AM, Prannoy pran...@sigmoidanalytics.com wrote: Hi, Parallel processing of xml files may be an issue due to the tags in the xml file. The xml file has to be intact as while parsing it matches the start and end entity and if its distributed in parts to workers possibly it may or may not find start and end tags within the same worker which will give an exception. Thanks. On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] [hidden email] http://user/SendEmail.jtp?type=nodenode=19477i=0 wrote: If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump that all revision information also) that is stored in HDFS, is it possible to parse it in parallel/faster using Spark? Or do we have to use something like a PullParser or Iteratee? My current solution is to read the single XML file in the first pass - write it to HDFS and then read the small files in parallel on the Spark workers. Thanks -Soumya -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239.html To start a new topic under Apache Spark User List, email [hidden email] http://user/SendEmail.jtp?type=nodenode=19477i=1 To unsubscribe from Apache Spark User List, click here. NAML http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml -- View this message in context: Re: Parsing a large XML file using Spark http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239p19477.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
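Once the dump has been rewritten as line-delimited XML, the Spark side is short; a minimal sketch (the HDFS path and the id/title element names are assumptions about the pre-processed records):

import scala.xml.XML

// each line is assumed to hold one complete, self-contained XML record
val lines = sc.textFile("hdfs:///data/records-one-per-line.xml")
val records = lines.map { line =>
  val xml = XML.loadString(line)
  ((xml \ "id").text, (xml \ "title").text)
}
records.take(5).foreach(println)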
Re: Determine number of running executors
Hi Tobias, One way to find out the number of executors is through SparkContext#getExecutorMemoryStatus. You can find out the number of cores per executor by asking the SparkConf for the spark.executor.cores property, which, if not set, means 1 for YARN. -Sandy On Fri, Nov 21, 2014 at 1:30 AM, Yanbo Liang yanboha...@gmail.com wrote: You can get parameters such as spark.executor.memory, but you cannot get executor or core numbers, because executor and core counts are parameters of the Spark deploy environment, not the Spark context. val conf = new SparkConf().set("spark.executor.memory", "2G") val sc = new SparkContext(conf) sc.getConf.get("spark.executor.memory") conf.get("spark.executor.memory") 2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer t...@preferred.jp: Hi, when running on YARN, is there a way for the Spark driver to know how many executors, cores per executor etc. there are? I want to know this so I can repartition to a good number. Thanks Tobias
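Putting those two together, a sketch of the driver-side arithmetic (getExecutorMemoryStatus also reports the driver's own block manager, hence the minus one; the RDD below is just a stand-in):

// executors currently registered with the driver
val numExecutors = sc.getExecutorMemoryStatus.size - 1
// cores per executor, defaulting to 1 when the property is unset (the YARN default)
val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1)
val totalCores = math.max(numExecutors * coresPerExecutor, 1)
// repartition whatever RDD you actually have to a small multiple of the total core count
val data = sc.parallelize(1 to 1000000)
val repartitioned = data.repartition(totalCores * 2)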
Re: Parsing a large XML file using Spark
Actually, it's a real On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer t...@preferred.jp wrote: Hi, see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for one solution. One issue with those XML files is that they cannot be processed line by line in parallel; plus you inherently need shared/global state to parse XML or check for well-formedness, I think. (Same issue with multi-line JSON, by the way.) Tobias
Re: Extracting values from a Collection
I am sorry, the last line in the code is

file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

so

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

Result
===
(4,(ringo,Set(With a Little Help From My Friends, Octopus's Garden)))
(2,(john,Set(Julia, Nowhere Man)))
(3,(george,Set(While My Guitar Gently Weeps, Norwegian Wood)))
(1,(paul,Set(Yesterday, Michelle)))

Again the question is how do I extract values from the Set?

thanks
sanjay

From: Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID To: Arun Ahuja aahuj...@gmail.com; Andrew Ash and...@andrewash.com Cc: user user@spark.apache.org Sent: Friday, November 21, 2014 10:41 AM Subject: Extracting values from a Collection

hey guys

names.txt
=
1,paul
2,john
3,george
4,ringo

songs.txt
=
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
===
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
===
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set?

Thanks
sanjay
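One way to flatten those Sets back into individual values, sketched against the RDDs above, is to flatMap over the joined result:

val joined = file1Rdd.join(file2RddGrp.mapValues(names => names.toSet))
// emit one (id, name, song) triple per element of each Set
val flattened = joined.flatMap { case (id, (name, songs)) =>
  songs.map(song => (id, name, song))
}
flattened.collect().foreach(println)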
Re: Parsing a large XML file using Spark
(sorry about the previous spam... google inbox didn't allow me to cancel the miserable send action :-/) So what I was about to say: it's a real PAIN in the ass to parse the wikipedia articles in the dump due to these multiline articles... However, there is a way to manage that quite easily, although I found it rather slow.

*1/ use XML reader*
Use the "org.apache.hadoop" % "hadoop-streaming" % "1.0.4" dependency.

*2/ configure the hadoop job*
import org.apache.hadoop.streaming.StreamXmlRecordReader
import org.apache.hadoop.mapred.JobConf
val jobConf = new JobConf()
jobConf.set("stream.recordreader.class", "org.apache.hadoop.streaming.StreamXmlRecordReader")
jobConf.set("stream.recordreader.begin", "<page>")
jobConf.set("stream.recordreader.end", "</page>")
org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, s"hdfs://$master:9000/data.xml")
// Load documents (one per line).
val documents = sparkContext.hadoopRDD(jobConf, classOf[org.apache.hadoop.streaming.StreamInputFormat], classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])

*3/ use the result as an XML doc*
import scala.xml.XML
val texts = documents.map(_._1.toString)
  .map { s =>
    val xml = XML.loadString(s)
    val id = (xml \ "id").text.toDouble
    val title = (xml \ "title").text
    val text = (xml \ "revision" \ "text").text.replaceAll("\\W", " ")
    val tknzed = text.split("\\W").filter(_.size > 3).toList
    (id, title, tknzed)
  }

HTH
andy

On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer t...@preferred.jp wrote: Hi, see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for one solution. One issue with those XML files is that they cannot be processed line by line in parallel; plus you inherently need shared/global state to parse XML or check for well-formedness, I think. (Same issue with multi-line JSON, by the way.) Tobias
Re: JVM Memory Woes
Quick update: It is a filter job that creates the error above, not the reduceByKey. Why would a filter cause an out of memory? Here is my code:

val inputgsup = "hdfs://" + sparkmasterip + "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"
val gsupfile = sc.newAPIHadoopFile[BytesWritable,BytesWritable,SequenceFileAsBinaryInputFormat](inputgsup)
val gsup = gsupfile.map(x => (GsupHandler.DeserializeKey(x._1.getBytes), GsupHandler.DeserializeValue(x._2.getBytes))).map(x => (x._1._1, x._1._2, x._2._1, x._2._2))
val gsup_results_geod = gsup.flatMap(x => doQueryGSUP(has_expo_criteria, has_fence_criteria, timerange_start_expo, timerange_end_expo, timerange_start_fence, timerange_end_fence, expo_pois, fence_pois, x))
val gsup_results_reduced = gsup_results_geod.reduceByKey((a,b) => ((a._1.toShort | b._1.toShort).toByte, a._2 + b._2))
*val gsup_results = gsup_results_reduced.filter(x => (criteria_filter.value contains x._2._1.toInt))*

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-Memory-Woes-tp19496p19510.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Many retries for Python job
According to the web UI I don't see any executors dying during Stage 2. I looked over the YARN logs and didn't see anything suspicious, but I may not have been looking closely enough. Stage 2 seems to complete just fine, it's just when it enters Stage 3 that the results from the previous stage seem to be missing in many cases and result in FetchFailure errors. I should probably also mention that I have the spark.storage.memoryFraction set to 0.2. From: Sandy Ryza sandy.r...@cloudera.com Date: Friday, November 21, 2014 at 1:41 PM To: Brett Meyer brett.me...@crowdstrike.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Many retries for Python job Hi Brett, Are you noticing executors dying? Are you able to check the YARN NodeManager logs and see whether YARN is killing them for exceeding memory limits? -Sandy On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer brett.me...@crowdstrike.com wrote: I'm running a Python script with spark-submit on top of YARN on an EMR cluster with 30 nodes. The script reads in approximately 3.9 TB of data from S3, and then does some transformations and filtering, followed by some aggregate counts. During Stage 2 of the job, everything looks to complete just fine with no executor failures or resubmissions, but when Stage 3 starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors. Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can successfully start. The whole application eventually completes, but there is an addition of about 1+ hour overhead for all of the retries. I'm trying to determine why there were FetchFailure exceptions, since anything computed in the job that could not fit in the available memory cache should be by default spilled to disk for further retrieval. I also see some "java.net.ConnectException: Connection refused" and "java.io.IOException: sendMessageReliably failed without being ACK'd" errors in the logs after a CancelledKeyException followed by a ClosedChannelException, but I have no idea why the nodes in the EMR cluster would suddenly stop being able to communicate. If anyone has ideas as to why the data needs to be rerun several times in this job, please let me know as I am fairly bewildered about this behavior.
RE: tableau spark sql cassandra
Thanks, Jerome. BTW, have you tried the CalliopeServer2 from tuplejump? I was able to quickly connect from beeline/Squirrel to my Cassandra cluster using CalliopeServer2, which extends Spark SQL Thrift Server. It was very straight forward. Next step is to connect from Tableau, but I can't find Tableau's Spark connector. Where did you download it from? Mohammed -Original Message- From: jererc [mailto:jer...@gmail.com] Sent: Friday, November 21, 2014 5:27 AM To: u...@spark.incubator.apache.org Subject: RE: tableau spark sql cassandra Hi! Sure, I'll post the info I grabbed once the cassandra tables values appear in Tableau. Best, Jerome -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19480.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLib: LinearRegressionWithSGD performance
Hi Sameer, You can try increasing the number of executor-cores. -Jayant On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have been using MLlib's linear regression and I have some questions regarding the performance. We have a cluster of 10 nodes -- each node has 24 cores and 148GB memory. I am running my app as follows: time spark-submit --class medslogistic.MedsLogistic --master yarn-client --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar I am also going to play with the number of executors (reduce it); maybe that will give us different results. The input is an 800MB sparse file in LibSVM format. Total number of features is 150K. It takes approximately 70 minutes for the regression to finish. The job imposes very little load on CPU, memory, network, and disk. Total number of tasks is 104. Total time gets divided fairly uniformly across these tasks. I was wondering, is it possible to reduce the execution time further? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLib: LinearRegressionWithSGD performance
Hi Sameer, You can also use repartition to create a higher number of tasks. -Jayant On Fri, Nov 21, 2014 at 12:02 PM, Jayant Shekhar jay...@cloudera.com wrote: Hi Sameer, You can try increasing the number of executor-cores. -Jayant On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have been using MLLib's linear regression and I have some question regarding the performance. We have a cluster of 10 nodes -- each node has 24 cores and 148GB memory. I am running my app as follows: time spark-submit --class medslogistic.MedsLogistic --master yarn-client --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar I am also going to play with number of executors (reduce it) may be that will give us different results. The input is a 800MB sparse file in LibSVNM format. Total number of features is 150K. It takes approximately 70 minutes for the regression to finish. The job imposes very little load on CPU, memory, network, and disk. Total number of tasks is 104. Total time gets divided fairly uniformly across these tasks each task. I was wondering, is it possible to reduce the execution time further? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
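A sketch of both suggestions together (the input path and the partition/iteration counts below are placeholders, not recommendations):

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/training.libsvm")
// spread the input over more partitions than the default ~104 tasks, then cache it
val repartitioned = data.repartition(240).cache()
val model = LinearRegressionWithSGD.train(repartitioned, 100)

On the submit side, the matching knob for the first suggestion is the --executor-cores option of spark-submit.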
Spark SQL with Apache Phoenix lower and upper Bound
I want to run queries on Apache Phoenix which has a JDBC driver. The query that I want to run is:

select ts,ename from random_data_date limit 10

But I'm having issues with the JdbcRDD upper and lowerBound parameters (that I don't actually understand). Here's what I have so far:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
val url = "jdbc:phoenix:zookeeper"
val sql = "select ts,ename from random_data_date limit ?"
val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql, 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))

But this doesn't work because the sql expression that the JdbcRDD expects has to have two ?s to represent the lower and upper bound. How can I run my query through the JdbcRDD? Regards, Alaa Ali
Re: Spark SQL with Apache Phoenix lower and upper Bound
Hi Alaa Ali, In order for Spark to split the JDBC query in parallel, it expects an upper and lower bound for your input data, as well as a number of partitions so that it can split the query across multiple tasks. For example, depending on your data distribution, you could set an upper and lower bound on your timestamp range, and spark should be able to create new sub-queries to split up the data. Another option is to load up the whole table using the PhoenixInputFormat as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate functions, but it does let you load up whole tables as RDDs. I've previously posted example code here: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E There's also an example library implementation here, although I haven't had a chance to test it yet: https://github.com/simplymeasured/phoenix-spark Josh On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote: I want to run queries on Apache Phoenix which has a JDBC driver. The query that I want to run is: select ts,ename from random_data_date limit 10 But I'm having issues with the JdbcRDD upper and lowerBound parameters (that I don't actually understand). Here's what I have so far: import org.apache.spark.rdd.JdbcRDD import java.sql.{Connection, DriverManager, ResultSet} val url=jdbc:phoenix:zookeeper val sql = select ts,ename from random_data_date limit ? val myRDD = new JdbcRDD(sc, () = DriverManager.getConnection(url), sql, 5, 10, 2, r = r.getString(ts) + , + r.getString(ename)) But this doesn't work because the sql expression that the JdbcRDD expects has to have two ?s to represent the lower and upper bound. How can I run my query through the JdbcRDD? Regards, Alaa Ali
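As a concrete sketch of the first option: if the table has (or is given) a numeric column to partition on, the two ?s become the bounds of that column rather than part of a LIMIT. The id column and its range below are assumptions:

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val url = "jdbc:phoenix:zookeeper"
// hypothetical numeric column used only to split the query into partitions
val sql = "SELECT ts, ename FROM random_data_date WHERE id >= ? AND id <= ?"
val rows = new JdbcRDD(sc,
  () => DriverManager.getConnection(url),
  sql, 1L, 1000000L, 10,
  r => (r.getString("ts"), r.getString("ename")))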
Re: MongoDB Bulk Inserts
I tried using RDD#mapPartitions but my job completes prematurely and without error as if nothing gets done. What I have is fairly simple sc .textFile(inputFile) .map(parser.parse) .mapPartitions(bulkLoad) But the Iterator[T] of mapPartitions is always empty, even though I know map is generating records. On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta soumya.sima...@gmail.com wrote: On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson ben.d.tho...@gmail.com wrote: I'm trying to use MongoDB as a destination for an ETL I'm writing in Spark. It appears I'm gaining a lot of overhead in my system databases (and possibly in the primary documents themselves); I can only assume it's because I'm left to using PairRDD.saveAsNewAPIHadoopFile. - Is there a way to batch some of the data together and use Casbah natively so I can use bulk inserts? Why cannot you write Mongo in a RDD#mapPartition ? - Is there maybe a less hacky way to load to MongoDB (instead of using saveAsNewAPIHadoopFile)? If the latency (time by which all data should be in Mongo) is not a concern you can try a separate process that uses Akka/Casbah to write from HDFS into Mongo.
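If the point is only to write each partition out rather than to build a new RDD, foreachPartition is usually a better fit: mapPartitions is lazy, so its function only runs when an action consumes the iterator it returns. A sketch, with writeBatch standing in for whatever Casbah/Mongo bulk-insert call is actually used:

sc.textFile(inputFile)
  .map(parser.parse)
  .foreachPartition { records =>
    // open one connection per partition, then insert in chunks; writeBatch is hypothetical
    records.grouped(1000).foreach(batch => writeBatch(batch))
  }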
Running Spark application from Tomcat
I have a Spark java application that I run in local-mode. As such it runs without any issues. Now, I would like to run it as a webservice from Tomcat. The first issue I had with this was that the spark-assembly jar contains javax.servlet, which Tomcat does not allow. Therefore I removed javax.servlet from the jar file. Now, I get an Exception like this: java.lang.RuntimeException: class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not org.apache.hadoop.security.GroupMappingServiceProvider org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1921) org.apache.hadoop.security.Groups.init(Groups.java:64) org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala:36) org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala:109) org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.scala) org.apache.spark.SparkContext.init(SparkContext.scala:228) org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53) Any ideas? Thanks, Andreas
Re: Spark SQL with Apache Phoenix lower and upper Bound
Awesome, thanks Josh, I missed that previous post of yours! But your code snippet shows a select statement, so what I can do is just run a simple select with a where clause if I want to, and then run my data processing on the RDD to mimic the aggregation I want to do with SQL, right? Also, another question, I still haven't tried this out, but I'll actually be using this with PySpark, so I'm guessing the PhoenixPigConfiguration and newHadoopRDD can be defined in PySpark as well? Regards, Alaa Ali On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin jmaho...@interset.com wrote: Hi Alaa Ali, In order for Spark to split the JDBC query in parallel, it expects an upper and lower bound for your input data, as well as a number of partitions so that it can split the query across multiple tasks. For example, depending on your data distribution, you could set an upper and lower bound on your timestamp range, and spark should be able to create new sub-queries to split up the data. Another option is to load up the whole table using the PhoenixInputFormat as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate functions, but it does let you load up whole tables as RDDs. I've previously posted example code here: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E There's also an example library implementation here, although I haven't had a chance to test it yet: https://github.com/simplymeasured/phoenix-spark Josh On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote: I want to run queries on Apache Phoenix which has a JDBC driver. The query that I want to run is: select ts,ename from random_data_date limit 10 But I'm having issues with the JdbcRDD upper and lowerBound parameters (that I don't actually understand). Here's what I have so far: import org.apache.spark.rdd.JdbcRDD import java.sql.{Connection, DriverManager, ResultSet} val url=jdbc:phoenix:zookeeper val sql = select ts,ename from random_data_date limit ? val myRDD = new JdbcRDD(sc, () = DriverManager.getConnection(url), sql, 5, 10, 2, r = r.getString(ts) + , + r.getString(ename)) But this doesn't work because the sql expression that the JdbcRDD expects has to have two ?s to represent the lower and upper bound. How can I run my query through the JdbcRDD? Regards, Alaa Ali
Re: Spark SQL with Apache Phoenix lower and upper Bound
Ali, just create a BIGINT column with numeric values in phoenix and use sequences http://phoenix.apache.org/sequences.html to populate it automatically I included the setup below in case someone starts from scratch Prerequisites: - export JAVA_HOME, SCALA_HOME and install sbt - install hbase in standalone mode http://hbase.apache.org/book/quickstart.html - add phoenix jar http://phoenix.apache.org/download.html to hbase lib directory - start hbase and create a table in phoenix http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html to verify everything is working - install spark in standalone mode, and verify that it works using spark shell http://spark.apache.org/docs/latest/quick-start.html 1. create a sequence http://phoenix.apache.org/sequences.html in phoenix: $PHOENIX_HOME/hadoop1/bin/sqlline.py localhost CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence; 2.add a BIGINT column called e.g. id to your table in phoenix CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR); 3. add some values UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR my_schema.my_sequence, 'foo'); UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR my_schema.my_sequence, 'bar'); 4. create jdbc adapter (following SimpleApp setup in Spark-GettingStarted-StandAlone applications https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications ): //SparkToJDBC.scala import java.sql.DriverManager import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Date; import org.apache.spark.SparkContext import org.apache.spark.rdd.JdbcRDD object SparkToJDBC { def main(args: Array[String]) { val sc = new SparkContext(local, phoenix) try{ val rdd = new JdbcRDD(sc,() = { Class.forName(org.apache.phoenix.jdbc.PhoenixDriver).newInstance() DriverManager.getConnection(jdbc:phoenix:localhost, , ) }, SELECT id, name FROM test.orders WHERE id = ? AND id = ?, 1, 100, 3, (r:ResultSet) = { processResultSet(r) } ).cache() println(#); println(rdd.count()); println(#); } catch { case _: Throwable = println(Could not connect to database) } sc.stop() } def processResultSet(rs: ResultSet){ val rsmd = rs.getMetaData() val numberOfColumns = rsmd.getColumnCount() var i = 1 while (i = numberOfColumns) { val colName = rsmd.getColumnName(i) val tableName = rsmd.getTableName(i) val name = rsmd.getColumnTypeName(i) val caseSen = rsmd.isCaseSensitive(i) val writable = rsmd.isWritable(i) println(Information for column + colName) println(Column is in table + tableName) println(column type is + name) println() i += 1 } while (rs.next()) { var i = 1 while (i = numberOfColumns) { val s = rs.getString(i) System.out.print(s + ) i += 1 } println() } } } 5. build SparkToJDBC.scala sbt package 6. execute spark job: note: don't forget to add phoenix jar using --jars option like this: ../spark-1.1.0/bin/spark-submit *--jars ../phoenix-3.1.0-bin/hadoop2/* *phoenix-3.1.0-client-hadoop2.**jar *--class SparkToJDBC --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar regards Alex On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin jmaho...@interset.com wrote: Hi Alaa Ali, In order for Spark to split the JDBC query in parallel, it expects an upper and lower bound for your input data, as well as a number of partitions so that it can split the query across multiple tasks. 
For example, depending on your data distribution, you could set an upper and lower bound on your timestamp range, and spark should be able to create new sub-queries to split up the data. Another option is to load up the whole table using the PhoenixInputFormat as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate functions, but it does let you load up whole tables as RDDs. I've previously posted example code here: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E There's also an example library implementation here, although I haven't had a chance to test it yet: https://github.com/simplymeasured/phoenix-spark Josh On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote: I want to run queries on Apache Phoenix which has a JDBC driver. The query that I want to run is: select ts,ename
Persist kafka streams to text file
Hello I am trying to read kafka stream to a text file by running spark from my IDE (IntelliJ IDEA) . The code is similar as a previous thread on persisting stream to a text file. I am new to spark or scala. I believe the spark is on local mode as the console shows 14/11/21 14:17:11 INFO spark.SparkContext: Spark configuration: spark.app.name=local-mode I got the following errors. It is related to Tachyon. But I don't know if I have tachyon or not. 14/11/21 14:17:54 WARN storage.TachyonBlockManager: Attempt 1 to create tachyon dir null failed java.io.IOException: Failed to connect to master localhost/127.0.0.1:19998 after 5 attempts at tachyon.client.TachyonFS.connect(TachyonFS.java:293) at tachyon.client.TachyonFS.getFileId(TachyonFS.java:1011) at tachyon.client.TachyonFS.exist(TachyonFS.java:633) at org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:117) at org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:106) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.storage.TachyonBlockManager.createTachyonDirs(TachyonBlockManager.scala:106) at org.apache.spark.storage.TachyonBlockManager.init(TachyonBlockManager.scala:57) at org.apache.spark.storage.BlockManager.tachyonStore$lzycompute(BlockManager.scala:88) at org.apache.spark.storage.BlockManager.tachyonStore(BlockManager.scala:82) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:729) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: tachyon.org.apache.thrift.TException: Failed to connect to master localhost/127.0.0.1:19998 after 5 attempts at tachyon.master.MasterClient.connect(MasterClient.java:178) at tachyon.client.TachyonFS.connect(TachyonFS.java:290) ... 28 more Caused by: tachyon.org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185) at tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) at tachyon.master.MasterClient.connect(MasterClient.java:156) ... 
29 more Caused by: java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180) ... 31 more 14/11/21 14:17:54 ERROR storage.TachyonBlockManager: Failed 10 attempts to create tachyon dir in /tmp_spark_tachyon/spark-3dbec68b-f5b8-45e1-bb68-370439839d4a/driver I looked at the code. It has the following part. Is that a problem? .persist(StorageLevel.OFF_HEAP) Any advice? Thank you! J
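For what it's worth, StorageLevel.OFF_HEAP is exactly the setting that makes Spark look for a Tachyon master on localhost:19998, so unless a Tachyon cluster is actually running, the usual fix is to pick an on-heap/disk level instead (a sketch; stream stands for the DStream being persisted in the code above):

import org.apache.spark.storage.StorageLevel

// keep the stream's RDDs in JVM memory, spilling to local disk, instead of Tachyon
stream.persist(StorageLevel.MEMORY_AND_DISK_SER)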
Re: SparkSQL - can we add new column(s) to parquet files
I would expect an SQL query on c would fail because c would not be known in the schema of the older Parquet file. What I'd be very interested in is how to add a new column as an incremental new parquet file, and be able to somehow join the existing and new file, in an efficient way. IE, somehow guarantee that for every row in the old parquet file, that the corresponding rows in the new file would be stored in the same node, so that joins are local. On Fri, Nov 21, 2014 at 10:03 AM, Sadhan Sood sadhan.s...@gmail.com wrote: We create the table definition by reading the parquet file for schema and store it in hive metastore. But if someone adds a new column to the schema, and if we rescan the schema from the new parquet files and update the table definition, would it still work if we run queries on the table ? So, old table has - Int a, Int b new table - Int a, Int b, String c but older parquet files don't have String c, so on querying the table would it return me null for column c from older files and data from newer files or fail?
Re: Another accumulator question
Hi Nathan, It sounds like what you're asking for has already been filed as https://issues.apache.org/jira/browse/SPARK-664 Does that ticket match what you're proposing? Andrew On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: We've done this with reduce - that definitely works. I've reworked the logic to use accumulators because, when it works, it's 5-10x faster On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote: This sounds more like a use case for reduce? or fold? it sounds like you're kind of cobbling together the same function on accumulators, when reduce/fold are simpler and have the behavior you suggest. On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I think I understand what is going on here, but I was hoping someone could confirm (or explain reality if I don't) what I'm seeing. We are collecting data using a rather sizable accumulator - essentially, an array of tens of thousands of entries. All told, about 1.3m of data. If I understand things correctly, it looks to me like, when our job is done, a copy of this array is retrieved from each individual task, all at once, for combination on the client - which means, with 400 tasks to the job, each collection is using up half a gig of memory on the client. Is this true? If so, does anyone know a way to get accumulators to accumulate as results collect, rather than all at once at the end, so we only have to hold a few in memory at a time, rather than all 400? Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
RE: Using TF-IDF from MLlib
Thanks for the info Andy. A big help. One thing - I think you can figure out which document is responsible for which vector without checking in more code. Start with a PairRDD of [doc_id, doc_string] for each document and split that into one RDD for each column. The values in the doc_string RDD get split and turned into a Seq and fed to TFIDF. You can take the resulting RDD[Vector]s and zip them with the doc_id RDD. Presto! Best regards, Ron
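A sketch of the pairing Ron describes, with a tiny in-memory corpus standing in for real documents (note the ordering question raised in the reply that follows):

import org.apache.spark.mllib.feature.{HashingTF, IDF}

val docs = sc.parallelize(Seq(
  ("doc1", "spark makes large scale etl easy"),
  ("doc2", "tf idf weighting in mllib")))

val ids = docs.map(_._1)
val terms = docs.map(_._2.split(" ").toSeq)

val tf = new HashingTF().transform(terms)
tf.cache()
val tfidf = new IDF().fit(tf).transform(tf)

// pair each document id back up with its TF-IDF vector
val byDoc = ids.zip(tfidf)
byDoc.collect().foreach(println)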
Re: Using TF-IDF from MLlib
Yeah, I initially used zip but I was wondering how reliable it is. I mean, is the order guaranteed? What if some node fails, and the data is pulled out from different nodes? And even if it can work, I find this implicit semantic quite uncomfortable, don't you? My 0.2c On Fri, Nov 21, 2014 at 15:26, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Thanks for the info Andy. A big help. One thing - I think you can figure out which document is responsible for which vector without checking in more code. Start with a PairRDD of [doc_id, doc_string] for each document and split that into one RDD for each column. The values in the doc_string RDD get split and turned into a Seq and fed to TFIDF. You can take the resulting RDD[Vector]s and zip them with the doc_id RDD. Presto! Best regards, Ron
Book: Data Analysis with SparkR
Is there a book on SparkR for the absolutely terrified beginner? I use R for my daily analysis and I am interested in a detailed guide to using SparkR for data analytics: like a book or online tutorials. If there's any, please direct me to the address. Thanks, Daniel -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion
Ah yes. I found it too in the manual. Thanks for the help anyway! Since BigDecimal is just a wrapper around BigInt, let's also convert BigInt to Decimal. I created a ticket. https://issues.apache.org/jira/browse/SPARK-4549 Jianshi On Fri, Nov 21, 2014 at 11:30 PM, Yin Huai huaiyin@gmail.com wrote: Hello Jianshi, The reason for that error is that we do not have a Spark SQL data type for Scala BigInt. You can use Decimal for your case. Thanks, Yin On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got an error during rdd.registerTempTable(...) saying scala.MatchError: scala.BigInt Looks like BigInt cannot be used in SchemaRDD, is that correct? So what would you recommend to deal with it? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
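Until SPARK-4549 lands, a sketch of the workaround Yin describes, converting the BigInt field to BigDecimal before registering the table (the case class and table name here are made up):

import org.apache.spark.sql.SQLContext

case class Rec(id: BigDecimal, name: String) // the id field used to be a BigInt

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

val recs = sc.parallelize(Seq((BigInt(1), "a"), (BigInt(2), "b")))
  .map { case (i, s) => Rec(BigDecimal(i), s) }
recs.registerTempTable("recs")
sqlContext.sql("SELECT id, name FROM recs").collect().foreach(println)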
Missing parents for stage (Spark Streaming)
When I submit a Spark Streaming job, I see these INFO logs printing frequently: 14/11/21 18:53:17 INFO DAGScheduler: waiting: Set(Stage 216) 14/11/21 18:53:17 INFO DAGScheduler: failed: Set() 14/11/21 18:53:17 INFO DAGScheduler: Missing parents for Stage 216: List() 14/11/21 18:53:17 INFO DAGScheduler: Submitting Stage 216 (MappedRDD[1733] at map at MappedDStream.scala:35), which is now runnable I have a feeling this means there is some error with a Map I created as a broadcast variable, but I'm not sure. How can I figure out what this is referring to? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Missing-parents-for-stage-Spark-Streaming-tp19530.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Book: Data Analysis with SparkR
Hi Daniel, Thanks for your email! We don't have a book (yet?) specifically on SparkR, but here's a list of helpful tutorials / links you can check out (I am listing them in roughly basic-to-advanced order):
- AMPCamp5 SparkR exercises http://ampcamp.berkeley.edu/5/exercises/sparkr.html. This covers the basics of SparkR's API, performs basic analytics, and visualizes the results.
- SparkR examples https://github.com/amplab-extras/SparkR-pkg/tree/master/examples. We have K-means, logistic regression, MNIST solver, pi estimation, word count and other examples available.
- Running SparkR on EC2 https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2. This entry details the steps to run a SparkR program on an EC2 cluster.
Finally, we have a talk at the AMPCamp http://ampcamp.berkeley.edu/5/ today on SparkR, whose video and slides will be available soon on the website -- it covers the basics of the interface and what you can do with it. Additionally, you could direct any SparkR questions to our sparkr-dev https://groups.google.com/forum/#!forum/sparkr-dev mailing list. Let us know if you have further questions. Zongheng On Fri Nov 21 2014 at 3:48:53 PM Emaasit daniel.emaa...@gmail.com wrote: Is there a book on SparkR for the absolutely terrified beginner? I use R for my daily analysis and I am interested in a detailed guide to using SparkR for data analytics: like a book or online tutorials. If there's any, please direct me to the address. Thanks, Daniel -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MongoDB Bulk Inserts
bulkLoad has the connection to MongoDB ? On Fri, Nov 21, 2014 at 4:34 PM, Benny Thompson ben.d.tho...@gmail.com wrote: I tried using RDD#mapPartitions but my job completes prematurely and without error as if nothing gets done. What I have is fairly simple sc .textFile(inputFile) .map(parser.parse) .mapPartitions(bulkLoad) But the Iterator[T] of mapPartitions is always empty, even though I know map is generating records. On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta soumya.sima...@gmail.com wrote: On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson ben.d.tho...@gmail.com wrote: I'm trying to use MongoDB as a destination for an ETL I'm writing in Spark. It appears I'm gaining a lot of overhead in my system databases (and possibly in the primary documents themselves); I can only assume it's because I'm left to using PairRDD.saveAsNewAPIHadoopFile. - Is there a way to batch some of the data together and use Casbah natively so I can use bulk inserts? Why cannot you write Mongo in a RDD#mapPartition ? - Is there maybe a less hacky way to load to MongoDB (instead of using saveAsNewAPIHadoopFile)? If the latency (time by which all data should be in Mongo) is not a concern you can try a separate process that uses Akka/Casbah to write from HDFS into Mongo.
allocating different memory to different executor for same application
Hello Experts, I have 5 worker machines with different sizes of RAM. Is there a way to configure them with different executor memory? Currently I see that every worker spins up 1 executor with the same amount of memory. Thanks Regards Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/allocating-different-memory-to-different-executor-for-same-application-tp19534.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
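As far as I know, spark.executor.memory is a single per-application setting in standalone mode, so the executor size itself cannot differ per machine; what can differ is how much memory each worker offers to Spark. A sketch, assuming the usual conf/spark-env.sh on each node (values are examples):

# conf/spark-env.sh on a large-RAM node
SPARK_WORKER_MEMORY=48g

# conf/spark-env.sh on a small-RAM node
SPARK_WORKER_MEMORY=12g

A worker that offers less memory than the requested spark.executor.memory simply won't be able to launch an executor for that application.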
Re: Another accumulator question
Im not sure if it's an exact match, or just very close :-) I don't think our problem is the workload on the driver, I think it's just memory - so while the solution proposed there would work, it would also be sufficient for our purposes, I believe, simply to clear each block as soon as it's added into the canonical version, and try to do so as soon as possible - but I could be misunderstanding some of the timing, I'm still investigating. Though to combine on the worker before returning, as he suggests, would probably be even better. On Fri, Nov 21, 2014 at 6:08 PM, Andrew Ash and...@andrewash.com wrote: Hi Nathan, It sounds like what you're asking for has already been filed as https://issues.apache.org/jira/browse/SPARK-664 Does that ticket match what you're proposing? Andrew On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: We've done this with reduce - that definitely works. I've reworked the logic to use accumulators because, when it works, it's 5-10x faster On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen so...@cloudera.com wrote: This sounds more like a use case for reduce? or fold? it sounds like you're kind of cobbling together the same function on accumulators, when reduce/fold are simpler and have the behavior you suggest. On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I think I understand what is going on here, but I was hoping someone could confirm (or explain reality if I don't) what I'm seeing. We are collecting data using a rather sizable accumulator - essentially, an array of tens of thousands of entries. All told, about 1.3m of data. If I understand things correctly, it looks to me like, when our job is done, a copy of this array is retrieved from each individual task, all at once, for combination on the client - which means, with 400 tasks to the job, each collection is using up half a gig of memory on the client. Is this true? If so, does anyone know a way to get accumulators to accumulate as results collect, rather than all at once at the end, so we only have to hold a few in memory at a time, rather than all 400? Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
spark-sql broken
After taking today's build from master branch I started getting this error when run spark-sql: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. I used following command for building: ./make-distribution.sh --tgz -Pyarn -Dyarn.version=2.4.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests Is there anything I am missing? Thanks Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broken-tp19536.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
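That particular ClassNotFound usually points at the datanucleus jars (which back the Hive metastore) being absent from the spark-sql classpath. A hedged check, assuming the layout make-distribution.sh normally produces (the jar filenames below are examples, not exact versions):

# the Hive-enabled distribution normally bundles these under lib/
ls lib/ | grep datanucleus

# if they are not being picked up, pass them explicitly
./bin/spark-sql --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar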
latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314) Here is my setup: 1) Latest spark 1.2 branch build 2) Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package 3) Added hive-site.xml to \conf 4) Version on the box: Hive 0.13, Hadoop 2.4 Is this a real bug or am I doing something wrong? --- Full Stacktrace: Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314) at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:327) at org.apache.hadoop.conf.Configuration.clinit(Configuration.java:409) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopU til.scala:82) at org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala: 42) at org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala :202) at org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.sca la) at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784) at org.apache.spark.storage.BlockManager.init(BlockManager.scala:105) at org.apache.spark.storage.BlockManager.init(BlockManager.scala:180) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.init(SparkContext.scala:230) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv. scala:38) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveTh riftServer2.scala:56) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThr iftServer2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.google.common.base.Precondition s at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
Spark streaming job failing after some time.
I have seen similar posts on this issue but could not find solution. Apologies if this has been discussed here before. I am running a spark streaming job with yarn on a 5 node cluster. I am using following command to submit my streaming job. spark-submit --class class_name --master yarn-cluster --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar After running for some time, the job stops. The application log shows following two errors: 14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10 Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) and later... Failed to list files for dir: /data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20 at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685) at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181) at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169) Note: I am building my jar on my local with spark dependency added in pom.xml and running it on cluster running spark. -Pankaj
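One frequent cause of the first error in yarn-cluster mode is the driver not creating its SparkContext within the window the ApplicationMaster is willing to wait (the "waiting for 10, maxNumTries = 10" in the log). A sketch of the shape that avoids it, creating the streaming context right at the top of main before any slow setup work (class and app names are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStreamingApp {
  def main(args: Array[String]): Unit = {
    // create the contexts first so the YARN ApplicationMaster can find them
    val conf = new SparkConf().setAppName("my-streaming-app")
    val ssc = new StreamingContext(conf, Seconds(10))

    // ... define sources, transformations and output operations here ...

    ssc.start()
    ssc.awaitTermination()
  }
}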
Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on latest spark 1.2 branch. I got the error below when I start thrift server. Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314)…. Here is my setup: 1)Latest spark 1.2 branch build 2)Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package 3)Added hive-site.xml to \conf 4)Version on the box: Hive 0.13, Hadoop 2.4 Is this a real bug or am I doing something wrong? --- Full Stacktrace: Exception in thread main java.lang.NoClassDefFoundError: com/google/common/bas e/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:314) at org.apache.hadoop.conf.Configuration$DeprecationDelta.init(Configur ation.java:327) at org.apache.hadoop.conf.Configuration.clinit(Configuration.java:409) at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopU til.scala:82) at org.apache.spark.deploy.SparkHadoopUtil.init(SparkHadoopUtil.scala: 42) at org.apache.spark.deploy.SparkHadoopUtil$.init(SparkHadoopUtil.scala :202) at org.apache.spark.deploy.SparkHadoopUtil$.clinit(SparkHadoopUtil.sca la) at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784) at org.apache.spark.storage.BlockManager.init(BlockManager.scala:105) at org.apache.spark.storage.BlockManager.init(BlockManager.scala:180) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.init(SparkContext.scala:230) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv. scala:38) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveTh riftServer2.scala:56) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThr iftServer2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.google.common.base.Precondition s at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358)