JavaSparkContext: dependency on ui/
I notice that there is a dependency from the SparkContext on the "createLiveUI" functionality. Is that really required? Or is there a more minimal JavaSparkContext we can create? I'm packaging a jar with a Spark client and would rather avoid resource dependencies, as they can be trickier to maintain than class dependencies alone. java.lang.Exception: Could not find resource path for Web UI: org/apache/spark/ui/static at org.apache.spark.ui.JettyUtils$.createStaticHandler(JettyUtils.scala:182) at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:73) at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:81) at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:215) at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:157) at org.apache.spark.SparkContext.<init>(SparkContext.scala:445) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58) -- jay vyas
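If the goal is just to keep the ui/static resources out of a minimal client jar, one workaround (a sketch, not an official recommendation; the master URL is a placeholder) is to disable the live UI entirely via spark.ui.enabled before the context is created:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val conf = new SparkConf()
  .setAppName("minimal-client")
  .setMaster("spark://master-host:7077")   // hypothetical master URL
  .set("spark.ui.enabled", "false")        // skips SparkUI.createLiveUI and its static resources
val jsc = new JavaSparkContext(conf)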
Spark jobs without a login
Hi spark: Is it possible to avoid reliance on a login user when running a spark job? I'm running out of a container that doesn't supply a valid user name, and thus I'm getting the following exception: org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:675) I'm not too worried about this - but it seems like it might be nice if we could specify a user name as part of Spark's context, or as part of an external parameter, rather than having to use the Java-based user/group extractor. -- jay vyas
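One workaround worth trying (a sketch; whether it is honored depends on the Hadoop version and on simple authentication being in use) is to hand Hadoop an explicit user name via HADOOP_USER_NAME, either exported in the container's environment or set as a JVM system property before the SparkContext is created. The user name below is an arbitrary placeholder:

import org.apache.spark.{SparkConf, SparkContext}

object NoLoginJob {
  def main(args: Array[String]): Unit = {
    // equivalent of `export HADOOP_USER_NAME=sparkuser` before launch;
    // some Hadoop versions also check this as a system property
    System.setProperty("HADOOP_USER_NAME", "sparkuser")
    val sc = new SparkContext(new SparkConf().setAppName("no-login-job"))
    // ... job body ...
    sc.stop()
  }
}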
Re: Unit Testing
Yes, there certainly is, so long as Eclipse has the right plugins and so on to run Scala programs. You're really asking two questions: (1) can I use a modern IDE to develop Spark apps, and (2) can we easily unit test Spark Streaming apps. The answer is yes to both... Regarding your IDE: I like to use IntelliJ with the Scala plugin set up for development. It allows you to run everything from inside the IDE. I've written up setup instructions here: http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html Now, regarding local unit testing: as an example, here is a unit test for confirming that Spark can write to Cassandra. https://github.com/jayunit100/SparkStreamingApps/blob/master/src/test/scala/TestTwitterCassandraETL.scala The key here is to just set your local master in the unit test, like so: sc.setMaster("local[2]"). local[2] guarantees that you'll have a producer and a consumer, so that you don't get a starvation scenario. On Wed, Aug 12, 2015 at 7:31 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to run spark streaming methods in standalone eclipse environment to test out the functionality? -- jay vyas
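A minimal, self-contained sketch of that pattern (no Twitter or Cassandra here; the input is faked with queueStream so the run is deterministic, and the timings are arbitrary):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalStreamingSmokeTest {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiving side, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-unit-test")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // deterministic input instead of a live source
    val rdds = mutable.Queue(ssc.sparkContext.parallelize(Seq("a", "b", "b")))
    val counts = ssc.queueStream(rdds).countByValue()
    counts.print()   // in a real test, collect into a buffer and assert on it

    ssc.start()
    Thread.sleep(5000)
    ssc.stop(stopSparkContext = true)
  }
}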
Re: Amazon DynamoDB Spark
In general the simplest way is to use the DynamoDB Java API as-is and call it inside a map(), using the asynchronous put() call. On Aug 7, 2015, at 9:08 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, Is there a way of using DynamoDB in a spark application? I have to persist my results to DynamoDB. Thanx, yasemin -- hiç ender hiç
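A rough sketch of that idea, assuming the AWS Java SDK v1 is on the classpath. The table name, key names, and the shape of the records are all made up; foreachPartition is used so one client is created per partition rather than per record, and the asynchronous variant would go through AmazonDynamoDBAsyncClient instead:

import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}

results.foreachPartition { part =>          // `results` is your RDD of (id, score) pairs
  val dynamo = new AmazonDynamoDBClient()   // credentials come from the default provider chain
  part.foreach { case (id, score) =>
    val item = Map(
      "id"    -> new AttributeValue().withS(id),
      "score" -> new AttributeValue().withN(score.toString)
    ).asJava
    dynamo.putItem(new PutItemRequest("results-table", item))
  }
}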
Re: How to build Spark with my own version of Hadoop?
As you know, the hadoop versions and so on are available in the spark build files; iirc the top-level pom.xml has all the maven variables for versions. So I think if you just build hadoop locally (i.e. version it as, say, 2.2.1234-SNAPSHOT and mvn install it), you should be able to change the corresponding variable in the top-level spark pom.xml. Of course this is a Pandora's box, where now you need to also deploy your custom YARN on your cluster, make sure it matches the spark target, and so on (if you're running spark on YARN). RPMs and DEB packages tend to be useful for this kind of thing, since you can easily sync the /etc/ config files and uniformly manage/upgrade versions etc. ... Thus... if you're really serious about building a custom distribution, mixing and matching hadoop components separately, you might want to consider using Apache BigTop - just bring this up on that mailing list... We curate a hadoop distribution builder that builds spark, hadoop, hive, ignite, kafka, zookeeper, hbase and so on... Since bigtop has all the tooling necessary to fully build, test, and deploy your hadoop bits on VMs/containers, it might make your life a little easier. On Tue, Jul 21, 2015 at 11:11 PM, Dogtail Ray spark.ru...@gmail.com wrote: Hi, I have modified some Hadoop code, and want to build Spark with the modified version of Hadoop. Do I need to change the compilation dependency files? How to then? Great thanks! -- jay vyas
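Roughly, the two steps look like this (the 2.2.1234-SNAPSHOT string is just the hypothetical version from above; instead of editing the pom you can also pass -Dhadoop.version to the Spark build, and depending on the Spark version you may additionally need the matching -Phadoop-x.y profile):

# in your modified Hadoop checkout: publish it to the local Maven repository
mvn install -DskipTests

# in the Spark checkout: build against that locally installed version
mvn -Dhadoop.version=2.2.1234-SNAPSHOT -DskipTests clean package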
Re: Spark Streaming on top of Cassandra?
Hi. I have a spark streaming - cassandra application which you can probably borrow pretty easily. You can always rewrite a part of it in java if you need to, or else you can just use scala (see the blog post below if you want a java-style dev workflow with scala using intellij). This application implements a spark stream with Twitter and ETLs it into either a file queue or cassandra (see the commented-out cassandra snippet). https://github.com/jayunit100/SparkStreamingApps/blob/master/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala The Cassandra sink works really well with the spark context compile-time bindings. Maybe just clone this repo down and use it as a blueprint :) There is a blog post here about how to set up your IDE so that the dev workflow is very similar to that of standard java: http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html Good luck! On Thu, May 21, 2015 at 4:24 PM, tshah77 tejasrs...@gmail.com wrote: Can someone provide an example of Spark Streaming using Java? I have cassandra running but did not configure spark, and would like to create a DStream. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-on-top-of-Cassandra-tp1283p22978.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- jay vyas
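For the actual write, the spark-cassandra-connector keeps the code very small. A sketch, assuming the connector is on the classpath, a tweets DStream of twitter4j Status objects from TwitterUtils, and a keyspace/table/columns that are made up for the example:

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._   // adds saveToCassandra to DStreams

// tweets: DStream[Status] from TwitterUtils.createStream(ssc, None)
tweets
  .map(t => (t.getId, t.getUser.getScreenName, t.getText))
  .saveToCassandra("demo", "tweets", SomeColumns("id", "user", "body"))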
Re: Re: spark streaming printing no output
empty folders generally means that you need to just increase the window intervals; i.e. spark streaming saveAsTxtFiles will save folders for each interval regardless On Wed, Apr 15, 2015 at 5:03 AM, Shushant Arora shushantaror...@gmail.com wrote: Its printing on console but on HDFS all folders are still empty . On Wed, Apr 15, 2015 at 2:29 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks !! Yes message types on this console is seen on another console. When I closed another console, spark streaming job is printing messages on console . Isn't the message written on a port using netcat be avaible for multiple consumers? On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote: Looks the message is consumed by the another console?( can see messages typed on this port from another console.) -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-04-15 17:11 *To:* Akhil Das ak...@sigmoidanalytics.com *CC:* user@spark.apache.org *Subject:* Re: spark streaming printing no output When I launched spark-shell using, spark-shell ---master local[2]. Same behaviour, no output on console but only timestamps. When I did, lines.saveAsTextFiles(hdfslocation,suffix); I get empty files of 0 bytes on hdfs On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have atleast 2 cores available for processing. You can try launching it in local[2] and make sure its working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but on console nothing is getting printed. I am doing 1.bin/spark-shell --master clusterMgrUrl 2.import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext( sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I see webUI but nothing gets printed on console. I have started a nc script on hostname port and can see messages typed on this port from another console. Please let me know If I am doing something wrong. -- jay vyas
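If the empty per-interval folders are a nuisance, one option (a sketch; the output path is made up and `lines` is the DStream from the thread's code) is to drive the write yourself from foreachRDD and skip intervals that produced nothing:

lines.foreachRDD { (rdd, time) =>
  // saveAsTextFiles would create a folder every interval; only write when there is data
  if (rdd.take(1).nonEmpty) {
    rdd.saveAsTextFile(s"hdfs:///streams/out-${time.milliseconds}")
  }
}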
Re: Submitting to a cluster behind a VPN, configuring different IP address
Yup, a related JIRA is here: https://issues.apache.org/jira/browse/SPARK-5113 which you might want to leave a comment in. This can be quite tricky, we found! But there are a host of environment-variable hacks you can use when launching spark masters/slaves. On Thu, Apr 2, 2015 at 5:18 PM, Michael Quinlan mq0...@gmail.com wrote: I was able to hack this on my similar setup issue by running (on the driver) $ sudo hostname ip Where ip is the same value set in the spark.driver.host property. This isn't a solution I would use universally and hope that someone can fix this bug in the distribution. Regards, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-to-a-cluster-behind-a-VPN-configuring-different-IP-address-tp9360p22363.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- jay vyas
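The usual knobs to try (a sketch; the addresses and port are placeholders for whatever is reachable across the VPN) are spark.driver.host and spark.driver.port on the driver, plus SPARK_LOCAL_IP in the environment of each daemon:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("vpn-submit")
  .setMaster("spark://master-host:7077")
  .set("spark.driver.host", "10.8.0.5")   // an address the executors can actually route to
  .set("spark.driver.port", "51000")      // pin the port so the VPN/firewall can allow it
// on each master/worker: export SPARK_LOCAL_IP=<the VPN-facing address> before starting the daemon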
Re: Single threaded laptop implementation beating a 128 node GraphX cluster on a 1TB data set (128 billion nodes) - What is a use case for GraphX then? when is it worth the cost?
Just the same as spark was disrupting the hadoop ecosystem by changing the assumption that you can't rely on memory in distributed analytics...now maybe we are challenging the assumption that big data analytics need to distributed? I've been asking the same question lately and seen similarly that spark performs quite reliably and well on local single node system even for an app which I ran for a streaming app which I ran for ten days in a row... I almost felt guilty that I never put it on a cluster! On Mar 30, 2015 5:51 AM, Steve Loughran ste...@hortonworks.com wrote: Note that even the Facebook four degrees of separation paper went down to a single machine running WebGraph (http://webgraph.di.unimi.it/) for the final steps, after running jobs in there Hadoop cluster to build the dataset for that final operation. The computations were performed on a 24-core machine with 72 GiB of memory and 1 TiB of disk space.6 The first task was to import the Facebook graph(s) into a compressed form for WebGraph [4], so that the multiple scans required by HyperANF’s diffusive process could be carried out relatively quickly. Some toolkits/libraries are optimised for that single dedicated use —yet are downstream of the raw data; where memory reads $L1-$L3 cache locality becomes the main performance problem, and where synchronisation techniques like BSP aren't necessarily needed. On 29 Mar 2015, at 23:18, Eran Medan ehrann.meh...@gmail.com wrote: Hi Sean, I think your point about the ETL costs are the wining argument here. but I would like to see more research on the topic. What I would like to see researched - is ability to run a specialized set of common algorithms in fast-local-mode just like a compiler optimizer can decide to inline some methods, or rewrite a recursive function as a for loop if it's in tail position, I would say that the future of GraphX can be that if a certain algorithm is a well known one (e.g. shortest paths) and can be run locally faster than on a distributed set (taking into account bringing all the data locally) then it will do so. Thanks! On Sat, Mar 28, 2015 at 1:34 AM, Sean Owen so...@cloudera.com wrote: (I bet the Spark implementation could be improved. I bet GraphX could be optimized.) Not sure about this one, but in core benchmarks often start by assuming that the data is local. In the real world, data is unlikely to be. The benchmark has to include the cost of bringing all the data to the local computation too, since the point of distributed computation is bringing work to the data. Specialist implementations for a special problem should always win over generalist, and Spark is a generalist. Likewise you can factor matrices way faster in a GPU than in Spark. These aren't entirely either/or propositions; you can use Rust or GPU in a larger distributed program. Typically a real-world problem involves more than core computation: ETL, security, monitoring. Generalists are more likely to have an answer to hand for these. Specialist implementations do just one thing, and they typically have to be custom built. Compare the cost of highly skilled developer time to generalist computing resources; $1m buys several dev years but also rents a small data center. Speed is an important issue but by no means everything in the real world, and these are rarely mutually exclusive options in the OSS world. This is a great piece of work, but I don't think it's some kind of argument against distributed computing. 
On Fri, Mar 27, 2015 at 6:32 PM, Eran Medan ehrann.meh...@gmail.com wrote: Remember that article that went viral on HN? (Where a guy showed how GraphX / Giraph / GraphLab / Spark have worse performance on a 128 cluster than on a 1 thread machine? if not here is the article - http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html) Well as you may recall, this stirred up a lot of commotion in the big data community (and Spark/GraphX in particular) People (justly I guess) blamed him for not really having “big data”, as all of his data set fits in memory, so it doesn't really count. So he took the challenge and came with a pretty hard to argue counter benchmark, now with a huge data set (1TB of data, encoded using Hilbert curves to 154GB, but still large). see at - http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html He provided the source here https://github.com/frankmcsherry/COST as an example His benchmark shows how on a 128 billion edges graph, he got X2 to X10 faster results on a single threaded Rust based implementation So, what is the counter argument? it pretty much seems like a blow in the face of Spark / GraphX etc, (which I like and use on a daily basis) Before I dive into re-validating his benchmarks with my own use cases. What is your opinion on this? If this is the case, then what IS the use case for
Re: Untangling dependency issues in spark streaming
Thanks for posting this! I've run into similar issues before, and generally it's a bad idea to swap the libraries out and pray for the best, so the shade functionality is probably the best option. Unfortunately, I'm not sure how well SBT and Gradle support shading... how do folks using next-gen build tools solve this problem? On Sun, Mar 29, 2015 at 3:10 AM, Neelesh neele...@gmail.com wrote: Hi, My streaming app uses org.apache.httpcomponents:httpclient:4.3.6, but spark uses 4.2.6, and I believe that's what's causing the following error. I've tried setting spark.executor.userClassPathFirst and spark.driver.userClassPathFirst to true in the config, but that does not solve it either. Finally I had to resort to relocating classes using the maven shade plugin while building my app's uber jar, using <relocations> <relocation> <pattern>org.apache.http</pattern> <shadedPattern>org.shaded.apache.http</shadedPattern> </relocation> </relocations> Hope this is useful to others in the same situation. It would be really great to deal with this the right way (like tomcat or any other servlet container - classloader hierarchy etc). Caused by: java.lang.NoSuchFieldError: INSTANCE at org.apache.http.impl.io.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:52) at org.apache.http.impl.io.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:56) at org.apache.http.impl.io.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:46) at org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.<init>(ManagedHttpClientConnectionFactory.java:72) at org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.<init>(ManagedHttpClientConnectionFactory.java:84) at org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.<clinit>(ManagedHttpClientConnectionFactory.java:59) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingHttpClientConnectionManager.java:494) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:149) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:138) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:114) and ... Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.http.impl.conn.ManagedHttpClientConnectionFactory at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingHttpClientConnectionManager.java:494) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:149) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:138) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:114) -- jay vyas
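For the SBT side of that question: later versions of sbt-assembly (0.14.x and newer) did grow a shading feature, so the same relocation can be expressed in the build. A rough build.sbt equivalent of the Maven relocation above:

// build.sbt, assuming the sbt-assembly plugin is enabled
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.http.**" -> "org.shaded.apache.http.@1").inAll
)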
Re: Apache Ignite vs Apache Spark
-https://wiki.apache.org/incubator/IgniteProposal has I think been updated recently and has a good comparison. - Although grid gain has been around since the spark days, Apache Ignite is quite new and just getting started I think so - you will probably want to reach out to the developers for details on ignites roadmap because there might be interesting details not yet codified. On Feb 26, 2015, at 1:08 PM, Sean Owen so...@cloudera.com wrote: Ignite is the renaming of GridGain, if that helps. It's like Oracle Coherence, if that helps. These do share some similarities -- fault tolerant, in-memory, distributed processing. The pieces they're built on differ, the architecture differs, the APIs differ. So fairly different in particulars. I never used the above, so can't be much more useful. On Thu, Feb 26, 2015 at 5:46 PM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: Can someone with experience briefly share or summarize the differences between Ignite and Spark? Are they complementary? Totally unrelated? Overlapping? Seems like ignite has reached version 1.0, I have never heard of it until a few days ago and given what is advertised, it sounds pretty interesting but I am unsure how this relates to or differs from Spark. Thanks! Ognen - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming and message ordering
This is a *fantastic* question. The idea of how we identify individual things in multiple DStreams is worth looking at. The reason being that you can then fine-tune your streaming job based on the RDD identifiers (i.e. are the timestamps from the producer correlating closely to the order in which RDD elements are being produced?). If *NO*, then you need to (1) dial up throughput on producer sources or else (2) increase cluster size so that spark is capable of evenly handling the load. You can't decide to do (1) or (2) unless you can track when the streaming elements are being converted to RDDs by spark itself. On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote: There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this - http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a kafka receiver that pulls data from a single kafka partition of a kafka topic, are individual messages in the microbatch in the same order as the kafka partition? Are successive microbatches originating from a kafka partition executed in order? Thanks! -- jay vyas
Re: Strongly Typed SQL in Spark
Ah, never mind - I just saw http://spark.apache.org/docs/1.2.0/sql-programming-guide.html (language-integrated queries), which looks quite similar to what I was thinking about. I'll give that a whirl... On Wed, Feb 11, 2015 at 7:40 PM, jay vyas jayunit100.apa...@gmail.com wrote: Hi spark. Is there anything in the works for a typesafe, HQL-like API for building spark queries from case classes? i.e. where, given a domain object Product with a cost associated with it, we can do something like: query.select(Product).filter({ _.cost < 50.00f }).join(ProductMetaData).by((product, meta) => product.id == meta.id).toSchemaRDD ? I know the above snippet is totally wacky, but you get the idea :) -- jay vyas -- jay vyas
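For reference, the Spark 1.2 language-integrated query DSL mentioned above looks roughly like this. This is an untested sketch based on the 1.2 SQL programming guide's example; the Product case class and its fields are the hypothetical ones from the original question:

case class Product(id: Long, name: String, cost: Float)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._   // brings in createSchemaRDD and the symbol-based DSL implicits

val products = sc.parallelize(Seq(Product(1, "kibble", 10.5f)))
val cheap = products.where('cost < 50.00f).select('id, 'name)   // Symbol-based column refs
cheap.collect()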
Strongly Typed SQL in Spark
Hi spark. Is there anything in the works for a typesafe, HQL-like API for building spark queries from case classes? i.e. where, given a domain object Product with a cost associated with it, we can do something like: query.select(Product).filter({ _.cost < 50.00f }).join(ProductMetaData).by((product, meta) => product.id == meta.id).toSchemaRDD ? I know the above snippet is totally wacky, but you get the idea :) -- jay vyas
SparkSQL DateTime
Hi spark! We are working on the bigpetstore-spark implementation in apache bigtop, and want to implement idiomatic date/time usage for SparkSQL. It appears that org.joda.time.DateTime isn't in SparkSQL's rolodex of reflection types. I'd rather not force an artificial dependency on hive dates just for dealing with timestamps. What's the simplest and cleanest way to map non-spark time values into SparkSQL-friendly time values? - One option could be a custom SparkSQL type, I guess? - Any plan to have native spark sql support for Joda Time or (yikes) java.util.Calendar? -- jay vyas
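The low-tech route is to convert at the case-class boundary, since Spark SQL's reflection-based schema inference does understand java.sql.Timestamp. A sketch (the Transaction fields and the upstream rawEvents RDD are made up):

import java.sql.Timestamp
import org.joda.time.DateTime

case class Transaction(productId: Long, purchasedAt: Timestamp)

// rawEvents: RDD[(Long, DateTime)] or similar, from upstream parsing
val txns = rawEvents.map { case (productId, dt: DateTime) =>
  Transaction(productId, new Timestamp(dt.getMillis))
}
// txns can now be registered as a table / turned into a SchemaRDD as usual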
Re: Discourse: A proposed alternative to the Spark User list
It's a very valid idea indeed, but... it's a tricky subject, since the entire ASF is run on mailing lists; hence there are so many different but equally sound ways of looking at this idea, which conflict with one another. On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote: I think this is a really great idea for really opening up the discussions that happen here. Also, it would be nice to know why there doesn't seem to be much interest. Maybe I'm misunderstanding some nuance of Apache projects. Cheers -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problems with Spark Core 1.2.0 SBT project in IntelliJ
I find importing a working SBT project into IntelliJ is the way to go. How did you load the project into intellij? On Jan 13, 2015, at 4:45 PM, Enno Shioji eshi...@gmail.com wrote: Had the same issue. I can't remember what the issue was but this works: libraryDependencies ++= { val sparkVersion = 1.2.0 Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-streaming % sparkVersion % provided, org.apache.spark %% spark-streaming-twitter % sparkVersion % provided, org.apache.spark %% spark-streaming-kafka % sparkVersion % provided, javax.servlet % javax.servlet-api % 3.0.1 % provided ) } In order to run classes in main source in Intellij, you must invoke it from a source under test as Intellij won't provide the provided scope libraries when running code in main source (but it will for sources under test). With this config you can sbt assembly in order to get the fat jar without Spark jars. ᐧ On Tue, Jan 13, 2015 at 12:16 PM, Night Wolf nightwolf...@gmail.com wrote: Hi, I'm trying to load up an SBT project in IntelliJ 14 (windows) running 1.7 JDK, SBT 0.13.5 -I seem to be getting errors with the project. The build.sbt file is super simple; name := scala-spark-test1 version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.2.0 Then I have a super simple test class; package test import org.apache.spark.{SparkContext, SparkConf} case class Blah(s: Int, d: String) object Test1 { def main(args: Array[String]): Unit = { val sparkconf = new SparkConf().setMaster(local[4]).setAppName(test-spark) val sc = new SparkContext(sparkconf) val rdd = sc.parallelize(Seq( Blah(1,dsdsd), Blah(2,daaa), Blah(3,dhghghgh) )) rdd.collect().foreach(println) } } When I try to run the Test1 object in IntelliJ I get the following error; Exception in thread main java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:73) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667) at org.apache.spark.HttpServer.start(HttpServer.scala:60) at org.apache.spark.HttpFileServer.initialize(HttpFileServer.scala:45) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:304) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.init(SparkContext.scala:232) at test.Test1$.main(Test1.scala:10) at test.Test1.main(Test1.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) Caused by: java.lang.ClassNotFoundException: javax.servlet.http.HttpServletResponse at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 18 more For whatever reason it seems that IntelliJ isn't pulling in these deps. Doing an sbt run works fine. Looking at the project structure it seems that 7 libs don't get marked as a dependency for my module... but they are on the dep tree: http://pastebin.com/REkQh5ux Is this something to do with the libs and scoping or shading in Spark and its associated libs? Has anyone else seen this issue? Cheers, NW
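The quoting in the build.sbt snippets above was stripped by the archive; the usual shape of the fix Enno describes (add the servlet API so javax.servlet.http resolves, with Spark itself as provided) looks roughly like this - versions are the ones mentioned in the thread:

// build.sbt (a sketch)
name := "scala-spark-test1"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "javax.servlet" % "javax.servlet-api" % "3.0.1" % "provided"
)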
Re: Spark Streaming Threading Model
So, at any point does a stream stop producing RDDs? If not, is there a possibility, if the batching isn't working or is broken, that your disk / RAM will fill up to the brim with unprocessed RDD backlog? On Fri, Dec 19, 2014 at 1:29 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Batches will wait for the previous batch to finish. The monitoring console will show you the backlog of waiting batches. From: Asim Jalis asimja...@gmail.com Date: Friday, December 19, 2014 at 1:16 PM To: user user@spark.apache.org Subject: Spark Streaming Threading Model Q: In Spark Streaming if your DStream transformation and output action take longer than the batch duration will the system process the next batch in another thread? Or will it just wait until the first batch’s RDD is processed? In other words does it build up a queue of buffered RDDs awaiting processing or does it just process them? Asim -- jay vyas
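One concrete guard against an unbounded backlog from a receiver-based stream is to cap the ingest rate (a sketch; the rate is an arbitrary number and the exact knobs vary by Spark version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("bounded-ingest")
  // limit each receiver to ~1000 records/sec so batches can keep up
  .set("spark.streaming.receiver.maxRate", "1000")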
Re: Unit testing and Spark Streaming
https://github.com/jayunit100/SparkStreamingCassandraDemo On this note, I've built a framework which is mostly pure so that functional unit tests can be run composing mock data for Twitter statuses, with just regular junit... That might be relevant also. I think at some point we should come up with a robust test driven framework for building stream apps... And the idea of Scala test with the injection and comparison you did might be a good start. Thanks for starting this dialogue! On Dec 12, 2014, at 9:18 AM, Emre Sevinc emre.sev...@gmail.com wrote: On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote: How can the log level in test mode be reduced (or extended when needed) ? Hello Eric, The following might be helpful for reducing the log messages during unit testing: http://stackoverflow.com/a/2736/236007 -- Emre Sevinç https://be.linkedin.com/in/emresevinc
Re: Spark-Streaming: output to cassandra
Here's an example of a Cassandra etl that you can follow which should exit on its own. I'm using it as a blueprint for revolving spark streaming apps on top of. For me, I kill the streaming app w system.exit after a sufficient amount of data is collected. That seems to work for most any scenario... But you I guess could also kill on the stream handler side as well if you are writing a custom dstream. https://github.com/jayunit100/SparkBlueprint/blob/master/src/main/scala/sparkapps/tweetstream/Processor.scala On Dec 5, 2014, at 1:50 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Batch is the batch duration that you are specifying while creating the StreamingContext, so at the end of every batch's computation the data will get flushed to Cassandra, and why are you stopping your program with Ctrl + C? You can always specify the time with the sc.awaitTermination(Duration) Thanks Best Regards On Fri, Dec 5, 2014 at 11:53 AM, m.sar...@accenture.com wrote: Hi Gerard/Akhil, By how do I specify a batch I was trying to ask that when does the data in the JavaDStream gets flushed into Cassandra table?. I read somewhere that the streaming data in batches gets written in Cassandra. This batch can be of some particular time, or one particular run. That was what I was trying to understand, how to set that Batch in my program. Because if a batch means one cycle run of my streaming app, then in my app, I'm hitting a Ctrl+C to kill the program. So the program is terminating, and would the data get inserted successfully into my Cassandra table? For example, in Terminal-A I'm running Kafka-producer to stream-in messages. Terminal-B I'm running my Streaming App. In my App there is a line jssc.awaitTermination(); which will keep running my App till I kill it. Eventually I am hitting Ctrl+C in my App terminal, i.e. Terminal-B and killing it. So its a kind of ungraceful termination. So in this case will the data in my App DStream get written into Cassandra? Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841. From: Gerard Maas gerard.m...@gmail.com Sent: Thursday, December 4, 2014 10:22 PM To: Akhil Das Cc: Sarosh, M.; user@spark.apache.org Subject: Re: Spark-Streaming: output to cassandra I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question how do I specify a batch. That doesn't make much sense to me. Could you explain further? -kr, Gerard. On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use the datastax's Cassandra connector. Thanks Best Regards On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote: Hi, I have written the code below which is streaming data from kafka, and printing to the console. I want to extend this, and want my data to go into Cassandra table instead. JavaStreamingContext jssc = new JavaStreamingContext(local[4], SparkStream, new Duration(1000)); JavaPairReceiverInputDStreamString, String messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap ); System.out.println(Connection done!); JavaDStreamString data = messages.map(new FunctionTuple2String, String, String() { public String call(Tuple2String, String message) { return message._2(); } } ); //data.print(); -- output to console data.foreachRDD(saveToCassandra(mykeyspace,mytable)); jssc.start(); jssc.awaitTermination(); How should I implement the line: data.foreachRDD(saveToCassandra(mykeyspace,mytable)); so that data goes into Cassandra, in each batch. 
And how do I specify a batch, because if i do Ctrl+C on the console of streaming-job-jar, nothing will be entered into cassandra for sure since it is getting killed. Please help. Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841. This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. __ www.accenture.com
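On the Ctrl+C question above: rather than killing the app, the streaming context can be stopped gracefully so in-flight batches finish and their writes reach Cassandra. A sketch (in the Java API used above, the equivalent is jssc.stop(true, true)):

// somewhere in a shutdown path, e.g. after enough data has been collected
ssc.stop(stopSparkContext = true, stopGracefully = true)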
Re: How to execute a custom python library on spark
a quick thought on this: I think this is distro dependent also, right? We ran into a similar issue in https://issues.apache.org/jira/browse/BIGTOP-1546 where it looked like the python libraries might be overwritten on launch. On Tue, Nov 25, 2014 at 3:09 PM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, I have written few datastructures as classes like following.. So, here is my code structure: project/foo/foo.py , __init__.py /bar/bar.py, __init__.py bar.py imports foo as from foo.foo import * /execute/execute.py imports bar as from bar.bar import * Ultimately I am executing execute.py as pyspark execute.py And this works fine locally.. but as soon I submit it on cluster... I see modules missing error.. I tried to send each and every file using --py-files flag (foo.py bar.py ) and other helper files.. But even then it complaints that module is not found So, the question is.. When one is building a library which is suppose to execute on top of spark, how should the imports and library be structured so that it works fine on spark. When to use pyspark and when to use spark submit to execute python scripts/module Bonus points if one can point an example library and how to run it :) Thanks -- jay vyas
Re: Code works in Spark-Shell but Fails inside IntelliJ
This seems pretty standard: your IntelliJ classpath isn't matched to the correct ones that are used in spark shell Are you using the SBT plugin? If not how are you putting deps into IntelliJ? On Nov 20, 2014, at 7:35 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys I am at AmpCamp 2014 at UCB right now :-) Funny Issue... This code works in Spark-Shell but throws a funny exception in IntelliJ CODE val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf(spark.sql.parquet.binaryAsString, true) val wikiData = sqlContext.parquetFile(/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet) wikiData.registerTempTable(wikiData) sqlContext.sql(SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username '' GROUP BY username ORDER BY cnt DESC LIMIT 10).collect().foreach(println) RESULTS [Waacstats,2003] [Cydebot,949] [BattyBot,939] [Yobot,890] [Addbot,853] [Monkbot,668] [ChrisGualtieri,438] [RjwilmsiBot,387] [OccultZone,377] [ClueBot NG,353] INTELLIJ CODE = object ParquetSql { def main(args: Array[String]) { val sconf = new SparkConf().setMaster(local).setAppName(MedicalSideFx-NamesFoodSql) val sc = new SparkContext(sconf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf(spark.sql.parquet.binaryAsString, true) val wikiData = sqlContext.parquetFile(/Users/sansub01/mycode/knowledge/spark_ampcamp_2014/data/wiki_parquet) wikiData.registerTempTable(wikiData) val results = sqlContext.sql(SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username '' GROUP BY username ORDER BY cnt DESC LIMIT 10) results.collect().foreach(println) } } INTELLIJ ERROR == Exception in thread main java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.serializer.Serializer, but class was expected at org.apache.spark.sql.parquet.ParquetFilters$.serializeFilterExpressions(ParquetFilters.scala:244) at org.apache.spark.sql.parquet.ParquetTableScan.execute(ParquetTableOperations.scala:109) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:48) at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:45) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:151) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:127) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46) at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:126) at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:171) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at org.medicalsidefx.common.utils.ParquetSql$.main(ParquetSql.scala:18) at org.medicalsidefx.common.utils.ParquetSql.main(ParquetSql.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Re: Does Spark Streaming calculate during a batch?
1) You have a receiver thread. That thread might use a lot of CPU, or not, depending on how you implement the thread in onStart. 2) Every 5 minutes, spark will submit a job which processes every RDD which was created (i.e. using the store() call) in the receiver. That job will run asynchronously to the receiver, which is still working to produce new RDDs for the next batch. So, maybe you're monitoring the CPU only on the spark workers which are running the batch jobs, and not on the spark worker which is doing the RDD ingestion? On Thu, Nov 13, 2014 at 10:35 AM, Michael Campbell michael.campb...@gmail.com wrote: I was running a proof of concept for my company with spark streaming, and the conclusion I came to is that spark collects data for the batch-duration, THEN starts the data-pipeline calculations. My batch size was 5 minutes, and the CPU was all but dead for 5, then when the 5 minutes were up the CPU's would spike for a while presumably doing the calculations. Is this presumption true, or is it running the data through the calculation pipeline before the batch is up? What could lead to the periodic CPU spike - I had a reduceByKey, so was it doing that only after all the batch data was in? Thanks -- jay vyas
Re: Streaming: getting total count over all windows
I would think this should be done at the application level. After all, the core functionality of SparkStreaming is to capture RDDs in some real time interval and process them - not to aggregate their results. But maybe there is a better way... On Thu, Nov 13, 2014 at 8:28 PM, SK skrishna...@gmail.com wrote: Hi, I am using the following code to generate the (score, count) for each window: val score_count_by_window = topic.map(r = r._2) // r._2 is the integer score .countByValue() score_count_by_window.print() E.g. output for a window is as follows, which means that within the Dstream for that window, there are 2 rdds with score 0; 3 with score 1, and 1 with score -1. (0, 2) (1, 3) (-1, 1) I would like to get the aggregate count for each score over all windows until program terminates. I tried countByValueAndWindow() but the result is same as countByValue() (i.e. it is producing only per window counts). reduceByWindow also does not produce the result I am expecting. What is the correct way to sum up the counts over multiple windows? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-getting-total-count-over-all-windows-tp1.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- jay vyas
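That said, a running total across all batches can also be kept inside the stream itself with updateStateByKey. A sketch: score_count_by_window is the (score, count) DStream from the question, the checkpoint directory is a placeholder, and the extra import is only needed for the pair-DStream operations on older Spark versions:

import org.apache.spark.streaming.StreamingContext._   // pair-DStream functions on older versions

ssc.checkpoint("hdfs:///tmp/checkpoints")   // required for stateful transformations

val runningTotals = score_count_by_window.updateStateByKey[Long] {
  (newCounts: Seq[Long], total: Option[Long]) =>
    Some(total.getOrElse(0L) + newCounts.sum)
}
runningTotals.print()   // cumulative count per score, updated every batch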
Re: Spark streaming cannot receive any message from Kafka
Yup, very important that n > 1 for spark streaming jobs. If local, use local[2]. The thing to remember is that your spark receiver will take a thread to itself and produce data, so you need another thread to consume it. In a cluster manager like yarn or mesos, the word 'thread' is not used anymore, or I guess has a different meaning - you need 2 or more free compute slots, and that should be guaranteed by looking to see how many free node managers are running etc. On Nov 12, 2014, at 7:53 PM, Shao, Saisai saisai.s...@intel.com wrote: Did you configure Spark master as local? It should be local[n], n > 1 for local mode. Besides, there’s a Kafka wordcount example in the Spark Streaming examples, you can try that. I’ve tested with latest master, it’s OK. Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Thursday, November 13, 2014 8:45 AM To: Bill Jay Cc: u...@spark.incubator.apache.org Subject: Re: Spark streaming cannot receive any message from Kafka Bill, However, when I am currently using Spark 1.1.0. the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code. Do you see any suspicious messages in the log output? Tobias
Re: random shuffle streaming RDDs?
A use case would be helpful? Batches of RDDs from Streams are going to have temporal ordering in terms of when they are processed in a typical application ... , but maybe you could shuffle the way batch iterations work On Nov 3, 2014, at 11:59 AM, Josh J joshjd...@gmail.com wrote: When I'm outputting the RDDs to an external source, I would like the RDDs to be outputted in a random shuffle so that even the order is random. So far what I understood is that the RDDs do have a type of order, in that the order for spark streaming RDDs would be the order in which spark streaming read the tuples from source (e.g. ordered by roughly when the producer sent the tuple in addition to any latency) On Mon, Nov 3, 2014 at 8:48 AM, Sean Owen so...@cloudera.com wrote: I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh
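If the goal is just to randomize the order of each micro-batch before writing it out, a sketch along these lines would do it (dstream is your input DStream; transform exposes the underlying RDD, and the extra import provides sortByKey on older Spark versions):

import scala.util.Random
import org.apache.spark.SparkContext._   // pair-RDD functions (sortByKey) on older versions

val shuffled = dstream.transform { rdd =>
  rdd.map(x => (Random.nextDouble(), x))   // tag each element with a random key
     .sortByKey()                          // sort by the random key => random order
     .map(_._2)
}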
Re: Transforming the Dstream vs transforming each RDDs in the Dstream.
you know :-) -kr, Gerard. On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, We have been implementing several Spark Streaming jobs that are basically processing data and inserting it into Cassandra, sorting it among different keyspaces. We've been following the pattern: dstream.foreachRDD(rdd = val records = rdd.map(elem = record(elem)) targets.foreach(target = records.filter{record = isTarget(target,record)}.writeToCassandra(target,table)) ) I've been wondering whether there would be a performance difference in transforming the dstream instead of transforming the RDD within the dstream with regards to how the transformations get scheduled. Instead of the RDD-centric computation, I could transform the dstream until the last step, where I need an rdd to store. For example, the previous transformation could be written as: val recordStream = dstream.map(elem = record(elem)) targets.foreach{target = recordStream.filter(record = isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))} Would be a difference in execution and/or performance? What would be the preferred way to do this? Bonus question: Is there a better (more performant) way to sort the data in different buckets instead of filtering the data collection times the #buckets? thanks, Gerard. -- jay vyas
Re: real-time streaming
A REAL TIME stream, by definition, delivers data every X seconds. You can easily do this with spark. Roughly, here is the way to create a stream gobbler and attach a spark app to read its data every X seconds (see the receiver sketch after this message): - Write a Runnable thread which reads data from a source. Test that it works independently. - Add that thread into a DStream handler, and implement onStart() such that the thread above is launched in the onStart(), and add logic to onStop() to safely destroy the above thread. - Set the window time (i.e. to 5 seconds). - Start your spark streaming context, and run a forEachRDD(...) in your spark app. - Make sure that you launch with 2 or more workers. On Tue, Oct 28, 2014 at 1:44 PM, ll duy.huynh@gmail.com wrote: the spark tutorial shows that we can create a stream that reads new files from a directory. that seems to have some lag time, as we have to write the data to file first and then wait until spark stream picks it up. what is the best way to implement REAL 'REAL-TIME' streaming for analysis in real time? for example, like streaming videos, sounds, images, etc continuously? thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/real-time-streaming-tp17526.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- jay vyas
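A minimal skeleton of such a receiver (class name and the data source are hypothetical; the important part is that onStart() spawns a thread and returns immediately):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class GobblerReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart() must not block: do the actual reading on a separate thread
    new Thread("gobbler-reader") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(readOneRecord())   // hand each record to Spark
        }
      }
    }.start()
  }

  def onStop(): Unit = {}   // the reader thread above exits once isStopped() is true

  private def readOneRecord(): String = ???   // connect to host:port and read a line
}

// usage: val lines = ssc.receiverStream(new GobblerReceiver("sensor-host", 9999))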
Re: Streams: How do RDDs get Aggregated?
Hi Spark! I found out why my RDDs weren't coming through in my spark stream. It turns out that onStart() needs to return, it seems - i.e. you need to launch the worker part of your start process in a thread. For example def onStartMock():Unit ={ val future = new Thread(new Runnable() { def run() { for(x <- 1 until 10) { val newMem = Runtime.getRuntime.freeMemory()/12188091; if(newMem != lastMem){ System.out.println("in thread : " + newMem); } lastMem=newMem; store(mockStatus); } }}); Hope that helps somebody in the same situation. FYI it's in the docs :) * {{{ * class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) { * def onStart() { * // Setup stuff (start threads, open sockets, etc.) to start receiving data. * // Must start new thread to receive data, as onStart() must be non-blocking. * * // Call store(...) in those threads to store received data into Spark's memory. * * // Call stop(...), restart(...) or reportError(...) on any thread based on how * // different errors needs to be handled. * * // See corresponding method documentation for more details * } * * def onStop() { * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data. * } * } * }}}
Re: Streams: How do RDDs get Aggregated?
Oh - and one other note on this, which appears to be the case. If , in your stream forEachRDD implementation, you do something stupid (like call rdd.count()) tweetStream.foreachRDD((rdd,lent)= { tweetStream.repartition(1) numTweetsCollected+=1; //val count = rdd.count() DONT DO THIS ! You can also get stuck in a situation where your RDD processor blocks infinitely. And for twitter specific stuff, make sure to look at modifying the TwitterInputDStream class so that it implements the stuff from SPARK-2464, which can lead to infinite stream reopening as well. On Tue, Oct 21, 2014 at 11:02 AM, jay vyas jayunit100.apa...@gmail.com wrote: Hi Spark ! I found out why my RDD's werent coming through in my spark stream. It turns out you need the onStart() needs to return , it seems - i.e. you need to launch the worker part of your start process in a thread. For example def onStartMock():Unit ={ val future = new Thread(new Runnable() { def run() { for(x - 1 until 10) { val newMem = Runtime.getRuntime.freeMemory()/12188091; if(newMem != lastMem){ System.out.println(in thread : + newMem); } lastMem=newMem; store(mockStatus); } }}); Hope that helps somebody in the same situation. FYI Its in the docs :) * {{{ * class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) { * def onStart() { * // Setup stuff (start threads, open sockets, etc.) to start receiving data. * // Must start new thread to receive data, as onStart() must be non-blocking. * * // Call store(...) in those threads to store received data into Spark's memory. * * // Call stop(...), restart(...) or reportError(...) on any thread based on how * // different errors needs to be handled. * * // See corresponding method documentation for more details * } * * def onStop() { * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data. * } * } * }}} -- jay vyas
Re: How do you write a JavaRDD into a single file
sounds more like a use case for using collect... and writing out the file in your program? On Mon, Oct 20, 2014 at 6:53 PM, Steve Lewis lordjoe2...@gmail.com wrote: Sorry I missed the discussion - although it did not answer the question - In my case (and I suspect the askers) the 100 slaves are doing a lot of useful work but the generated output is small enough to be handled by a single process. Many of the large data problems I have worked process a lot of data but end up with a single report file - frequently in a format specified by preexisting downstream code. I do not want a separate hadoop merge step for a lot of reasons starting with better control of the generation of the file. However toLocalIterator is exactly what I need. Somewhat off topic - I am being overwhelmed by getting a lot of emails from the list - is there s way to get a daily summary which might be a lot easier to keep up with On Mon, Oct 20, 2014 at 3:23 PM, Sean Owen so...@cloudera.com wrote: This was covered a few days ago: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-write-a-RDD-into-One-Local-Existing-File-td16720.html The multiple output files is actually essential for parallelism, and certainly not a bad idea. You don't want 100 distributed workers writing to 1 file in 1 place, not if you want it to be fast. RDD and JavaRDD already expose a method to iterate over the data, called toLocalIterator. It does not require that the RDD fit entirely in memory. On Mon, Oct 20, 2014 at 6:13 PM, Steve Lewis lordjoe2...@gmail.com wrote: At the end of a set of computation I have a JavaRDDString . I want a single file where each string is printed in order. The data is small enough that it is acceptable to handle the printout on a single processor. It may be large enough that using collect to generate a list might be unacceptable. the saveAsText command creates multiple files with names like part, part0001 This was bed behavior in Hadoop for final output and is also bad for Spark. A more general issue is whether is it possible to convert a JavaRDD into an iterator or iterable over then entire data set without using collect or holding all data in memory. In many problems where it is desirable to parallelize intermediate steps but use a single process for handling the final result this could be very useful. -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com -- jay vyas
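A small sketch of the toLocalIterator approach mentioned above (the file name is arbitrary and `rdd` is the final RDD from the thread; rows stream to the driver one partition at a time, so the whole dataset never has to fit in driver memory):

import java.io.PrintWriter

val out = new PrintWriter("report.txt")
try {
  rdd.toLocalIterator.foreach(out.println)   // iterates partition by partition, in order
} finally {
  out.close()
}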
Streams: How do RDDs get Aggregated?
Hi spark! I don't quite understand the semantics of RDDs in a streaming context very well yet. Are there any examples in the docs of how to implement custom InputDStreams, with corresponding Receivers? I've hacked together a custom stream, which is being opened and is consuming data internally; however, I am only seeing empty RDDs, even though I am calling store(...) multiple times - however, I'm relying on the default implementation of store(...), which may be a mistake on my end. By making my slide duration small, I can make sure that the job indeed finishes - however I'm not quite sure how we're supposed to shuttle data from the ReceiverInputDStream into RDDs? Thanks!
Re: Does Ipython notebook work with spark? trivial example does not work. Re: bug with IPython notebook?
PySpark definetly works for me in ipython notebook. A good way to debug is do setMaster(local) in your python sc object, see if that works. Then from there, modify it to point to the real spark server. Also, I added a hack where i did sys.path.insert the path to pyspark in my python note book to get it working properly. You can try these instructions out if you want which i recently put together based on some other stuff online + a few minor modifications . http://jayunit100.blogspot.com/2014/07/ipython-on-spark.html On Thu, Oct 9, 2014 at 2:50 PM, Andy Davidson a...@santacruzintegration.com wrote: I wonder if I am starting iPython notebook incorrectly. The example in my original email does not work. It looks like stdout is not configured correctly If I submit it as a python.py file It works correctly Any idea how I what the problem is? Thanks Andy From: Andrew Davidson a...@santacruzintegration.com Date: Tuesday, October 7, 2014 at 4:23 PM To: user@spark.apache.org user@spark.apache.org Subject: bug with IPython notebook? Hi I think I found a bug in the iPython notebook integration. I am not sure how to report it I am running spark-1.1.0-bin-hadoop2.4 on an AWS ec2 cluster. I start the cluster using the launch script provided by spark I start iPython notebook on my cluster master as follows and use an ssh tunnel to open the notebook in a browser running on my local computer ec2-user@ip-172-31-20-107 ~]$ IPYTHON_OPTS=notebook --pylab inline --no-browser --port=7000 /root/spark/bin/pyspark Bellow is the code my notebook executes Bug list: 1. Why do I need to create a SparkContext? If I run pyspark interactively The context is created automatically for me 2. The print statement causes the output to be displayed in the terminal I started pyspark, not in the notebooks output Any comments or suggestions would be greatly appreciated Thanks Andy import sys from operator import add from pyspark import SparkContext # only stand alone jobs should create a SparkContext sc = SparkContext(appName=pyStreamingSparkRDDPipe”) data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data) def echo(data): print python recieved: %s % (data) # output winds up in the shell console in my cluster (ie. The machine I launched pyspark from) rdd.foreach(echo) print we are done -- jay vyas
Re: Spark inside Eclipse
For intelliJ + SBT, also you can follow the directions http://jayunit100.blogspot.com/2014/07/set-up-spark-application-devleopment.html . ITs really easy to run spark in an IDE . The process for eclipse is virtually identical. On Fri, Oct 3, 2014 at 10:03 AM, Sanjay Subramanian sanjaysubraman...@yahoo.com.invalid wrote: cool thanks will set this up and report back how things went regards sanjay -- *From:* Daniel Siegmann daniel.siegm...@velos.io *To:* Ashish Jain ashish@gmail.com *Cc:* Sanjay Subramanian sanjaysubraman...@yahoo.com; user@spark.apache.org user@spark.apache.org *Sent:* Thursday, October 2, 2014 6:52 AM *Subject:* Re: Spark inside Eclipse You don't need to do anything special to run in local mode from within Eclipse. Just create a simple SparkConf and create a SparkContext from that. I have unit tests which execute on a local SparkContext, and they work from inside Eclipse as well as SBT. val conf = new SparkConf().setMaster(local).setAppName(sWhatever) val sc = new SparkContext(sparkConf) Keep in mind you can only have one local SparkContext at a time, otherwise you will get some weird errors. If you have tests running sequentially, make sure to close the SparkContext in your tear down method. If tests run in parallel you'll need to share the SparkContext between tests. For unit testing, you can make use of SparkContext.parallelize to set up your test inputs and RDD.collect to retrieve the outputs. On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain ashish@gmail.com wrote: Hello Sanjay, This can be done, and is a very effective way to debug. 1) Compile and package your project to get a fat jar 2) In your SparkConf use setJars and give location of this jar. Also set your master here as local in SparkConf 3) Use this SparkConf when creating JavaSparkContext 4) Debug your program like you would any normal program. Hope this helps. Thanks Ashish On Oct 1, 2014 4:35 PM, Sanjay Subramanian sanjaysubraman...@yahoo.com.invalid wrote: hey guys Is there a way to run Spark in local mode from within Eclipse. I am running Eclipse Kepler on a Macbook Pro with Mavericks Like one can run hadoop map/reduce applications from within Eclipse and debug and learn. thanks sanjay -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io -- jay vyas
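The quoting in Daniel's snippet above was stripped by the archive; the local-mode test setup he describes is roughly:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("whatever-test")
val sc = new SparkContext(conf)
try {
  val result = sc.parallelize(Seq(1, 2, 3)).map(_ * 2).collect()
  assert(result.sameElements(Array(2, 4, 6)))
} finally {
  sc.stop()   // only one local SparkContext at a time, so always tear it down
}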
Re: Unit Testing (JUnit) with Spark
I've been doing some work on Spark blueprints, and recently tried to generalize one to make it easy to blueprint new Spark apps: https://github.com/jayunit100/SparkBlueprint.git It runs the Spark app's main method in a unit test, and builds in SBT. You can easily try it out and improve on it. Obviously, calling a main method is the wrong kind of coupling for a unit test, but it works pretty well in a simple CI environment. I'll improve it eventually by injecting the SparkContext and validating the RDD directly in a future iteration. Pull requests welcome :) On Tue, Jul 29, 2014 at 11:29 AM, soumick86 sdasgu...@dstsystems.com wrote: Is there any example out there for unit testing a Spark application in Java? Even a trivial application like word count will be very helpful. I am very new to this and I am struggling to understand how I can use JavaSparkContext for JUnit -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Testing-JUnit-with-Spark-tp10861.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- jay vyas
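As a rough sketch of what that "inject the SparkContext and validate the RDD directly" approach could look like with JUnit (written here in Scala; the class, test, and app names are purely illustrative), something like this should work:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.junit.{After, Before, Test}
    import org.junit.Assert.assertEquals

    // Each test gets a fresh in-process SparkContext and asserts on RDD output
    // directly, rather than calling the application's main method.
    class WordCountTest {
      private var sc: SparkContext = _

      @Before def setUp(): Unit = {
        sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("WordCountTest"))
      }

      @After def tearDown(): Unit = {
        sc.stop()   // only one local SparkContext may be live at a time
      }

      @Test def countsWords(): Unit = {
        val counts = sc.parallelize(Seq("a", "b", "a"))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
          .collectAsMap()
        assertEquals(2L, counts("a").toLong)
        assertEquals(1L, counts("b").toLong)
      }
    }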
Spark over graphviz (SPARK-1015, SPARK-975)
Hi spark. I see there has been some work on graphviz visualization for Spark jobs. 1) I'm wondering if anyone is actively maintaining this stuff and, if so, what the best docs for it are - or else whether there is interest in an upstream JIRA for updating the graphviz APIs. 2) I'm also curious about utilities for visualizing/optimizing the flow of data through an RDD at runtime, and where those live in the existing codebase. Any thoughts around pipeline visualization for Spark would be appreciated. I see some conversations about it in JIRAs, but I'm not sure what the future is for this; possibly I could lend a hand if there are any loose ends that need tying up. -- jay vyas
RDD.pipe(...)
According to the API docs for the pipe operator, def pipe(command: String): RDD[String] (http://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/rdd/RDD.html): Return an RDD created by piping elements to a forked external process. However, it's not clear to me: will the resulting RDD capture the standard output of the process as its elements (I assume that is the most common implementation)? Incidentally, I have not been able to use the pipe command to run an external process yet, so any hints on that would be appreciated. -- jay vyas
Re: RDD.pipe(...)
Never mind :) I found my answer in the docs for the PipedRDD:

    /**
     * An RDD that pipes the contents of each parent partition through an external command
     * (printing them one per line) and returns the output as a collection of strings.
     */
    private[spark] class PipedRDD[T: ClassTag](

So this is essentially an implementation of something analogous to Hadoop's streaming API. On Sun, Jul 20, 2014 at 4:09 PM, jay vyas jayunit100.apa...@gmail.com wrote: According to the API docs for the pipe operator, def pipe(command: String): RDD[String] (http://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/rdd/RDD.html): Return an RDD created by piping elements to a forked external process. However, it's not clear to me: will the resulting RDD capture the standard output of the process as its elements (I assume that is the most common implementation)? Incidentally, I have not been able to use the pipe command to run an external process yet, so any hints on that would be appreciated. -- jay vyas -- jay vyas
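For anyone landing here later, a minimal sketch of pipe() in action; the external command (tr) is just illustrative and assumes a Unix-like environment on the workers:

    import org.apache.spark.{SparkConf, SparkContext}

    // Each partition's elements are written to the command's stdin, one per line;
    // every line the command writes to stdout becomes an element of the new RDD.
    object PipeExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("PipeExample"))
        val piped = sc.parallelize(Seq("apple", "banana", "cherry"))
          .pipe("tr a-z A-Z")              // upper-case each line via an external process
        piped.collect().foreach(println)   // prints APPLE, BANANA, CHERRY (order may vary)
        sc.stop()
      }
    }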
Re: Error with spark-submit (formatting corrected)
I think I know what is happening to you. I've looked into this a bit just this week, so it's fresh in my brain :) hope this helps. When no workers are known to the master, iirc, you get this message. I think this is how it works:
1) You start your master.
2) You start a slave, and give it the master URL as an argument.
3) The slave then binds to a random port.
4) The slave then does a handshake with the master, which you can see in the slave logs (it says something like successfully connected to master at ...). Actually, I think the master also logs that it is now aware of a slave running on ip:port.
So in your case, I suspect, none of the slaves have connected to the master, so the job sits idle. This is similar to the YARN scenario of submitting a job to a resource manager with no node managers running. On Jul 17, 2014, at 6:57 PM, ranjanp piyush_ran...@hotmail.com wrote: Hi, I am new to Spark and trying it out with a stand-alone, 3-node (1 master, 2 workers) cluster. From the Web UI at the master, I see that the workers are registered. But when I try running the SparkPi example from the master node, I get the following message and then an exception.
14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master spark://10.1.3.7:7077...
14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
I searched a bit for the above warning, and found that others have encountered this problem before, but did not see a clear resolution except for this link: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444 Based on the suggestion there I tried supplying the --executor-memory option to spark-submit, but that did not help. Any suggestions? Here are the details of my setup:
- 3 nodes (each with 4 CPU cores and 7 GB memory)
- 1 node configured as Master, and the other two configured as workers
- Firewall is disabled on all nodes, and network communication between the nodes is not a problem
- Edited conf/spark-env.sh on all nodes to set the following: SPARK_WORKER_CORES=3 SPARK_WORKER_MEMORY=5G
- The Web UI as well as logs on master show that Workers were able to register correctly.
Also, the Web UI correctly shows the aggregate available memory and CPU cores on the workers:
URL: spark://vmsparkwin1:7077
Workers: 2
Cores: 6 Total, 0 Used
Memory: 10.0 GB Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
I tried running the SparkPi example first using run-example (which was failing) and later directly using spark-submit as shown below:
$ export MASTER=spark://vmsparkwin1:7077
$ echo $MASTER
spark://vmsparkwin1:7077
azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10
The following is the full screen output:
14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser)
14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
14/07/17 01:20:14 INFO Remoting: Starting remoting
14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at C:\cygwin\tmp\spark-local-20140717012014-b606
14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49842 with 294.9 MB RAM
14/07/17 01:20:14 INFO BlockManagerMaster: Registered BlockManager
14/07/17 01:20:14 INFO HttpServer: Starting HTTP Server
14/07/17 01:20:14 INFO HttpBroadcast: Broadcast server started at http://10.1.3.7:49843
14/07/17 01:20:14 INFO HttpFileServer: HTTP File server directory is
Re: SPARK_WORKER_PORT (standalone cluster)
Now I see the answer to this. Spark slaves start on random ports and tell the master where they are; the master then acknowledges them.
(worker logs) Starting Spark worker :43282
(master logs) Registering worker on :43282 with 8 cores, 16.5 GB RAM
Thus, the port is random because the slaves can be ephemeral. Since the master is fixed, though, a new slave can reconnect at any time. On Mon, Jul 14, 2014 at 10:01 PM, jay vyas jayunit100.apa...@gmail.com wrote: Hi spark ! What is the purpose of the randomly assigned SPARK_WORKER_PORT? The documentation says it is used to join a cluster, but it's not clear to me how a random port could be used to communicate with other members of a spark pool. This question might be grounded in my ignorance ... if so please just point me to the right documentation if I'm missing something obvious :) thanks ! -- jay vyas -- jay vyas
SPARK_WORKER_PORT (standalone cluster)
Hi spark ! What is the purpose of the randomly assigned SPARK_WORKER_PORT? The documentation says it is used to join a cluster, but it's not clear to me how a random port could be used to communicate with other members of a spark pool. This question might be grounded in my ignorance ... if so please just point me to the right documentation if I'm missing something obvious :) thanks ! -- jay vyas