Re: Communication between Driver and Executors
Hi, so I didn't manage to get the Broadcast variable with a new value distributed to my executors in YARN mode. In local mode it worked fine, but when running on YARN either nothing happened (when unpersist() was called on the driver) or I got a TimeoutException (when called on the executor). I finally dropped the use of broadcast variables and added an HTTP polling mechanism from the executors to the driver. I find that a bit suboptimal, in particular since there is this whole Akka infrastructure already running and I should be able to just send messages around. However, Spark does not seem to encourage this. (In general I find that `private` is a bit overused in the Spark codebase...) Thanks Tobias
Re: Is there setup and cleanup function in spark?
Hi, On Fri, Nov 14, 2014 at 2:49 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, then we need another trick. let's have an *implicit lazy var connection/context* around our code. And setup() will trigger the eval and initialization. Due to lazy evaluation, I think having setup/teardown is a bit tricky. In particular teardown, because it is not easy to execute code after all computation is done. You can check http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p11009.html for an example of what worked for me. Tobias
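For illustration, a rough sketch of the lazy-initialization idea from above (the connection type and JDBC URL are made up; initialization happens once per executor JVM on first use):

    object ConnectionHolder {
      // initialized lazily, i.e., when the first task on this JVM touches it
      lazy val conn: java.sql.Connection =
        java.sql.DriverManager.getConnection("jdbc:some-db://example:1234")  // hypothetical URL
    }

    rdd.mapPartitions { iter =>
      val conn = ConnectionHolder.conn  // triggers setup on first access, reused afterwards
      iter.map { x =>
        // use conn to enrich/store x here
        x
      }
    }

Proper teardown is the tricky part, as mentioned; there is no obvious place to close the connection after all computation is done.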
Re: Communication between Driver and Executors
Hi, On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: I wonder if SparkConf is dynamically updated on all worker nodes or only during initialization. It can be used to piggyback information. Otherwise I guess you are stuck with Broadcast. Primarily I have had these issues moving legacy MR operators to Spark, where MR piggybacks on the Hadoop conf pretty heavily; in a Spark-native application it's rarely required. Do you have a use case like that? My use case is http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html – that is, notifying my Spark executors that the StreamingContext has been shut down. (Even with non-graceful shutdown, Spark doesn't seem to end the actual execution, just all the Spark-internal timers etc.) I need to do this properly or processing will go on for a very long time. I have been trying to mis-use broadcast as in - create a class with a boolean var, set to true - query this boolean on the executors as a prerequisite to process the next item - when I want to shut down, I set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery). This is very dirty, but it works with a local[*] master. Unfortunately, when deployed on YARN, the new value will never arrive at my executors. Any idea what could go wrong on YARN with this approach – or what is a good way to do this? Thanks Tobias
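For illustration, a rough sketch of the broadcast trick described above (class and variable names are made up):

    class StopFlag extends Serializable { @volatile var keepRunning = true }

    val stopFlag = new StopFlag
    val stopFlagBc = ssc.sparkContext.broadcast(stopFlag)

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { items =>
        if (stopFlagBc.value.keepRunning) {
          items.foreach(processItem)  // processItem is a hypothetical function
        }
      }
    }

    // on shutdown, driver side:
    stopFlag.keepRunning = false
    stopFlagBc.unpersist()  // hoped to trigger re-delivery of the changed value;
                            // as described above, this only worked with a local[*] master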
Re: Communication between Driver and Executors
Hi again, On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer t...@preferred.jp wrote: I have been trying to mis-use broadcast as in - create a class with a boolean var, set to true - query this boolean on the executors as a prerequisite to process the next item - when I want to shut down, I set the boolean to false and unpersist the broadcast variable (which will trigger re-delivery). This is very dirty, but it works with a local[*] master. Unfortunately, when deployed on YARN, the new value will never arrive at my executors. In fact, it seems as if changing a mutable object (like a mutable list) and unpersisting it in order to trigger redeployment only works locally. When running on YARN, even after an unpersist, the value will always be identical to what I shipped first. Now I wonder what unpersist actually does in that case. Must I call unpersist from an executor or from the driver? Thanks Tobias
StreamingContext does not stop
Hi, I am processing a bunch of HDFS data using the StreamingContext (Spark 1.1.0) which means that all files that exist in the directory at start() time are processed in the first batch. Now when I try to stop this stream processing using `streamingContext.stop(false, false)` (that is, even with stopGracefully = false), it has no effect. The stop() call blocks and data processing continues (probably it would stop after the batch, but that would be too long since all my data is in that batch). I am not exactly sure if this is generally true or only for the first batch. Also I observed that stopping the stream processing during the first batch does occasionally lead to a very long time until the stop takes place (even if there is no data present at all). Has anyone experienced something similar? In my processing code, do I have to do something particular (like checking for the state of the StreamingContext) to allow the interruption? It is quite important for me that stopping the stream processing takes place rather quickly. Thanks Tobias
Re: StreamingContext does not stop
Hi, I guess I found part of the issue: I said dstream.transform(rdd => { rdd.foreachPartition(...); rdd }) instead of dstream.transform(rdd => { rdd.mapPartitions(...) }), that's why stop() would not stop the processing. Now with the new version a non-graceful shutdown works in the sense that Spark does not wait for my processing to complete; job generator, job scheduler, job executor etc. all seem to be shut down fine, just the threads that do the actual processing are not. Even after streamingContext.stop() is done, I see logging output from my processing task. Is there any way to signal to my processing tasks that they should stop the processing? Thanks Tobias
Re: Spark streaming cannot receive any message from Kafka
Bill, However, when I am currently using Spark 1.1.0, the Spark streaming job cannot receive any messages from Kafka. I have not made any change to the code. Do you see any suspicious messages in the log output? Tobias
Re: Strange behavior of spark-shell while accessing hdfs
Hi, On Tue, Nov 11, 2014 at 2:04 PM, hmxxyy hmx...@gmail.com wrote: If I run bin/spark-shell without connecting a master, it can access a hdfs file on a remote cluster with kerberos authentication. [...] However, if I start the master and slave on the same host and using bin/spark-shell --master spark://*.*.*.*:7077 run the same commands [... ] org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: *.*.*.*.com/98.138.236.95; destination host is: *.*.*.*:8020; When you give no master, it is local[*], so Spark will (implicitly?) authenticate to HDFS from your local machine using local environment variables, key files etc., I guess. When you give a spark://* master, Spark will run on a different machine, where you have not yet authenticated to HDFS, I think. I don't know how to solve this, though, maybe some Kerberos token must be passed on to the Spark cluster? Tobias
Re: Best practice for multi-user web controller in front of Spark
Hi, also there is Spindle https://github.com/adobe-research/spindle which was introduced on this list some time ago. I haven't looked into it deeply, but you might gain some valuable insights from their architecture, they are also using Spark to fulfill requests coming from the web. Tobias
Re: filtering out non English tweets using TwitterUtils
Hi, On Wed, Nov 12, 2014 at 5:42 AM, SK skrishna...@gmail.com wrote: But getLang() is one of the methods of twitter4j.Status since version 3.0.6 according to the doc at: http://twitter4j.org/javadoc/twitter4j/Status.html#getLang-- What version of twitter4j does Spark Streaming use? 3.0.3 https://github.com/apache/spark/blob/master/external/twitter/pom.xml#L53 Tobias
Re: convert List<String> to dstream
Josh, On Tue, Nov 11, 2014 at 7:43 AM, Josh J joshjd...@gmail.com wrote: I have some data generated by some utilities that returns the results as a List<String>. I would like to join this with a DStream of strings. How can I do this? I tried the following, though I get Scala compiler errors: val list_scalaconverted = ssc.sparkContext.parallelize(listvalues.toArray()) Your `listvalues` seems to be a java.util.List, not a scala.collection.immutable.List, right? In that case, toArray() will return an Array[Object], not an Array[String], which leads to the error you see. Have a look at http://www.scala-lang.org/api/current/index.html#scala.collection.JavaConversions$ and convert your Java list to a Scala list. Tobias
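For illustration, a rough sketch of the conversion (assuming `listvalues` is a java.util.List<String>):

    import scala.collection.JavaConversions._

    // the implicit conversion turns the Java list into a Scala Buffer[String],
    // so parallelize infers RDD[String] instead of RDD[Object]
    val rdd = ssc.sparkContext.parallelize(listvalues.toSeq)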
Re: Mapping SchemaRDD/Row to JSON
Akshat, On Tue, Nov 11, 2014 at 4:12 AM, Akshat Aranya aara...@gmail.com wrote: Does there exist a way to serialize Row objects to JSON? I can't think of any other way than the one you proposed. A Row is more or less an Array[Object], so you need to read the JSON keys and data types from the schema. Tobias
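For illustration, a rough sketch of that approach (schemaRDD stands for the SchemaRDD in question; the JSON rendering is deliberately simplistic, treats every value as a string and does no escaping, so a real implementation would look at the data types in the schema):

    val fieldNames = schemaRDD.schema.fields.map(_.name)

    val jsonStrings = schemaRDD.map { row =>
      fieldNames.zip(row.toSeq)
        .map { case (name, value) => "\"" + name + "\": \"" + value + "\"" }
        .mkString("{", ", ", "}")
    }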
Re: netty on classpath when using spark-submit
Hi, On Wed, Nov 5, 2014 at 10:23 AM, Tobias Pfeiffer wrote: On Tue, Nov 4, 2014 at 8:33 PM, M. Dale wrote: From http://spark.apache.org/docs/latest/configuration.html it seems that there is an experimental property: spark.files.userClassPathFirst Thank you very much, I didn't know about this. Unfortunately, it doesn't change anything. With this setting both true and false (as indicated by the Spark web interface) and no matter whether local[N] or yarn-client or yarn-cluster mode are used with spark-submit, the classpath looks the same and the netty class is loaded from the Spark jar. Can I use this setting with spark-submit at all? Has anyone used this setting successfully or can advice me on how to use it correctly? Thanks Tobias
Re: netty on classpath when using spark-submit
Markus, thanks for your help! On Tue, Nov 4, 2014 at 8:33 PM, M. Dale medal...@yahoo.com.invalid wrote: Tobias, From http://spark.apache.org/docs/latest/configuration.html it seems that there is an experimental property: spark.files.userClassPathFirst Thank you very much, I didn't know about this. Unfortunately, it doesn't change anything. With this setting both true and false (as indicated by the Spark web interface) and no matter whether local[N] or yarn-client or yarn-cluster mode are used with spark-submit, the classpath looks the same and the netty class is loaded from the Spark jar. Can I use this setting with spark-submit at all? Thanks Tobias
Re: hadoop_conf_dir when running spark on yarn
Hi, On Mon, Nov 3, 2014 at 1:29 PM, Amey Chaugule ambr...@gmail.com wrote: I thought that only applied when you're trying to run a job using spark-submit or in the shell... And how are you starting your Yarn job, if not via spark-submit? Tobias
Re: different behaviour of the same code
Hi, On Fri, Oct 31, 2014 at 4:31 PM, lieyan lie...@yahoo.com wrote: The code is here: LogReg.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/LogReg.scala Then I click the Run button of the IDEA, and I get the following error message errlog.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/errlog.txt . But when I export the jar file and use spark-submit --class net.yanl.spark.LogReg log_reg.jar 15, the program works fine. I have not used the Spark built-in cluster manager and I don't know how application jar distribution is done in it. However, it seems to me that when you use spark-submit, then spark-submit takes care of distributing your jar file properly to all the cluster nodes, that's why it works fine. When you run it from your IDE, it seems not to do that, that's why some classes are not there on all cluster nodes and you run into ClassNotFoundExceptions. If you change the master to local[3] instead of spark://master.local:7077 and run it from IDEA, does it work? Tobias
netty on classpath when using spark-submit
Hi, I tried hard to get a version of netty into my jar file created with sbt assembly that works with all my libraries. Now I managed that and was really happy, but it seems like spark-submit puts an older version of netty on the classpath when submitting to a cluster, such that my code ends up with a NoSuchMethodError: Code: val a = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "http://localhost") val f = new File(a.getClass.getProtectionDomain(). getCodeSource().getLocation().getPath()) println(f.getAbsolutePath) println("headers: " + a.headers()) When executed with sbt run: ~/.ivy2/cache/io.netty/netty/bundles/netty-3.9.4.Final.jar headers: org.jboss.netty.handler.codec.http.DefaultHttpHeaders@64934069 When executed with spark-submit: ~/spark-1.1.0-bin-hadoop2.4/lib/spark-assembly-1.1.0-hadoop2.4.0.jar Exception in thread "main" java.lang.NoSuchMethodError: org.jboss.netty.handler.codec.http.DefaultHttpRequest.headers()Lorg/jboss/netty/handler/codec/http/HttpHeaders; ... How can I get the old netty version off my classpath? Thanks Tobias
Re: NonSerializable Exception in foreachRDD
Harold, just mentioning it in case you run into it: If you are in a separate thread, there are apparently stricter limits to what you can and cannot serialize: val someVal = ...; future { val myLocalVal = someVal /* be very careful with defining RDD operations using someVal here; use myLocalVal instead */ } On Thu, Oct 30, 2014 at 4:55 PM, Harold Nguyen har...@nexgate.com wrote: In Spark Streaming, when I do foreachRDD on my DStreams, I get a NonSerializable exception when I try to do something like: DStream.foreachRDD( rdd => { var sc.parallelize(Seq(("test", "blah"))) }) Is this the code you are actually using? var sc.parallelize(...) doesn't really look like valid Scala to me. Tobias
spark-submit results in NoClassDefFoundError
Hi, I am trying to get my Spark application to run on YARN and by now I have managed to build a fat jar as described on http://markmail.org/message/c6no2nyaqjdujnkq (which is the only really usable manual on how to get such a jar file). My code runs fine using sbt test and sbt run, but when running ~/spark-1.1.0-bin-hadoop2.4/bin/spark-submit \ --class my.spark.MyClass --master local[3] \ target/scala-2.10/myclass-assembly-1.0.jar I get: Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread "main" java.lang.NoClassDefFoundError: com/typesafe/scalalogging/slf4j/Logger at java.lang.Class.getDeclaredMethods0(Native Method) at java.lang.Class.privateGetDeclaredMethods(Class.java:2615) at java.lang.Class.getMethod0(Class.java:2856) at java.lang.Class.getMethod(Class.java:1668) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: com.typesafe.scalalogging.slf4j.Logger at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 7 more ABRT problem creation: 'success' It seems to run into an error before it does anything with my jar? I am using "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2" instead of "com.typesafe" %% "scalalogging-slf4j" % "1.1.0" in my SBT file, could that be a reason? Thanks Tobias
Re: spark-submit results in NoClassDefFoundError
Hi again, On Thu, Oct 30, 2014 at 11:50 AM, Tobias Pfeiffer t...@preferred.jp wrote: Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread "main" java.lang.NoClassDefFoundError: com/typesafe/scalalogging/slf4j/Logger It turned out scalalogging was not included in the fat jar due to https://github.com/sbt/sbt-assembly/issues/116. I am using "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2" instead of "com.typesafe" %% "scalalogging-slf4j" % "1.1.0" in my SBT file, could that be a reason? So yes, that was the reason, in a way... however, I decided to include scala in the fat jar instead of modifying all my logging code... Tobias
Re: Processing order in Spark
Sean, thanks, I didn't know about repartitionAndSortWithinPartitions, that seems very helpful! Tobias
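For anyone finding this thread later, a small sketch of how I understand it is used (on a pair RDD, sorting records by key within each partition; data and partition count are made up, and the method is only available in newer Spark versions):

    import org.apache.spark.HashPartitioner

    // hypothetical (timestamp, payload) pairs
    val events = sc.parallelize(Seq((3L, "c"), (1L, "a"), (2L, "b")))
    val sorted = events.repartitionAndSortWithinPartitions(new HashPartitioner(2))
    // each partition now holds its records sorted by the Long key, in a single shuffle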
Combined HDFS/Kafka Processing
Hi, I have a setting where data arrives in Kafka and is stored to HDFS from there (maybe using Camus or Flume). I want to write a Spark Streaming app where - first all files in that HDFS directory are processed, - and then the stream from Kafka is processed, starting with the first item that was not yet in HDFS. The order of the data is somehow important, so I should really *first* do the HDFS processing (which might take a while, by the way) and *then* start stream processing. Does anyone have any suggestions on how to implement this? Should I write a custom receiver, a custom input stream, can I just use built-in mechanisms? I would be happy to learn about any ideas. Thanks Tobias
Re: Record-at-a-time model for Spark Streaming
Jianneng, On Wed, Oct 8, 2014 at 8:44 AM, Jianneng Li jiannen...@berkeley.edu wrote: I understand that Spark Streaming uses micro-batches to implement streaming, while traditional streaming systems use the record-at-a-time processing model. The performance benefit of the former is throughput, and the latter is latency. I'm wondering what it would take to implement record-at-a-time for Spark Streaming? Would it be something that is feasible to prototype in one or two months? I think this is so much against the fundamental design concept of Spark Streaming that there would be nothing left of Spark Streaming when you are done with it. Spark is fundamentally based on the idea of an RDD, that is, distributed storage of data, and Spark Streaming is basically a wrapper that stores incoming data as an RDD and then processes it as a batch. One item at a time does not match this model. Even if you *were* able to prototype something, I think performance would be abysmal. Tobias
Re: dynamic sliding window duration
Hi, On Wed, Oct 8, 2014 at 4:50 AM, Josh J joshjd...@gmail.com wrote: I have a source which fluctuates in the frequency of streaming tuples. I would like to process certain batch counts, rather than batch window durations. Is it possible to either 1) define batch window sizes Cf. http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-td2085.html 2) dynamically adjust the duration of the sliding window? That's not possible AFAIK, because you can't change anything in the processing pipeline after StreamingContext has been started. Tobias
Re: Using GraphX with Spark Streaming?
Arko, On Sat, Oct 4, 2014 at 1:40 AM, Arko Provo Mukherjee arkoprovomukher...@gmail.com wrote: Apologies if this is a stupid question but I am trying to understand why this can or cannot be done. As far as I understand that streaming algorithms need to be different from batch algorithms as the streaming algorithms are generally incremental. Hence the question whether the RDD transformations can be extended to streaming or not. I don't think that streaming algorithms are generally incremental in Spark Streaming. In fact, data is collected and every N seconds (minutes/...), the data collected during that interval is batch-processed as with normal batch operations. In fact, using data previously obtained from the stream (in previous intervals) is a bit more complicated than plain batch processing. If the graph you want to create only uses data from one interval/batch, that should be dead simple. You might want to have a look at https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams Tobias
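As an illustration, a rough sketch of building a graph from one interval's data (parseEdge is a made-up parsing function turning a record into an Edge[Int]):

    import org.apache.spark.graphx.{Edge, Graph}

    dstream.foreachRDD { rdd =>
      val edges = rdd.map(parseEdge)          // RDD[Edge[Int]], hypothetical parsing
      val graph = Graph.fromEdges(edges, defaultValue = 0)
      println("vertices in this batch: " + graph.numVertices)
    }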
All-time stream re-processing
Hi, I have a setup (in mind) where data is written to Kafka and this data is persisted in HDFS (e.g., using camus) so that I have an all-time archive of all stream data ever received. Now I want to process that all-time archive and when I am done with that, continue with the live stream, using Spark Streaming. (In a perfect world, Kafka would have infinite storage and I would always use the Kafka receiver, starting from offset 0.) Does anyone have an idea how to realize such a setup? Would I write a custom receiver that first reads the HDFS file and then connects to Kafka? Is there an existing solution for that use case? Thanks Tobias
Re: All-time stream re-processing
Hi, On Wed, Sep 24, 2014 at 7:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: So you have a single Kafka topic which has a very high retention period (that decides the storage capacity of a given Kafka topic) and you want to process all historical data first using Camus and then start the streaming process? I don't necessarily want to process the historical data using Camus, but I want to keep it forever (longer than Kafka's retention period) and process the stored data and the stream. (I don't really care about how the data got into HDFS, be it Camus or something else, but I assume that Kafka can't store it forever.) Imagine that I receive all tweets posted to Twitter, they go into my Kafka instance and are archived to HDFS. Now a user logs in and I want to display to that user a) all posts that have ever mentioned him/her and b) continue to update that list from the current stream. (In that order.) This happens for a number of users, so it's a process that needs to be repeatable with different Spark operations. The challenge is, Camus and Spark are two different consumers of a Kafka topic and both maintain their own consumed offsets in different ways. Camus stores offsets in HDFS, and the Spark consumer in ZK. From what I understand, you need something which identifies up to which point Camus pulled (for a given partition of a topic) and you want to start the Spark receiver from there? I think I need such a thing. Also, I think Camus stores those offsets, so in theory it should be possible to consume all HDFS files, read the offset, then start Kafka processing from that offset. That sounds very lambda architecture-ish to me, so I was wondering if someone has realized a similar setup. Thanks Tobias
Re: rsync problem
Hi, I assume you unintentionally did not reply to the list, so I'm adding it back to CC. How do you submit your job to the cluster? Tobias On Thu, Sep 25, 2014 at 2:21 AM, rapelly kartheek kartheek.m...@gmail.com wrote: How do I find out whether a node in the cluster is a master or slave?? Till now I was thinking that the slaves file under the conf folder makes the difference. Also, the MASTER_IP in the spark-env.sh file. What else differentiates a slave from the master?? On Wed, Sep 24, 2014 at 10:46 PM, rapelly kartheek kartheek.m...@gmail.com wrote: The job execution is taking place perfectly. Previously, all my print statements used to be stored in the spark/work/*/stdout file. But now, after doing the rsync, I find that none of the print statements are getting reflected in the stdout file under the work folder. But when I go to the code, I find the statements in the code. But they are not reflected into the stdout file as before. Can you please tell me where I went wrong. All I want is to see my modification in the code getting reflected in the output. On Wed, Sep 24, 2014 at 10:22 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I have a very important and fundamental doubt: I have rsynced the entire spark folder from the master to all slaves in the cluster. When I execute a job, it's working perfectly. But when I rsync the entire spark folder of the master to all the slaves, is it not that I am sending the master configurations to all the slaves and making the slaves behave like the master?? First of all, is it correct to rsync the entire spark folder?? But if I change only one file, then how do I rsync it to all?? On Fri, Sep 19, 2014 at 8:44 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Thank you Soumya Simanta and Tobias. I've deleted the contents of the work folder in all the nodes. Now it's working perfectly as it was before. Thank you Karthik On Fri, Sep 19, 2014 at 4:46 PM, Soumya Simanta soumya.sima...@gmail.com wrote: One possible reason is maybe that the checkpointing directory $SPARK_HOME/work is rsynced as well. Try emptying the contents of the work folder on each node and try again. On Fri, Sep 19, 2014 at 4:53 AM, rapelly kartheek kartheek.m...@gmail.com wrote: I followed this command: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory. Anyway, for now, I did it individually for each node. I have copied to each node at a time individually using the above command. So, I guess the copying may not contain any mixture of files. Also, as of now, I am not facing any MethodNotFound exceptions. But there is no job execution taking place. After some time, one by one, each goes down and the cluster shuts down. On Fri, Sep 19, 2014 at 2:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Sep 19, 2014 at 5:17 PM, rapelly kartheek kartheek.m...@gmail.com wrote: you have copied a lot of files from various hosts to username@slave3:path only from one node to all the other nodes... I don't think rsync can do that in one command as you described. My guess is that now you have a wild mixture of jar files all across your cluster which will lead to fancy exceptions like MethodNotFound etc., that's maybe why your cluster is not working correctly. Tobias
Re: how long does it take executing ./sbt/sbt assembly
Hi, http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-building-project-with-sbt-assembly-is-extremely-slow-td13152.html -- Maybe related to this? Tobias
Re: rsync problem
Hi, On Fri, Sep 19, 2014 at 5:02 PM, rapelly kartheek kartheek.m...@gmail.com wrote: This worked perfectly. But, I wanted to simultaneously rsync all the slaves. So, added the other slaves as following: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname :path/to/destdirectory username@slave2:path username@slave3:path and so on. The rsync man page says rsync [OPTION...] SRC... [USER@]HOST:DEST so as I understand your command, you have copied a lot of files from various hosts to username@slave3:path. I don't think rsync can copy to various locations at once. Tobias
spark-submit: fire-and-forget mode?
Hi, I am wondering: Is it possible to run spark-submit in a mode where it will start an application on a YARN cluster (i.e., driver and executors run on the cluster) and then forget about it in the sense that the Spark application is completely independent from the host that ran the spark-submit command and will not be affected if that controlling machine shuts down etc.? I was using spark-submit with YARN in cluster mode, but spark-submit stayed in the foreground and as far as I understood, it terminated the application on the cluster when spark-submit was Ctrl+C'ed. Thanks Tobias
Re: spark-submit: fire-and-forget mode?
Hi, thanks for everyone's replies! On Thu, Sep 18, 2014 at 7:37 AM, Sandy Ryza sandy.r...@cloudera.com wrote: YARN cluster mode should have the behavior you're looking for. The client process will stick around to report on things, but should be able to be killed without affecting the application. If this isn't the behavior you're observing, and your application isn't failing for a different reason, there's a bug. Sandy, yes, you are right; I must have mis-interpreted some results/behavior when I was trying this before. On Thu, Sep 18, 2014 at 1:19 PM, Andrew Or and...@databricks.com wrote: Thanks Tobias, I have filed a JIRA for it. Great, thanks for opening the issue! I think that's a very useful thing to have. Tobias
Re: diamond dependency tree
Hi, On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote: Is it possible to express a diamond DAG and have the leaf dependency evaluate only once? Well, strictly speaking your graph is not a tree, and also the meaning of leaf is not totally clear, I'd say. So say data flows left to right (and the dependencies are oriented right to left): [inline image: diagram of the diamond DAG omitted] Is it possible to run d.collect() and have a evaluate its iterator only once? If you say a.cache() (or a.persist()) then it will be evaluated only once and then the cached data will be used for later accesses. Tobias
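For illustration, a rough sketch of the caching approach (expensiveComputation, f and g are placeholder functions):

    val a = sc.parallelize(1 to 1000).map(expensiveComputation)
    a.cache()            // or a.persist(...)
    val b = a.map(f)
    val c = a.map(g)
    val d = b.union(c)
    d.collect()          // a is computed once and served from the cache for both b and c
                         // (as long as the cached partitions are retained)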
Re: Multi-tenancy for Spark (Streaming) Applications
Hi, by now I understood maybe a bit better how spark-submit and YARN play together and how Spark driver and slaves play together on YARN. Now for my usecase, as described on https://spark.apache.org/docs/latest/submitting-applications.html, I would probably have a end-user-facing gateway that submits my Spark (Streaming) application to the YARN cluster in yarn-cluster mode. I have a couple of questions regarding that setup: * That gateway does not need to be written in Scala or Java, it actually has no contact with the Spark libraries; it is just executing a program on the command line (./spark-submit ...), right? * Since my application is a streaming application, it won't finish by itself. What is the best way to terminate the application on the cluster from my gateway program? Can I just send SIGTERM to the spark-submit program? Is it recommended? * I guess there are many possibilities to achieve that, but what is a good way to send commands/instructions to the running Spark application? If I want to push some commands from the gateway to the Spark driver, I guess I need to get its IP address - how? If I want the Spark driver to pull its instructions, what is a good way to do so? Any suggestions? Thanks, Tobias
Re: Announcing Spark 1.1.0!
Hi, On Fri, Sep 12, 2014 at 9:12 AM, Patrick Wendell pwend...@gmail.com wrote: I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is the second release on the API-compatible 1.X line. It is Spark's largest release ever, with contributions from 171 developers! Great, congratulations!! The release notes read great! Seems like if I wait long enough for new Spark releases, my applications will build themselves in the end ;-) Tobias
Re: Spark Streaming and database access (e.g. MySQL)
Hi, On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote: if (rdd.take(1).size == 1) { rdd.foreachPartition { iterator => ... I was wondering: Since take() is an output operation, isn't it computed twice (once for the take(1), once during the iteration)? Or will only one single element be computed for take(1)? Thanks Tobias
Re: Multi-tenancy for Spark (Streaming) Applications
Hi, On Thu, Sep 4, 2014 at 10:33 AM, Tathagata Das tathagata.das1...@gmail.com wrote: In the current state of Spark Streaming, creating separate Java processes each having a streaming context is probably the best approach to dynamically adding and removing of input sources. All of these should be able to to use a YARN cluster for resource allocation. So, for example, I would write a server application that accepts a command like createNewInstance and then calls spark-submit, pushing my actual application to the YARN cluster? Or could I use spark-jobserver? Thanks Tobias
Re: Spark streaming for synchronous API
Ron, On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote: I’m trying to figure out how I can run Spark Streaming like an API. The goal is to have a synchronous REST API that runs the spark data flow on YARN. I guess I *may* develop something similar in the future. By a synchronous REST API, do you mean that submitting the job is synchronous and you would fetch the processing results via a different call? Or do you want to submit a job and get the processed data back as an HTTP stream? To begin with, is it even possible to have Spark Streaming run as a yarn job? I think it is very much possible to run Spark Streaming as a YARN job; at least it worked well with Mesos. Tobias
Re: Spark streaming for synchronous API
Hi, On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: I want to create a synchronous REST API that will process some data that is passed in as some request. I would imagine that the Spark Streaming Job on YARN is a long running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it’s some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming. That is not exactly a Spark Streaming use case, I think. Spark Streaming pulls data from some source (like a queue), then processes all data collected in a certain interval in a mini-batch, and stores that data somewhere. It is not well suited for handling request-response cycles in a synchronous way; you might consider using plain Spark (without Streaming) for that. For example, you could use the unfiltered http://unfiltered.databinder.net/Unfiltered.html library and within request handling do some RDD operation, returning the output as HTTP response. This works fine as multiple threads can submit Spark jobs concurrently https://spark.apache.org/docs/latest/job-scheduling.html You could also check https://github.com/adobe-research/spindle -- that seems to be similar to what you are doing. The goal is for the REST API be able to respond to lots of calls with low latency. Hope that clarifies things... Note that low latency for lots of calls is maybe not something that Spark was built for. Even if you do close to nothing data processing, you may not get below 200ms or so due to the overhead of submitting jobs etc., from my experience. Tobias
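To illustrate the point that multiple threads can submit jobs concurrently, a minimal sketch (data set, path and query handling are made up):

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val data = sc.textFile("hdfs:///some/path")  // hypothetical input

    // each incoming HTTP request could be handled on its own thread like this;
    // the count() blocks only this thread, Spark schedules the jobs concurrently
    def handleRequest(query: String): Future[Long] = Future {
      data.filter(_.contains(query)).count()
    }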
Re: Spark streaming for synchronous API
Hi, On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: So I guess where I was coming from was the assumption that starting up a new job to be listening on a particular queue topic could be done asynchronously. No, with the current state of Spark Streaming, all data sources and the processing pipeline must be fixed when you start your StreamingContext. You cannot add new data sources dynamically at the moment, see http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-for-Spark-Streaming-Applications-td13398.html For example, let’s say there’s a particular topic T1 in a Kafka queue. If I have a new set of requests coming from a particular client A, I was wondering if I could create a partition A. The streaming job is submitted to listen to T1.A and will write to a topic T2.A, which the REST endpoint would be listening on. That doesn't seem like a good way to use Kafka. It may be possible, but I am pretty sure you should create a new topic T_A instead of a partition A in an existing topic. With some modifications of Spark Streaming's KafkaReceiver you *might* be able to get it to work as you imagine, but it was not meant to be that way, I think. Also, you will not get low latency, because Spark Streaming processes data in batches of fixed interval length (say, 1 second) and in the worst case your query will wait up to 1 second before processing even starts. If I understand correctly what you are trying to do (which I am not sure about), I would probably recommend to choose a bit of a different architecture; in particular given that you cannot dynamically add data sources. Tobias
Re: Recursion
Hi, On Fri, Sep 5, 2014 at 6:16 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Does Spark support recursive calls? Can you give an example of which kind of recursion you would like to use? Tobias
Re: How to list all registered tables in a sql context?
Hi, On Sat, Sep 6, 2014 at 1:40 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Err... there's no such feature? The problem is that the SQLContext's `catalog` member is protected, so you can't access it from outside. If you subclass SQLContext, and make sure that `catalog` is always a `SimpleCatalog`, you can check `catalog.tables` (which is a HashMap). Tobias
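A rough sketch of the subclassing idea (the internal member names are from my reading of the code base and may well differ between Spark versions, so treat this as an illustration only):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.analysis.SimpleCatalog

    class ListableSQLContext(sc: SparkContext) extends SQLContext(sc) {
      // `catalog` is protected, but accessible from a subclass
      def tableNames: Seq[String] = catalog match {
        case simple: SimpleCatalog => simple.tables.keys.toSeq
        case _ => Seq.empty  // other catalog implementations are not handled here
      }
    }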
Re: advice on spark input development - python or scala?
Hi, On Thu, Sep 4, 2014 at 11:49 PM, Johnny Kelsey jkkel...@semblent.com wrote: As a concrete example, we have a python class (part of a fairly large class library) which, as part of its constructor, also creates a record of itself in the cassandra key space. So we get an initialised class a row in a table on the cluster. My problem is this: should we even be doing this? I think the problem you describe is not related to any programming language. This is a design decision and/or good/bad programming, but it has nothing to do with Python or Scala, if I am not mistaken. Personally, I am a big fan of Scala because it's concise and provides me with type checking at compile time. However, Scala might be harder to learn than Python (in particular if you are already using Python) and while execution of Scala code may be faster, the compiler is a quite heavy (in terms of hardware requirements) and compile time is a bit lengthy, I'd say. Tobias
Re: RDDs
Hello, On Wed, Sep 3, 2014 at 6:02 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Can someone tell me what kind of operations can be performed on a replicated rdd?? What are the use-cases of a replicated rdd. I suggest you read https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds as an introduction, it lists a lot of the transformations and output operations you can use. Personally, I also found it quite helpful to read the paper about RDDs: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf One basic doubt that is bothering me from long time: what is the difference between an application and job in the Spark parlance. I am confused b'cas of Hadoop jargon. OK, someone else might answer that. I am myself confused with application, job, task, stage etc. ;-) Tobias
Multi-tenancy for Spark (Streaming) Applications
Hi, I am not sure if multi-tenancy is the right word, but I am thinking about a Spark application where multiple users can, say, log into some web interface and specify a data processing pipeline with streaming source, processing steps, and output. Now as far as I know, there can be only one StreamingContext per JVM and also I cannot add sources or processing steps once it has been started. Are there any ideas/suggestions for how to achieve a dynamic adding and removing of input sources and processing pipelines? Do I need a separate 'java' process per user? Also, can I realize such a thing when using YARN for dynamic allocation? Thanks Tobias
Re: Spark Streaming - how to implement multiple calculation using the same data set
Hi, On Wed, Sep 3, 2014 at 6:54 AM, salemi alireza.sal...@udo.edu wrote: I was able to calculate the individual measures separately and now I have to merge them, and spark streaming doesn't support outer join yet. Can't you assign some dummy key (e.g., index) before your processing and then join on that key using a function from http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions ? Tobias
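For illustration, a rough sketch of the dummy-key idea (stream names are made up; it assumes both streams produce corresponding records in the same order within each batch):

    import org.apache.spark.streaming.StreamingContext._

    // key both measure streams by a batch-local index, then join on that key
    val keyed1 = measureStream1.transform(_.zipWithIndex.map(_.swap))
    val keyed2 = measureStream2.transform(_.zipWithIndex.map(_.swap))
    val merged = keyed1.join(keyed2)  // DStream[(Long, (A, B))]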
Re: Trying to run SparkSQL over Spark Streaming
Hi, On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1...@gmail.com wrote: If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown Since this is the case then is there any way to run join over data received from two different streams? Couldn't you do dstream1.join(dstream2).foreachRDD(...)? Tobias
Re: Trying to run SparkSQL over Spark Streaming
Hi again, On Tue, Aug 26, 2014 at 10:13 AM, Tobias Pfeiffer t...@preferred.jp wrote: On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1...@gmail.com wrote: If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown Since this is the case then is there any way to run join over data received from two different streams? Couldn't you do dstream1.join(dstream2).foreachRDD(...)? Ah, I guess you meant something like SELECT * FROM dstream1 JOIN dstream2 WHERE ...? I don't know if that is possible. Doesn't seem easy to me; I don't think that's doable with the current codebase... Tobias
Re: Spark Stream + HDFS Append
Hi, On Mon, Aug 25, 2014 at 9:56 AM, Dean Chen deanch...@gmail.com wrote: We are using HDFS for log storage where logs are flushed to HDFS every minute, with a new file created for each hour. We would like to consume these logs using spark streaming. The docs state that new HDFS will be picked up, but does Spark Streaming support HDFS appends? I don't think so. The docs at http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext say that even for new files, Files must be written to the monitored directory by 'moving' them from another location within the same file system. So I don't think you can just append to your files. Tobias
Re: multiple windows from the same DStream ?
Hi, computations are triggered by an output operation. No output operation, no computation. Therefore in your code example, On Thu, Aug 21, 2014 at 11:58 PM, Josh J joshjd...@gmail.com wrote: JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); Duration windowLength = new Duration(3); Duration slideInterval = new Duration(3); JavaPairDStream<String, String> windowMessages1 = messages.window(windowLength, slideInterval); JavaPairDStream<String, String> windowMessages2 = messages.window(windowLength, slideInterval); nothing would actually happen. However, if you add output operations, you can use the same window multiple times (in which case caching the data might make sense). So if your windowLength and slideInterval are the same, then there would be no point in having two of them, you could just say: windowMessages1.saveAsHadoopFiles(...) // output operation 1 windowMessages1.print() // output operation 2 windowMessages1.map(someOtherFancyOperation).print() // output operation 3 after processing By default, these output operations are processed one after another. There is an undocumented parameter spark.streaming.concurrentJobs (cf. http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-questions-td1494.html) that allows to run output operations in parallel. I haven't used it, though. Tobias
Re: Trying to run SparkSQL over Spark Streaming
Hi, On Thu, Aug 21, 2014 at 3:11 PM, praveshjain1991 praveshjain1...@gmail.com wrote: The part that you mentioned: the variable `result` is of type DStream[Row]. That is, the meta-information from the SchemaRDD is lost and, from what I understand, there is then no way to learn about the column names of the returned data, as this information is only encoded in the SchemaRDD. Because a DStream[Row] is basically like a DStream[Array[Object]]. You lose all the information about data types in your result and there is no way to recover it, once the schema is inaccessible. If you want to process the data later on, you will have to check types and make assertions about the statements that were issued before etc. Tobias
Re: Trying to run SparkSQL over Spark Streaming
Hi, On Thu, Aug 21, 2014 at 2:19 PM, praveshjain1991 praveshjain1...@gmail.com wrote: Using Spark SQL with batch data works fine so I'm thinking it has to do with how I'm calling streamingcontext.start(). Any ideas what is the issue? Here is the code: Please have a look at http://apache-spark-user-list.1001560.n3.nabble.com/Some-question-about-SQL-and-streaming-tp9229p9254.html If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown. Tobias
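For illustration, a minimal sketch of that pattern (assuming dstream is a DStream[String] of CSV lines; the case class and table name are made up):

    case class Person(name: String, age: Int)

    dstream.foreachRDD { rdd =>
      import sqlContext.createSchemaRDD
      val people = rdd.map(_.split(",")).map(a => Person(a(0), a(1).trim.toInt))
      people.registerAsTable("people")      // register inside the same foreachRDD block
      val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      teenagers.collect().foreach(println)
    }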
Re: type issue: found RDD[T] expected RDD[A]
Hi, On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com wrote: I think the type of the data contained in your RDD needs to be a known case class and not abstract for createSchemaRDD. This makes sense when you think it needs to know about the fields in the object to create the schema. Exactly this. The actual message pointing to that is: inferred type arguments [T] do not conform to method createSchemaRDD's type parameter bounds [A : Product] All case classes are automatically subclasses of Product, but otherwise you will have to extend Product and add the required methods yourself. Tobias
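For illustration, a small sketch (Record is a made-up case class):

    // case classes automatically extend Product, so this satisfies the [A <: Product] bound
    case class Record(id: Int, value: String)

    val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
    import sqlContext.createSchemaRDD
    val schemaRDD = createSchemaRDD(rdd)  // or rely on the implicit conversion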
Re: Data loss - Spark streaming and network receiver
Hi Wei, On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com wrote: Since our application cannot tolerate losing customer data, I am wondering what is the best way for us to address this issue. 1) We are thinking writing application specific logic to address the data loss. To us, the problem seems to be caused by that Kinesis receivers advanced their checkpoint before we know for sure the data is replicated. For example, we can do another checkpoint ourselves to remember the kinesis sequence number for data that has been processed by spark streaming. When Kinesis receiver is restarted due to worker failures, we restarted it from the checkpoint we tracked. This sounds pretty much to me like the way Kafka does it. So, I am not saying that the stock KafkaReceiver does what you want (it may or may not), but it should be possible to update the offset (corresponds to sequence number) in Zookeeper only after data has been replicated successfully. I guess replace Kinesis by Kafka is not in option for you, but you may consider pulling Kinesis data into Kafka before processing with Spark? Tobias
Re: [Spar Streaming] How can we use consecutive data points as the features ?
Hi, On Sat, Aug 16, 2014 at 3:29 AM, Yan Fang yanfang...@gmail.com wrote: If all consecutive data points are in one batch, it's not complicated except that the order of data points is not guaranteed in the batch and so I have to use the timestamp in the data point to reach my goal. However, when the consecutive data points spread in two or more batches, how can I do this? You *could* use window operations. If there is an upper limit to how many batches you might want to look at, you can instead consider a window that is large enough and thereby avoid using updateStateByKey. Tobias
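For illustration, a rough sketch of the window idea (durations and the record's timestamp field are made up):

    import org.apache.spark.streaming.Seconds

    // look at the last 30 seconds of data every 10 seconds, so consecutive
    // points that were split across batches end up in the same window
    val windowed = dstream.window(Seconds(30), Seconds(10))
    windowed.foreachRDD { rdd =>
      val ordered = rdd.sortBy(_.timestamp)  // sort by the timestamp carried in each record
      // pair up consecutive data points here
    }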
Re: spark streaming : what is the best way to make a driver highly available
Hi, On Thu, Aug 14, 2014 at 5:49 AM, salemi alireza.sal...@udo.edu wrote: what is the best way to make a spark streaming driver highly available. I would also be interested in that. In particular for Streaming applications where the Spark driver is running for a long time, this might be important, I think. Thanks Tobias
Fwd: Task closures and synchronization
Uh, for some reason I don't seem to automatically reply to the list any more. Here is again my message to Tom. -- Forwarded message -- Tom, On Wed, Aug 13, 2014 at 5:35 AM, Tom Vacek minnesota...@gmail.com wrote: This is a back-to-basics question. How do we know when Spark will clone an object and distribute it with task closures versus synchronize access to it. For example, the old rookie mistake of random number generation: import scala.util.Random val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian) One can check to see that each partition contains a different set of random numbers, so the RNG obviously was not cloned, but access was synchronized. In this case, Random is a singleton object; Random.nextGaussian is like a static method of a Java class. The access is not synchronized (unless I misunderstand synchronized), but each Spark worker will use a JVM-local instance of the Random object. You don't actually close over the Random object in this case. In fact, this is one way to have node-local state (e.g., for DB connection pooling). However: val myMap = collection.mutable.Map.empty[Int,Int] sc.parallelize(0 until 100).mapPartitions(it => { it.foreach(ii => myMap(ii) = ii); Array(myMap).iterator }).collect This shows that each partition got a copy of the empty map and filled it in with its portion of the rdd. In this case, myMap is an instance of the Map class, so it will be serialized and shipped around. In fact, if you did `val Random = new scala.util.Random()` in your code above, then this object would also be serialized and treated just as myMap. (NB. No, it is not. Spark hangs for me when I do this and doesn't return anything...) Tobias
Re: Spark Streaming example on your mesos cluster
Hi, On Wed, Aug 13, 2014 at 4:24 AM, Zia Syed xia.s...@gmail.com wrote: I don't particularly see any errors in my logs, either on the console or on the slaves. I see the slave downloads the spark-1.0.2-bin-hadoop1.tgz file and unpacks it as well. Mesos Master shows quite a lot of Tasks created and Finished. I don't see any output on my console of the Word Counts, like I get in the Spark version. Any suggestions/ideas how I can make it work? You have to check the logs on the Mesos slaves in /tmp/mesos/slaves/***/frameworks/ -- I guess that you are missing the jar that your application is packed in. Tobias
Re: [spark-streaming] kafka source and flow control
Hi, On Mon, Aug 11, 2014 at 9:41 PM, Gwenhael Pasquiers gwenhael.pasqui...@ericsson.com wrote: We intend to apply other operations on the data later in the same spark context, but our first step is to archive it. Our goal is something like this Step 1 : consume kafka Step 2 : archive to hdfs AND send to step 3 Step 3 : transform data Step 4 : save transformed data to HDFS as input for M/R I see. Well I think Spark Streaming may be well suited for that purpose. To us it looks like a great flaw if, in streaming mode, spark-streaming cannot slow down its consumption depending on the available resources. On Mon, Aug 11, 2014 at 10:10 PM, Gwenhael Pasquiers gwenhael.pasqui...@ericsson.com wrote: I think the kind of self-regulating system you describe would be too difficult to implement and probably unreliable (even more with the fact that we have multiple slaves). Isn't slowing down its consumption depending on the available resources a self-regulating system? I don't see how you can adapt to available resources without measuring your execution time and then changing how much you consume. Did you have any particular form of adaptation in mind? Tobias
Fwd: Trying to make sense of the actual executed code
(Forgot to include the mailing list in my reply. Here it is.) Hi, On Thu, Aug 7, 2014 at 7:55 AM, Tom thubregt...@gmail.com wrote: When I look at the output, I see that there are several stages, and several tasks per stage. The tasks have a TID, I do not see such a thing for a stage. They should have. In my logs, for example, I see something like INFO scheduler.DAGScheduler - Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at SimpleSpark.scala:21), which has no missing parents INFO scheduler.DAGScheduler - Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at SimpleSpark.scala:21), which is now runnable But what I really want to know is the following: Which map, shuffle and reduces are performed in which order/where can I see the actual executed code per task/stage. In between files/rdd's would be a bonus! I would also be interested in that, although I think it's quite hard to understand what is actually being executed. I dug a bit into that yesterday, and even the simple WordCount (flatMap, map, reduceByKey, max) is already quite tough to understand. For example, reduceByKey consists of three transformations (local reduceByKey, repartition by key, another local reduceByKey), one of which happens in one stage, the other two in a different stage. I would love to see a good visualization of that (I wonder how the developers got their head around that without such a tool), but I am not aware of any. Tobias
Re: How true is this about spark streaming?
Hi, that quoted statement doesn't make too much sense for me, either. Maybe if you had a link for us that shows the context (Google doesn't reveal anything but this conversation), we could evaluate that statement better. Tobias On Tue, Jul 29, 2014 at 5:53 PM, Sean Owen so...@cloudera.com wrote: I'm not sure I understand this, maybe because the context is missing. An RDD is immutable, so there is no such thing as writing to an RDD. I'm not sure which aspect is being referred to as single-threaded. Is this the Spark Streaming driver? What is the difference between streaming into Spark and reading from the stream? Streaming data into Spark means Spark reads the stream. A mini batch of data is exposed as an RDD, but the stream processing continues while it is operated on. Saving the RDDs is one of the most basic operations exposed by streaming: http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations No, you do not stop the stream processing to persist it. In fact you couldn't. On that basis, no, this sounds fairly wrong. On Tue, Jul 29, 2014 at 1:37 AM, Rohit Pujari rpuj...@hortonworks.com wrote: Hello folks: I came across a thread that said A Spark RDD read/write access is driven by a context object and is single threaded. You cannot stream into Spark and read from the stream at the same time. You have to stop the stream processing, snapshot the RDD and continue Can you please offer some insights? Thanks, Rohit Pujari Solutions Engineer, Hortonworks rpuj...@hortonworks.com 716-430-6899
Re: Spark as a application library vs infra
Mayur, I don't know if I exactly understand the context of what you are asking, but let me just mention issues I had with deploying. * As my application is a streaming application, it doesn't read any files from disk, so therefore I have no Hadoop/HDFS in place and I there is no need for it, either. There should be no dependency on Hadoop or HDFS, since you can perfectly run Spark applications without it. * I use Mesos and so far I always had the downloaded Spark distribution accessible for all machines (e.g., via HTTP) and then added my application code by uploading a jar built with `sbt assembly`. As the Spark code itself must not be contained in that jar file, I had to add '% provided' in the sbt file, which in turn prevented me from running the application locally from IntelliJ IDEA (it would not find the libraries marked with provided), I always had to use `sbt run`. * When using Mesos, on the Spark slaves the Spark jar is loaded before the application jar, and so the log4j file from the Spark jar is used instead of my custom one (that is different when running locally), so I had to edit that file in the Spark distribution jar to customize logging of my Spark nodes. I wonder if the two latter problems would vanish if the Spark libraries were bundled together with the application. (That would be your approach #1, I guess.) Tobias
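For reference, the sbt setting I mean looks roughly like this (version numbers are just an example):

    // "provided": not bundled by `sbt assembly`, expected on the cluster's classpath at runtime
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"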
Re: Get Spark Streaming timestamp
Bill, Spark Streaming's DStream provides overloaded methods for transform() and foreachRDD() that allow you to access the timestamp of a batch: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream I think the timestamp is the end of the batch, not the beginning. For example, I compute runtime taking the difference between now() and the time I get as a parameter in foreachRDD(). Tobias On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as: result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract the variable and use it in our code? Thanks! Bill
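For illustration, a minimal sketch of the overload I mean:

    import org.apache.spark.streaming.Time

    dstream.foreachRDD { (rdd, time: Time) =>
      // time is the batch timestamp (the end of the batch, as far as I can tell)
      val elapsed = System.currentTimeMillis() - time.milliseconds
      println("batch " + time.milliseconds + ": approx. " + elapsed + " ms after batch time")
    }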
Re: How to do an interactive Spark SQL
Hi, as far as I know, after the Streaming Context has started, the processing pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL statement is transformed into RDD operations when the Streaming Context starts, I think there is no way to change the statement that is executed on the current stream after the StreamingContext has started. Tobias On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote: For example, this is what I tested and it works in local mode; what it does is it gets the data and the sql query both from kafka, does sql on each RDD and outputs the result back to kafka again. I defined a var called sqlS. In the streaming part, as you can see, I change the sql statement if it consumes a sql message from kafka; then the next time you do sql(sqlS) it executes the updated sql query. But this code doesn't work in cluster mode because sqlS is not updated on all the workers, from what I understand. So my question is how do I change the sqlS value at runtime and make all the workers pick up the latest value. var sqlS = "select count(*) from records" val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args val sparkConf = new SparkConf().setAppName("KafkaSpark") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(2)) val sqlContext = new SQLContext(sc) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ import sqlContext.createSchemaRDD //val tt = Time(5000) val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2) val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).window(Seconds(4)).filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true }).map(t => getRecord(t._2.split("#"))) val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer) val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",") KafkaSpark.props.put("metadata.broker.list", brokerString) val config = new ProducerConfig(KafkaSpark.props) val producer = new Producer[String, String](config) val result = recordsStream.foreachRDD((recRDD) => { val schemaRDD = sqlContext.createSchemaRDD(recRDD) schemaRDD.registerAsTable(tName) val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => { s + r.mkString(",") + "\n" }) producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result")) }) ssc.start() ssc.awaitTermination() On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that are doing the sql analysis? Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries are being hardcoded in the code? What do you mean by cannot share the sql to all workers? On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL examples but the sql is static in the code. I would like to know if there is a way to read the sql from somewhere else (shell for example). I could read the sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not to be working for updating values.
Moreover, if I use some non-serializable class (DataInputStream etc.) to read the SQL from another source, I always get "Task not serializable: java.io.NotSerializableException". Best, Siyuan
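Regarding the thread above: one pattern that is sometimes used (a sketch only, not tested against this exact setup) exploits the fact that the function passed to foreachRDD() is evaluated on the driver once per batch interval, so a driver-side variable read there reflects updates without any broadcast to the workers. The query would then have to be updated on the driver (e.g., by polling ZooKeeper or an HTTP endpoint from a driver-side thread) rather than inside a filter() that runs on the executors; fetchLatestQuery() below is a hypothetical placeholder.

@volatile var currentQuery = "select count(*) from records"

// hypothetical: some driver-side thread keeps currentQuery up to date, e.g.
// currentQuery = fetchLatestQuery()

recordsStream.foreachRDD { rdd =>
  // this block runs on the driver for every batch, so it sees the latest value
  val schemaRDD = sqlContext.createSchemaRDD(rdd)
  schemaRDD.registerAsTable("records")
  val rows = sqlContext.sql(currentQuery).collect()
  println(rows.mkString("\n"))
}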
Re: spark streaming rate limiting from kafka
Bill, numPartitions means the number of Spark partitions that the data received from Kafka will be split into. It has nothing to do with Kafka partitions, as far as I know. If you create multiple Kafka consumers, it doesn't seem like you can specify which consumer will consume which Kafka partitions. Instead, Kafka (at least with the interface that is exposed by the Spark Streaming API) will do something called rebalance and assign Kafka partitions to consumers evenly; you can see this in the client logs. When using multiple Kafka consumers with auto.offset.reset = true, please expect to run into this one: https://issues.apache.org/jira/browse/SPARK-2383 Tobias On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I am currently creating multiple DStreams to consume from different topics. How can I let each consumer consume from different partitions? I find the following parameters from the Spark API: createStream[K, V, U <: Decoder[_], T <: Decoder[_]](jssc: JavaStreamingContext https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.html, keyTypeClass: Class[K], valueTypeClass: Class[V], keyDecoderClass: Class[U], valueDecoderClass: Class[T], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/storage/StorageLevel.html): JavaPairReceiverInputDStream https://spark.apache.org/docs/1.0.0/api/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html[K, V] Create an input stream that pulls messages from a Kafka Broker. The topics parameter is: "Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread." Does numPartitions mean the total number of partitions to consume from topic_name or the index of the partition? How can we specify for each createStream which partition of the Kafka topic to consume? I think if so, I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple Kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark Streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets the limit at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job?
On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
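To make the auto.offset.reset suggestion from the quoted post concrete, here is a sketch (host names, group id and topic are assumptions) of passing Kafka consumer properties through the kafkaParams variant of KafkaUtils.createStream; "largest" starts consuming at the current end of the topic, "smallest" from the earliest available offset:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect" -> "zkhost:2181",   // assumption: your ZooKeeper quorum
  "group.id"          -> "my-consumer-group",
  "auto.offset.reset" -> "smallest")      // or "largest"

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("testTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)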
Re: Distribute data from Kafka evenly on cluster
Hi, as far as I know, rebalance is triggered by Kafka in order to distribute partitions evenly, that is, to achieve the opposite of what you are seeing. I think it would be interesting to check the Kafka logs for the result of the rebalance operation and why you see what you are seeing. I know that in the client logs it says which partitions of a topic were assigned to this particular consumer, maybe you can have a look. Tobias On Fri, Jul 18, 2014 at 11:42 PM, Chen Song chen.song...@gmail.com wrote: Speaking of this, I have another related question. In my spark streaming job, I set up multiple consumers to receive data from Kafka, with each worker reading from one partition. Initially, Spark is intelligent enough to associate each worker with one partition, to make data consumption distributed. After running for a while, consumers rebalance themselves and some workers start reading partitions which were previously assigned to others. This leads to a situation where some workers read from multiple partitions and some don't read at all. Because of data volume, this causes heap pressure on some workers. Any thoughts on why rebalance is triggered and how to monitor to avoid that? On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, unfortunately, when I go with the above approach, I run into this problem: http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver will retry again and again, but will eventually fail, leading to unprocessed data and, worse, the task never terminating. There is nothing exotic about my setup; one Zookeeper node, one Kafka broker, so I am wondering if other people have seen this error before and, more importantly, how to fix it. When I don't use the approach of multiple kafkaStreams, I don't get this error, but also work is never distributed in my cluster... Thanks Tobias On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote: Thank you very much for the link, that was very helpful! So, apparently the `topics: Map[String, Int]` parameter controls the number of partitions that the data is initially added to; the number N in

val kafkaInputs = (1 to N).map { _ => ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1)) }
val union = ssc.union(kafkaInputs)

controls how many connections are made to Kafka. Note that the number of Kafka partitions for that topic must be at least N for this to work. Thanks Tobias -- Chen Song
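Putting the pieces from this thread together, a consolidated sketch (the number of receivers and the variable names are assumptions, and KafkaUtils.createStream is the 1.0-style equivalent of ssc.kafkaStream) could look like this:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 3   // assumption: at most the number of Kafka partitions for the topic
val kafkaInputs = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))
}
// merge the receiver streams back into one DStream, then spread the records
// over the cluster for processing
val unified = ssc.union(kafkaInputs).repartition(numReceivers * 2)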
Re: Include permalinks in mail footer
On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I often find myself wanting to reference one thread from another, or from a JIRA issue. Right now I have to google the thread subject and find the link that way. +1
Re: spark streaming rate limiting from kafka
Bill, are you saying, after repartition(400), you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there are always a single executor working on this job. Even I use repartition, it seems that there is still a single executor. Does anyone has an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
Re: can't print DStream after reduce
Hi, thanks for creating the issue. It feels like in the last week, more or less half of the questions about Spark Streaming were rooted in setting the master to "local" ;-) Tobias On Wed, Jul 16, 2014 at 11:03 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, right, copied from the wrong browser tab I guess. Thanks! TD On Tue, Jul 15, 2014 at 5:57 PM, Michael Campbell michael.campb...@gmail.com wrote: I think you typo'd the JIRA id; it should be https://issues.apache.org/jira/browse/SPARK-2475 "Check whether #cores > #receivers in local mode" On Mon, Jul 14, 2014 at 3:57 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The problem is not really for local[1] or local. The problem arises when there are more input streams than there are cores. But I agree, for people who are just beginning to use it by running it locally, there should be a check addressing this. I made a JIRA for this. https://issues.apache.org/jira/browse/SPARK-2464 TD On Sun, Jul 13, 2014 at 4:26 PM, Sean Owen so...@cloudera.com wrote: How about a PR that rejects a context configured for local or local[1]? As I understand it is not intended to work and has bitten several people. On Jul 14, 2014 12:24 AM, Michael Campbell michael.campb...@gmail.com wrote: This almost had me not using Spark; I couldn't get any output. It is not at all obvious what's going on here to the layman (and to the best of my knowledge, not documented anywhere), but now you know you'll be able to answer this question for the numerous people that will also have it. On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat walrusthe...@gmail.com wrote: Great success! I was able to get output to the driver console by changing the construction of the Streaming Spark Context from: val ssc = new StreamingContext("local" /**TODO change once a cluster is up **/, "AppName", Seconds(1)) to: val ssc = new StreamingContext("local[2]" /**TODO change once a cluster is up **/, "AppName", Seconds(1)) I found something that tipped me off that this might work by digging through this mailing list. On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat walrusthe...@gmail.com wrote: More strange behavior: lines.foreachRDD(x => println(x.first)) // works lines.foreachRDD(x => println((x.count, x.first))) // no output is printed to driver console On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: Thanks for your interest. lines.foreachRDD(x => println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_ -> 1).reduceByKey(_ + _).foreachRDD(x => println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map((_, 1)).print that works, too. However, if I do the following: dstream.reduce { case (x, y) => x }.print I don't get anything on my console. What's going on? Thanks
Re: Announcing Spark 1.0.1
Hi, congratulations on the release! I'm always pleased to see how features pop up in new Spark versions that I had added for myself in a very hackish way before (such as JSON support for Spark SQL). I am wondering if there is any good way to learn early about what is going to be in upcoming versions, other than tracking JIRA...? Tobias On Tue, Jul 15, 2014 at 12:50 AM, Philip Ogren philip.og...@oracle.com wrote: Hi Patrick, This is great news but I nearly missed the announcement because it had scrolled off the folder view that I have Spark users list messages go to. 40+ new threads since you sent the email out on Friday evening. You might consider having someone on your team create a spark-announcement list so that it is easier to disseminate important information like this release announcement. Thanks again for all your hard work. I know you and the rest of the team are getting a million requests a day. Philip On 07/11/2014 07:35 PM, Patrick Wendell wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi, I experienced exactly the same problems when using a SparkContext with a local[1] master specification, because in that case the single available thread is used for receiving data and none is left for processing, so no processing will take place. Once you shut down the connection, the receiver thread becomes available for processing. Any chance you ran into the same issue? Tobias On Mon, Jul 14, 2014 at 11:45 AM, kytay kaiyang@gmail.com wrote: Hi Akhil Das Thanks. I tried the code, and it works. There was a problem with my socket code not flushing the content out, and for the test tool, Hercules, I had to close the socket connection to flush the content out. I am going to troubleshoot why nc works and my code and test tool don't.
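For reference, a minimal local test setup along the lines discussed here (host, port and batch interval are assumptions) needs at least two local threads:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]": one thread for the socket receiver, at least one for processing;
// with "local" or "local[1]" the receiver occupies the only thread and nothing is processed
val ssc = new StreamingContext("local[2]", "SocketTest", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()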
Re: Some question about SQL and streaming
Hi, I think it would be great if we could do the string parsing only once and then just apply the transformation for each interval (reducing the processing overhead for short intervals). Also, one issue with the approach above is that transform() has the following signature:

def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})

the variable `result` is of type DStream[Row]. That is, the meta-information from the SchemaRDD is lost and, from what I understand, there is then no way to learn about the column names of the returned data, as this information is only encoded in the SchemaRDD. I would love to see a fix for this. Thanks Tobias
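One workaround, as a sketch only (assuming the processing can live inside an output operation and that `lines` holds case-class instances with the createSchemaRDD implicit in scope): as long as the SQL result is consumed inside foreachRDD(), it is still a SchemaRDD there, so the schema information has not yet been erased to RDD[Row]:

lines.foreachRDD { rdd =>
  import sqlc.createSchemaRDD
  rdd.registerAsTable("data")
  val schemaRdd = sqlc.sql(query)   // still a SchemaRDD at this point
  schemaRdd.printSchema()           // assuming this accessor is available in your Spark version
  schemaRdd.collect().foreach(println)
}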
Re: Use Spark Streaming to update result whenever data come
Bill, I haven't worked with Yarn, but I would try adding a repartition() call after you receive your data from Kafka. I would be surprised if that didn't help. On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, I was using Spark 0.9 before and the master I used was yarn-standalone. In Spark 1.0, the master will be either yarn-cluster or yarn-client. I am not sure whether it is the reason why more machines do not provide better scalability. What is the difference between these two modes in terms of efficiency? Thanks! On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, do the additional 100 nodes receive any tasks at all? (I don't know which cluster you use, but with Mesos you could check client logs in the web interface.) You might want to try something like repartition(N) or repartition(N*2) (with N the number of your nodes) after you receive your data. Tobias On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, Thanks for the suggestion. I have tried to add more nodes from 300 to 400. It seems the running time did not get improved. On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, can't you just add more nodes in order to speed up the processing? Tobias On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a problem of using Spark Streaming to accept input data and update a result. The input of the data is from Kafka and the output is to report a map which is updated by historical data in every minute. My current method is to set batch size as 1 minute and use foreachRDD to update this map and output the map at the end of the foreachRDD function. However, the current issue is the processing cannot be finished within one minute. I am thinking of updating the map whenever the new data come instead of doing the update when the whoe RDD comes. Is there any idea on how to achieve this in a better running time? Thanks! Bill
Re: Some question about SQL and streaming
Siyuan, I do it like this:

// get data from Kafka
val ssc = new StreamingContext(...)
val kvPairs = KafkaUtils.createStream(...)
// we need to wrap the data in a case class for registerAsTable() to succeed
val lines = kvPairs.map(_._2).map(s => StringWrapper(s))
val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})

Don't know if it is the best way, but it works. Tobias On Thu, Jul 10, 2014 at 4:21 AM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm a new user of Spark. I would like to know whether there is an example of how to use Spark SQL and Spark Streaming together? My use case is that I want to do some SQL on the input stream from Kafka. Thanks! Best, Siyuan
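The StringWrapper case class referenced above is not shown in the snippet; a minimal (assumed) definition would have to live at the top level so that the implicit createSchemaRDD conversion can treat it as a Product:

// assumed definition; the field name is what the SQL statement refers to,
// e.g. query = "SELECT s FROM data"
case class StringWrapper(s: String)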
Re: Use Spark Streaming to update result whenever data come
Bill, good to know you found your bottleneck. Unfortunately, I don't know how to solve this; until now, I have used Spark only with embarrassingly parallel operations such as map or filter. I hope someone else might provide more insight here. Tobias On Thu, Jul 10, 2014 at 9:57 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, Now I did the re-partition and ran the program again. I found a bottleneck in the whole program. In the streaming, there is a stage marked as "combineByKey at ShuffledDStream.scala:42" in the Spark UI. This stage is repeatedly executed. However, during some batches, the number of executors allocated to this step is only 2 although I used 300 workers and specified the partition number as 300. In this case, the program is very slow although the data that are processed are not big. Do you know how to solve this issue? Thanks! On Wed, Jul 9, 2014 at 5:51 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, I haven't worked with Yarn, but I would try adding a repartition() call after you receive your data from Kafka. I would be surprised if that didn't help. On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, I was using Spark 0.9 before and the master I used was yarn-standalone. In Spark 1.0, the master will be either yarn-cluster or yarn-client. I am not sure whether it is the reason why more machines do not provide better scalability. What is the difference between these two modes in terms of efficiency? Thanks! On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, do the additional 100 nodes receive any tasks at all? (I don't know which cluster you use, but with Mesos you could check client logs in the web interface.) You might want to try something like repartition(N) or repartition(N*2) (with N the number of your nodes) after you receive your data. Tobias On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, Thanks for the suggestion. I have tried to add more nodes, from 300 to 400. It seems the running time did not improve. On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, can't you just add more nodes in order to speed up the processing? Tobias On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a problem using Spark Streaming to accept input data and update a result. The input of the data is from Kafka and the output is to report a map which is updated by historical data every minute. My current method is to set the batch size as 1 minute and use foreachRDD to update this map and output the map at the end of the foreachRDD function. However, the current issue is that the processing cannot be finished within one minute. I am thinking of updating the map whenever new data comes instead of doing the update when the whole RDD comes. Is there any idea on how to achieve this in a better running time? Thanks! Bill
Re: Spark-streaming-kafka error
Bill, have you packaged "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0" into your application jar? If I remember correctly, it's not bundled with the downloadable compiled version of Spark. Tobias On Wed, Jul 9, 2014 at 8:18 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I used sbt to package code that uses spark-streaming-kafka. The packaging succeeded. However, when I submitted it to YARN, the job ran for 10 seconds and there was an error in the log file as follows: Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ Does anyone know the reason for this? Thanks! Bill
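A sketch of the corresponding build.sbt entries (version numbers are assumptions and must match the Spark version on the cluster); with sbt-assembly, the Kafka connector then ends up inside the application jar while the core Spark dependencies stay "provided":

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
  // not part of the Spark assembly on the cluster, so it must be bundled here
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
)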
Re: Use Spark Streaming to update result whenever data come
Bill, do the additional 100 nodes receive any tasks at all? (I don't know which cluster you use, but with Mesos you could check client logs in the web interface.) You might want to try something like repartition(N) or repartition(N*2) (with N the number of your nodes) after you receive your data. Tobias On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tobias, Thanks for the suggestion. I have tried to add more nodes from 300 to 400. It seems the running time did not get improved. On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, can't you just add more nodes in order to speed up the processing? Tobias On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a problem of using Spark Streaming to accept input data and update a result. The input of the data is from Kafka and the output is to report a map which is updated by historical data in every minute. My current method is to set batch size as 1 minute and use foreachRDD to update this map and output the map at the end of the foreachRDD function. However, the current issue is the processing cannot be finished within one minute. I am thinking of updating the map whenever the new data come instead of doing the update when the whoe RDD comes. Is there any idea on how to achieve this in a better running time? Thanks! Bill
Re: Which is the best way to get a connection to an external database per task in Spark Streaming?
Juan, I am doing something similar, just not inserting into an SQL database but issuing some RPC call. I think mapPartitions() may be helpful to you. You could do something like

dstream.mapPartitions(iter => {
  val db = new DbConnection()
  // maybe only do the above if !iter.isEmpty
  iter.map(item => {
    db.call(...)
    // do some cleanup if !iter.hasNext here
    item
  })
}).count() // force output

Keep in mind though that the whole idea about RDDs is that operations are idempotent and in theory could be run on multiple hosts (to take the result from the fastest server) or multiple times (to deal with failures/timeouts) etc., which is maybe something you want to deal with in your SQL. Tobias On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi list, I'm writing a Spark Streaming program that reads from a Kafka topic, performs some transformations on the data, and then inserts each record into a database with foreachRDD. I was wondering which is the best way to handle the connection to the database so each worker, or even each task, uses a different connection to the database, and then database inserts/updates would be performed in parallel.

- I understand that using a final variable in the driver code is not a good idea because then the communication with the database would be performed in the driver code, which leads to a bottleneck, according to http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
- I think creating a new connection in the call() method of the Function passed to foreachRDD is also a bad idea, because then I wouldn't be reusing the connection to the database for each batch RDD in the DStream.
- I'm not sure that a broadcast variable with the connection handler is a good idea in case the target database is distributed, because if the same handler is used for all the nodes of the Spark cluster then that could have a negative effect on the data locality of the connection to the database.
- From http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html I understand that by using a static variable and referencing it in the call() method of the Function passed to foreachRDD we get a different connection per Spark worker, I guess because there is a different JVM per worker. But then all the tasks in the same worker would share the same database handler object, am I right?
- Another idea is using updateStateByKey() with the database handler as the state, but I guess that would only work for Serializable database handlers, and for example not for an org.apache.hadoop.hbase.client.HTable object.

So my question is, which is the best way to get a connection to an external database per task in Spark Streaming? Or at least per worker. In http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html there is a partial solution to this question, but there the database handler object is missing. This other question http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html is closer to mine, but there is no answer for it yet. Thanks in advance, Greetings, Juan
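As a variation on the sketch above: if the records only need to be written out (no mapped result), foreachRDD/foreachPartition avoids the count() trick and makes it easier to close the connection reliably. DbConnection and its methods are hypothetical placeholders for whatever client library is in use:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val db = new DbConnection()              // hypothetical: one connection per partition/task
    try {
      iter.foreach(item => db.insert(item))  // hypothetical call
    } finally {
      db.close()                             // always executed, even if an insert fails
    }
  }
}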
Re: Distribute data from Kafka evenly on cluster
Hi, unfortunately, when I go with the above approach, I run into this problem: http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver will retry again and again, but will eventually fail, leading to unprocessed data and, worse, the task never terminating. There is nothing exotic about my setup; one Zookeeper node, one Kafka broker, so I am wondering if other people have seen this error before and, more importantly, how to fix it. When I don't use the approach of multiple kafkaStreams, I don't get this error, but also work is never distributed in my cluster... Thanks Tobias On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote: Thank you very much for the link, that was very helpful! So, apparently the `topics: Map[String, Int]` parameter controls the number of partitions that the data is initially added to; the number N in

val kafkaInputs = (1 to N).map { _ => ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1)) }
val union = ssc.union(kafkaInputs)

controls how many connections are made to Kafka. Note that the number of Kafka partitions for that topic must be at least N for this to work. Thanks Tobias
Re: Kafka - streaming from multiple topics
Sergey, On Fri, Jul 4, 2014 at 1:06 AM, Sergey Malov sma...@collective.com wrote: On the other hand, under the hood KafkaInputDStream, which is created with this KafkaUtils call, calls ConsumerConnector.createMessageStream, which returns a Map[String, List[KafkaStream]] keyed by topic. It is, however, not exposed. I wonder if this is a bug. After all, KafkaUtils.createStream() returns a DStream[(String, String)], which pretty much looks like it should be a (topic -> message) mapping. However, for me, the key is always null. Maybe you could consider filing a bug/wishlist report? Tobias
Visualize task distribution in cluster
Hi, I am using Mesos to run my Spark tasks. I would be interested to see how Spark distributes the tasks in the cluster (nodes, partitions) and which nodes are more or less active and do what kind of tasks, and how long the transfer of data and jobs takes. Is there any way to get this information from Spark? Thanks Tobias
Re: spark streaming rate limiting from kafka
Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far Kafka Dstream can read on topic-partition (via offset for example). By setting this to a small number, it will force DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias
Re: Could not compute split, block not found
Bill, let's say the processing time is t' and the window size t. Spark does not *require* t' < t. In fact, for *temporary* peaks in your streaming data, I think the way Spark handles it is very nice, in particular since 1) it does not mix up the order in which items arrived in the stream, so items from a later window will always be processed later, and 2) an increase in data will not be punished with high load and unresponsive systems, but with disk space consumption instead. However, if all of your windows require t' > t processing time (and it's not because you are waiting, but because you actually do some computation), then you are out of luck, because if you start processing the next window while the previous one is still being processed, you have fewer resources for each and processing will take even longer. However, if you are only waiting (e.g., for network I/O), then maybe you can employ some asynchronous solution where your tasks return immediately and deliver their result via a callback later? Tobias On Tue, Jul 1, 2014 at 2:26 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Tobias, Your suggestion is very helpful. I will definitely investigate it. Just curious. Suppose the batch size is t seconds. In practice, does Spark always require the program to finish processing the data of t seconds within t seconds' processing time? Can Spark begin to consume the new batch before finishing processing the previous batch? If Spark can do them together, it may save the processing time and solve the problem of data piling up. Thanks! Bill On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp wrote: If your batch size is one minute and it takes more than one minute to process, then I guess that's what causes your problem. The processing of the second batch will not start before the processing of the first is finished, which leads to more and more data being stored and waiting for processing; check the attached graph for a visualization of what I think may happen. Can you maybe do something hacky like throwing away a part of the data so that processing time gets below one minute, then check whether you still get that error? Tobias On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Tobias, Thanks for your help. I think in my case, the batch size is 1 minute. However, it takes my program more than 1 minute to process 1 minute's data. I am not sure whether it is because the unprocessed data piles up. Do you have any suggestion on how to check it and solve it? Thanks! Bill On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, were you able to process all information in time, or did maybe some unprocessed data pile up? I think when I saw this once, the reason seemed to be that I had received more data than would fit in memory while waiting for processing, so old data was deleted. When it was time to process that data, it didn't exist any more. Is that a possible reason in your case? Tobias On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi, I am running a spark streaming job with 1 minute as the batch size. It ran around 84 minutes and was killed because of the exception with the following information: java.lang.Exception: Could not compute split, block input-0-1403893740400 not found Before it was killed, it was able to correctly generate output for each batch. Any help on this will be greatly appreciated. Bill
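To see whether the t' > t situation described above actually occurs, one option (a sketch, assuming a Spark version that exposes the StreamingListener interface) is to log the per-batch delays reported by the scheduler; a steadily growing scheduling delay means batches are queuing up faster than they are processed:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
    val info = completed.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"processing took ${info.processingDelay.getOrElse(-1L)} ms, " +
      s"waited ${info.schedulingDelay.getOrElse(-1L)} ms before starting")
  }
})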
Distribute data from Kafka evenly on cluster
Hi, I have a number of questions using the Kafka receiver of Spark Streaming. Maybe someone has some more experience with that and can help me out. I have set up an environment for getting to know Spark, consisting of - a Mesos cluster with 3 only-slaves and 3 master-and-slaves, - 2 Kafka nodes, - 3 Zookeeper nodes providing service to both Kafka and Mesos. My Kafka cluster has only one topic with one partition (replicated to both nodes). When I start my Kafka receiver, it successfully connects to Kafka and does the processing, but it seems as if the (expensive) function in the final foreachRDD(...) is only executed on one node of my cluster, which is not what I had in mind when setting up the cluster ;-) So first, I was wondering about the parameter `topics: Map[String, Int]` to KafkaUtils.createStream(). Apparently it controls how many connections are made from my cluster nodes to Kafka. The Kafka doc at https://kafka.apache.org/documentation.html#introduction says each message published to a topic is delivered to one consumer instance within each subscribing consumer group and If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers. The Kafka docs *also* say: Note however that there cannot be more consumer instances than partitions. This seems to imply that with only one partition, increasing the number in my Map should have no effect. However, if I increase the number of streams for my one topic in my `topics` Map, I actually *do* see that the task in my foreachRDD(...) call is now executed on multiple nodes. Maybe it's more of a Kafka question than a Spark one, but can anyone explain this to me? Should I always have more Kafka partitions than Mesos cluster nodes? So, assuming that changing the number in that Map is not what I want (although I don't know if it is), I tried to use .repartition(numOfClusterNodes) (which doesn't seem right if I want to add and remove Mesos nodes on demand). This *also* did spread the foreachRDD(...) action evenly – however, the function never seems to terminate, so I never get to process the next interval in the stream. A similar behavior can be observed when running locally, not on the cluster, then the program will not exit but instead hang after everything else has shut down. Any hints concerning this issue? Thanks Tobias
Re: Changing log level of spark
I have a log4j.xml in src/main/resources with

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  [...]
  <root>
    <priority value="warn" />
    <appender-ref ref="Console" />
  </root>
</log4j:configuration>

and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver. Tobias On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote: Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging, changing the log level is just a matter of creating a log4j.properties (which is in the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after a restart there is still no debug output as desired, but only the default info log level.
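As an alternative sketch for quickly silencing the INFO output from the driver program itself, the log4j API can also be used directly (this only affects the JVM it runs in, so executors keep their own configuration):

import org.apache.log4j.{Level, Logger}

// call this before creating the SparkContext/StreamingContext
Logger.getRootLogger.setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)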
Re: Kafka client - specify offsets?
Michael, apparently, the parameter auto.offset.reset has a different meaning in Spark's Kafka implementation than what is described in the documentation. The Kafka docs at https://kafka.apache.org/documentation.html specify the effect of auto.offset.reset as: What to do when there is no initial offset in ZooKeeper or if an offset is out of range: * smallest : automatically reset the offset to the smallest offset * largest : automatically reset the offset to the largest offset * anything else: throw exception to the consumer However, Spark's implementation seems to drop the part when there is no initial offset, as can be seen in https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L102 -- it will just wipe the stored offset from Zookeeper. I guess it's actually a bug, because the parameter's effect is different than what is documented, but then it's good for you (and me) because it allows to specify I want all that I can get or I want to start reading right now, even if there is an offset stored in Zookeeper. Tobias On Sun, Jun 15, 2014 at 11:27 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, there are apparently helpers to tell you the offsets https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#id-0.8.0SimpleConsumerExample-FindingStartingOffsetforReads, but I have no idea how to pass that to the Kafka stream consumer. I am interested in that as well. Tobias On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell michael.campb...@gmail.com wrote: Is there a way in the Apache Spark Kafka Utils to specify an offset to start reading? Specifically, from the start of the queue, or failing that, a specific point?
Re: Spark SQL: No function to evaluate expression
The error message *means* that there is no column called c_address. However, maybe it's a bug with Spark SQL not understanding the a.c_address syntax. Can you double-check the column name is correct? Thanks Tobias On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat zuhair.khay...@gmail.com wrote: Dear all, I am trying to run the following query on Spark SQL using some custom TPC-H tables with a standalone Spark cluster configuration: SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE a.c_address <> b.c_address; Unfortunately I get the following error during execution: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure in TID 12 on host kw2260.kaust.edu.sa: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59) org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147) org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74) org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100) Is this a bug or am I doing something wrong? Regards, Zuhair Khayyat
Re: Kafka client - specify offsets?
Hi, there are apparently helpers to tell you the offsets https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#id-0.8.0SimpleConsumerExample-FindingStartingOffsetforReads, but I have no idea how to pass that to the Kafka stream consumer. I am interested in that as well. Tobias On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell michael.campb...@gmail.com wrote: Is there a way in the Apache Spark Kafka Utils to specify an offset to start reading? Specifically, from the start of the queue, or failing that, a specific point?
Re: Spark 1.0.0 Maven dependencies problems.
Hi, I remembered I saw this as well and found this ugly comment in my build.sbt file: On Mon, Jun 9, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote: Looks like this crept in again from the shaded Akka dependency. I'll propose a PR to remove it. I believe that remains the way we have to deal with the differing Netty/Jetty dependencies floating around. On Mon, Jun 9, 2014 at 9:53 AM, toivoa toivo@gmail.com wrote: I am using Maven from Eclipse dependency:tree shows [INFO] +- org.apache.spark:spark-core_2.10:jar:1.0.0:compile [INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:runtime [INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile [INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile [INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile [INFO] | | \- org.apache.zookeeper:zookeeper:jar:3.4.5:compile [INFO] | | \- jline:jline:jar:0.9.94:compile [INFO] | +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:compile [INFO] | | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:compile [INFO] | | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:compile [INFO] | | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:compile [INFO] | |\- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:compile [INFO] | +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:compile [INFO] | +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile [INFO] | | +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile [INFO] | +- com.google.guava:guava:jar:14.0.1:compile [INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile [INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile [INFO] | +- org.slf4j:slf4j-api:jar:1.7.5:compile [INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.5:compile [INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.5:compile [INFO] | +- log4j:log4j:jar:1.2.17:compile [INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.5:compile [INFO] | +- com.ning:compress-lzf:jar:1.0.0:compile [INFO] | +- org.xerial.snappy:snappy-java:jar:1.0.5:compile [INFO] | +- com.twitter:chill_2.10:jar:0.3.6:compile [INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile [INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile [INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile [INFO] | | \- org.objenesis:objenesis:jar:1.2:compile [INFO] | +- com.twitter:chill-java:jar:0.3.6:compile [INFO] | +- commons-net:commons-net:jar:2.2:compile [INFO] | +- org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | | +- org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | | | \- com.typesafe:config:jar:1.0.2:compile [INFO] | | +- io.netty:netty:jar:3.6.6.Final:compile [INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile [INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile [INFO] | +- org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | +- org.scala-lang:scala-library:jar:2.10.4:compile [INFO] | +- 
org.json4s:json4s-jackson_2.10:jar:3.2.6:compile [INFO] | | +- org.json4s:json4s-core_2.10:jar:3.2.6:compile [INFO] | | | +- org.json4s:json4s-ast_2.10:jar:3.2.6:compile [INFO] | | | +- com.thoughtworks.paranamer:paranamer:jar:2.6:compile [INFO] | | | \- org.scala-lang:scalap:jar:2.10.0:compile [INFO] | | | \- org.scala-lang:scala-compiler:jar:2.10.0:compile [INFO] | | |\- org.scala-lang:scala-reflect:jar:2.10.0:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-databind:jar:2.3.0:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.3.0:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.3.0:compile [INFO] | +- colt:colt:jar:1.2.0:compile [INFO] | | \- concurrent:concurrent:jar:1.3.4:compile [INFO] | +- org.apache.mesos:mesos:jar:shaded-protobuf:0.18.1:compile [INFO] | +- io.netty:netty-all:jar:4.0.17.Final:compile [INFO] | +- com.clearspring.analytics:stream:jar:2.5.1:compile [INFO] | +- com.codahale.metrics:metrics-core:jar:3.0.0:compile [INFO] | +- com.codahale.metrics:metrics-jvm:jar:3.0.0:compile [INFO] | +- com.codahale.metrics:metrics-json:jar:3.0.0:compile [INFO] | +- com.codahale.metrics:metrics-graphite:jar:3.0.0:compile [INFO] | +-
Re: Spark 1.0.0 Maven dependencies problems.
Hi, (Apparently Google Mail is quite eager to send out mails when Ctrl+Enter is hit by accident. Sorry for the previous email.) I remembered I saw this as well and found this ugly comment in my build.sbt file: /* * [...], there is still a problem with some classes * in javax.servlet (from org.eclipse.jetty.orbit) being signed, while others * (from org.mortbay.jetty) are not. When using sbt run, this is not a * problem, but IntelliJ IDEA refuses to run the main classes, raising a * SecurityException. In order to deal with that, remove the signature from * org.eclipse.jetty.orbit/javax.servlet/jars/javax.servlet-3.0.0.v201112011016.jar * file manually. */ Tobias On Tue, Jun 10, 2014 at 10:47 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I remembered I saw this as well and found this ugly comment in my build.sbt file: On Mon, Jun 9, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote: Looks like this crept in again from the shaded Akka dependency. I'll propose a PR to remove it. I believe that remains the way we have to deal with the differing Netty/Jetty dependencies floating around. On Mon, Jun 9, 2014 at 9:53 AM, toivoa toivo@gmail.com wrote: I am using Maven from Eclipse dependency:tree shows [INFO] +- org.apache.spark:spark-core_2.10:jar:1.0.0:compile [INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:runtime [INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile [INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile [INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile [INFO] | | \- org.apache.zookeeper:zookeeper:jar:3.4.5:compile [INFO] | | \- jline:jline:jar:0.9.94:compile [INFO] | +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:compile [INFO] | | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:compile [INFO] | | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:compile [INFO] | | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:compile [INFO] | |\- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:compile [INFO] | +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:compile [INFO] | +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile [INFO] | | +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile [INFO] | +- com.google.guava:guava:jar:14.0.1:compile [INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile [INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile [INFO] | +- org.slf4j:slf4j-api:jar:1.7.5:compile [INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.5:compile [INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.5:compile [INFO] | +- log4j:log4j:jar:1.2.17:compile [INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.5:compile [INFO] | +- com.ning:compress-lzf:jar:1.0.0:compile [INFO] | +- org.xerial.snappy:snappy-java:jar:1.0.5:compile [INFO] | +- com.twitter:chill_2.10:jar:0.3.6:compile [INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile [INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile [INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile [INFO] | | \- 
org.objenesis:objenesis:jar:1.2:compile [INFO] | +- com.twitter:chill-java:jar:0.3.6:compile [INFO] | +- commons-net:commons-net:jar:2.2:compile [INFO] | +- org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | | +- org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | | | \- com.typesafe:config:jar:1.0.2:compile [INFO] | | +- io.netty:netty:jar:3.6.6.Final:compile [INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile [INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile [INFO] | +- org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:compile [INFO] | +- org.scala-lang:scala-library:jar:2.10.4:compile [INFO] | +- org.json4s:json4s-jackson_2.10:jar:3.2.6:compile [INFO] | | +- org.json4s:json4s-core_2.10:jar:3.2.6:compile [INFO] | | | +- org.json4s:json4s-ast_2.10:jar:3.2.6:compile [INFO] | | | +- com.thoughtworks.paranamer:paranamer:jar:2.6:compile [INFO] | | | \- org.scala-lang:scalap:jar:2.10.0:compile [INFO] | | | \- org.scala-lang:scala-compiler:jar:2.10.0:compile [INFO] | | |\- org.scala-lang:scala-reflect:jar:2.10.0:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-databind:jar
Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
Gaurav, I am not sure that the * expands to what you expect it to do. Normally the bash expands * to a space-separated string, not colon-separated. Try specifying all the jars manually, maybe? Tobias On Thu, Jun 5, 2014 at 6:45 PM, Gaurav Dasgupta gaurav.d...@gmail.com wrote: Hi, I have written my own custom Spark streaming code which connects to Kafka server and fetch data. I have tested the code on local mode and it is working fine. But when I am executing the same code on YARN mode, I am getting KafkaReceiver class not found exception. I am providing the Spark Kafka jar in the classpath and ensured that the path is correct for all the nodes in my cluster. I am using Spark 0.9.1 hadoop pre-built and is deployed on all the nodes (10 node cluster) in the YARN cluster. I am using the following command to run my code on YARN mode: SPARK_YARN_MODE=true SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1 Below is the error message I am getting: 14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL) 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0) 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:247) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at
Re: Are scala.MatchError messages a problem?
Jeremy, On Mon, Jun 9, 2014 at 10:22 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: When you use match, the match must be exhaustive. That is, a match error is thrown if the match fails. Ahh, right. That makes sense. Scala is applying its strong typing rules here instead of no ceremony... but isn't the idea that type errors should get picked up at compile time? I suppose the compiler can't tell there's not complete coverage, but it seems strange to throw that at runtime when it is literally the 'default case'. You can use subclasses of sealed traits to get a compiler warning for non-exhaustive matches: http://stackoverflow.com/questions/11203268/what-is-a-sealed-trait I don't know if it can be applied for regular expression matching, though... Tobias
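A small illustration of the sealed-trait approach from the linked answer (my own sketch, not from the thread): with a sealed hierarchy the compiler can warn about a non-exhaustive match at compile time instead of a MatchError surfacing at runtime.

sealed trait Shape
case class Circle(r: Double) extends Shape
case class Square(a: Double) extends Shape

def area(s: Shape): Double = s match {
  case Circle(r) => math.Pi * r * r
  // compile-time warning: "match may not be exhaustive" -- the Square case is missing;
  // without it, area(Square(2.0)) would throw a scala.MatchError at runtime
}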
Re: Classpath errors with Breeze
Hi, I had a similar problem; I was using `sbt assembly` to build a jar containing all my dependencies, but since my file system has a problem with long file names (due to disk encryption), some class files (which correspond to functions in Scala) were not included in the jar I uploaded. Although, thinking about it, that would result in a ClassNotFound exception, not NoSuchMethod. Have you built your code against a different version of the library than the jar you use in EC2? Tobias On Mon, Jun 9, 2014 at 1:52 PM, dlaw dieterich.law...@gmail.com wrote: I'm having some trouble getting a basic matrix multiply to work with Breeze. I'm pretty sure it's related to my classpath. My setup is a cluster on AWS with 8 m3.xlarges. To create the cluster I used the provided ec2 scripts and Spark 1.0.0. I've made a gist with the relevant pieces of my app: https://gist.github.com/dieterichlawson/e5e3ab158a09429706e0 The app was created as detailed in the quick start guide. When I run it I get an error that says the method to multiply a dense matrix by a dense matrix does not exist: 14/06/09 04:49:09 WARN scheduler.TaskSetManager: Lost TID 90 (task 0.0:13) 14/06/09 04:49:09 INFO scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError: breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$; [duplicate 46] I've tried a bunch of different things, including playing with the CLASSPATH and ADD_JARS environment variables, the --jars option on spark-submit, the version of breeze and scala, etc... I've also tried it in the spark-shell. It works there, so I don't really know what's going on. Any thoughts?
How to shut down Spark Streaming with Kafka properly?
Hi, I am trying to use Spark Streaming with Kafka, which works like a charm -- except for shutdown. When I run my program with sbt run-main, sbt will never exit, because there are two non-daemon threads left that don't die. I created a minimal example at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future { ... }` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1. There are a number of threads remaining that will prevent sbt from exiting. When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2. Does anyone have *any* idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way. Thanks Tobias
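For readers who do not want to follow the gist links, here is a condensed sketch of the setup described above (master URL, ZooKeeper address, topic name and timings are placeholders):

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaShutdownSketch {
  def main(args: Array[String]): Unit = {
    val threadsBefore = Thread.getAllStackTraces.keySet

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-shutdown")
    val ssc = new StreamingContext(conf, Seconds(5))
    // connect to Kafka via ZooKeeper and just print what arrives
    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", Map("test-topic" -> 1))
    stream.map(_._2).print()

    // shut down the StreamingContext (and the SparkContext) after a while
    future {
      Thread.sleep(15000)
      ssc.stop(stopSparkContext = true)
    }

    ssc.start()
    ssc.awaitTermination()

    // print the threads that survived the shutdown
    val threadsAfter = Thread.getAllStackTraces.keySet
    threadsAfter.removeAll(threadsBefore)
    println("threads left over: " + threadsAfter)
  }
}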
Re: How to shut down Spark Streaming with Kafka properly?
Sean, your patch fixes the issue, thank you so much! (This is the second time within one week that I have run into network libraries not shutting down their threads properly, so I'm really glad your code fixes it.) I saw that your pull request is closed, but not merged yet. Can I do anything to get your fix into Spark? Open an issue, send a pull request myself, etc.? Thanks Tobias
Re: A single build.sbt file to start Spark REPL?
Hi, I guess it should be possible to dig through the scripts bin/spark-shell, bin/spark-submit etc. and convert them to a long sbt command that you can run. I just tried sbt run-main org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main but that fails with Failed to initialize compiler: object scala.runtime in compiler mirror not found. ** Note that as of 2.8 scala does not assume use of the java classpath. ** For the old behavior pass -usejavacp to scala, or if using a Settings ** object programatically, settings.usejavacp.value = true. I would be happy to learn about a way to do that, too. Tobias On Tue, Jun 3, 2014 at 11:56 AM, Alexy Khrabrov al...@scalable.pro wrote: The usual way to use Spark with SBT is to package a Spark project using sbt package (e.g. per Quick Start) and submit it to Spark using the bin/ scripts from the Spark distribution. For a plain Scala project, you don’t need to download anything, you can just get a build.sbt file with dependencies and e.g. say “console” which will start a Scala REPL with the dependencies on the class path. Is there a way to avoid downloading the Spark tarball completely, by defining the spark-core dependency in build.sbt, and using `run` or `console` to invoke the Spark REPL from sbt? I.e. the goal is: create a single build.sbt file, such that if you run sbt in its directory, and then say run/console (with optional parameters), it will download all Spark dependencies and start the REPL. Should work on a fresh machine where the Spark tarball has never been untarred. A+
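What should work, at least for local experiments, is a single build.sbt that pulls in spark-core and then `sbt console`, creating the SparkContext by hand (note this gives you the plain Scala REPL, not the Spark REPL with its special closure handling). A sketch, with assumed version numbers:

// build.sbt (sketch)
name := "spark-console"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

// then, inside `sbt console`:
//   val sc = new org.apache.spark.SparkContext("local[*]", "console")
//   sc.parallelize(1 to 100).count()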
Spark Streaming on Mesos, various questions
Hi, with the hints from Gerard I was able to get my locally working Spark code running on Mesos. Thanks! Basically, on my local dev machine, I use sbt assembly to create a fat jar (which is actually not so fat, since I use ... % 'provided' in my sbt file for the Spark dependencies; see the build.sbt sketch after this message), upload it to my cluster and run it using

java -cp myApplicationCode.jar:spark-assembly-1.0.0-SNAPSHOT.jar mypackage.MainClass

I can see in my Mesos master web interface how the tasks are added and distributed to the slaves, and in the driver program I can see the final results; that is very nice. Now, as the next step, I wanted to get Spark Streaming running. That works by now, but I have various questions. I'd be happy if someone could help me out with some answers.

1. I wrongly assumed that when using ssc.socketTextStream(), the driver would connect to the specified server. It does not; apparently one of the slaves does ;-) Does that mean that before any DStream processing can be done, all the received data needs to be sent to the other slaves? What about the extreme case dstream.filter(x => false); would all the data be transferred to other hosts, just to be discarded there?

2. How can I reduce the logging? It seems like for every chunk received from the socketTextStream, I get a line "INFO BlockManagerInfo: Added input-0-1400739888200 in memory on ...", which is very noisy. Also, when the foreachRDD() is processed every N seconds, I get a lot of output.

3. In my (non-production) cluster, I have six slaves, two of which have 2G of RAM, the other four just 512M. So far, I have not seen Mesos ever give a job to one of the four low-mem machines. Is 512M just not enough for *any* task, or is there a rationale like "they are not cool enough to play with the Big Guys" built into Mesos?

4. I don't have any HDFS or shared disk space. What does this mean for Spark Streaming's default storage level MEMORY_AND_DISK_SER_2?

5. My prototype example for Spark Streaming is a simple word count:

val wordCounts = ssc.socketTextStream(...).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

However (with a batchDuration of five seconds), this only works correctly if I run the application in Mesos coarse-grained mode. In the default fine-grained mode, I always receive 0 as the word count (that is, a wrong result), and a lot of warnings like

W0522 06:57:23.578400 12824 sched.cpp:901] Attempting to launch task 7 with an unknown offer 20140520-102159-2154735808-5050-1108-7891

Can anyone explain this behavior? Thanks, Tobias
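For completeness, a sketch of the 'provided' trick mentioned at the top of this message, so that sbt assembly leaves Spark itself out of the fat jar and the cluster's spark-assembly jar is used instead (coordinates and version are assumptions):

// build.sbt fragment (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"
)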
ClassNotFoundException with Spark/Mesos (spark-shell works fine)
Hi, I have set up a cluster with Mesos (backed by Zookeeper) with three master and three slave instances. I set up Spark (git HEAD) for use with Mesos according to this manual: http://people.apache.org/~pwendell/catalyst-docs/running-on-mesos.html Using the spark-shell, I can connect to this cluster and do simple RDD operations, but the same code in a Scala class and executed via sbt run-main works only partially. (That is, count() works, count() after flatMap() does not.) Here is my code: https://gist.github.com/tgpfeiffer/7d20a4d59ee6e0088f91 The file SparkExamplesScript.scala, when pasted into spark-shell, outputs the correct count() for the parallelized list comprehension, as well as for the flatMapped RDD. The file SparkExamplesMinimal.scala contains exactly the same code, and also the MASTER configuration and the Spark Executor are the same. However, while the count() for the parallelized list is displayed correctly, I receive the following error when asking for the count() of the flatMapped RDD: - 14/05/21 09:47:49 INFO scheduler.DAGScheduler: Submitting Stage 1 (FlatMappedRDD[1] at flatMap at SparkExamplesMinimal.scala:34), which has no missing parents 14/05/21 09:47:49 INFO scheduler.DAGScheduler: Submitting 8 missing tasks from Stage 1 (FlatMappedRDD[1] at flatMap at SparkExamplesMinimal.scala:34) 14/05/21 09:47:49 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 8 tasks 14/05/21 09:47:49 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 8 on executor 20140520-102159-2154735808-5050-1108-1: mesos9-1 (PROCESS_LOCAL) 14/05/21 09:47:49 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 1779147 bytes in 37 ms 14/05/21 09:47:49 WARN scheduler.TaskSetManager: Lost TID 8 (task 1.0:0) 14/05/21 09:47:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: spark.SparkExamplesMinimal$$anonfun$2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) - Can anyone explain to me where this comes from or how I might further track the problem down? Thanks, Tobias
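One difference between spark-shell and a standalone program that is easy to overlook (and may or may not be the cause here): a standalone program has to ship its own jar, which contains the anonymous-function classes such as SparkExamplesMinimal$$anonfun$2, to the executors. A sketch of doing that explicitly, assuming a Spark 1.0-style API and a made-up jar path:

import org.apache.spark.{SparkConf, SparkContext}

object SparkExamplesMinimalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://zk://zookeeper-host:2181/mesos") // placeholder master URL
      .setAppName("SparkExamplesMinimal")
      // ship the application jar so that executors can load the anonymous function classes
      .setJars(Seq("target/scala-2.10/spark-examples-minimal_2.10-0.1.jar")) // made-up path
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 1000)
    println("count: " + rdd.count())
    println("flatMap count: " + rdd.flatMap(i => Seq(i, i)).count())

    sc.stop()
  }
}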
Re: I want to filter a stream by a subclass.
On Thu, May 22, 2014 at 8:07 AM, Tathagata Das tathagata.das1...@gmail.com wrote: records.filter { _.isInstanceOf[Orange] }.map { _.asInstanceOf[Orange] } I think a Scala-ish way would be records.flatMap(_ match { case i: Int => Some(i); case _ => None })
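Spelled out as a complete (if trivial) sketch, using a plain Scala collection so it runs without Spark; the same pattern applies to an RDD or DStream:

val records: List[Any] = List(1, "two", 3, 4.0)
val ints: List[Int] = records.flatMap {
  case i: Int => Some(i)  // keep only the subclass we are interested in
  case _      => None
}
// ints == List(1, 3)

On an RDD (though not on a DStream, as far as I remember) the same can also be written with collect and a partial function, e.g. rdd.collect { case o: Orange => o }.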
Re: RDD union of a window in Dstream
Hi, On Wed, May 21, 2014 at 9:42 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: I want to do union of all RDDs in each window of DStream. A window *is* a union of all RDDs in the respective time interval. The documentation says a DStream is represented as a sequence of RDDs. However, data from a certain time interval will always be contained in *one* RDD, not a sequence of RDDs (AFAIK). Regards Tobias
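A tiny self-contained sketch of what that means in practice (host, port and durations are arbitrary): each RDD handed to foreachRDD on a windowed DStream already covers the whole window, so no extra union is needed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("window-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)

    // every 10 seconds, this single RDD contains all lines of the last 30 seconds
    lines.window(Seconds(30), Seconds(10)).foreachRDD { rdd =>
      println("lines in window: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}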