RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data
This is most likely due to the internal implementation of ALS in MLlib. Probably for each parallel unit of execution (partition in Spark terms) the implementation allocates and uses a RAM buffer where it keeps interim results during the ALS iterations. If we assume that the size of that internal RAM buffer is fixed per unit of execution, then Total RAM (20 partitions x fixed RAM buffer) < Total RAM (100 partitions x fixed RAM buffer).

From: Aniruddh Sharma [mailto:asharma...@gmail.com]
Sent: Wednesday, July 8, 2015 12:22 PM
To: user@spark.apache.org
Subject: Out of Memory Errors on less number of cores in proportion to Partitions in Data

Hi,

I am new to Spark. I have done the following tests and I am confused by the conclusions. I have 2 queries. Following are the details of the test.

Test 1) Used an 11-node cluster where each machine has 64 GB RAM and 4 physical cores. I ran an ALS algorithm using MLlib on a 1.6 GB data set. I ran 10 executors and my Rating data set has 20 partitions. It works. In order to increase parallelism, I used 100 partitions instead of 20, and now the program does not work and throws an out of memory error.

Query a): As I had 4 cores on each machine, but my number of partitions is 10 in each executor, my cores are not sufficient for the partitions. Is it supposed to give memory errors with this kind of misconfiguration? If there are not sufficient cores and processing cannot be done in parallel, can different partitions not be processed sequentially, so the operation becomes slow rather than throwing a memory error?

Query b): If it gives an error, then the error message is not meaningful. Here my DAG was very simple and I could trace that lowering the number of partitions works, but if it throws an error on misconfiguration of cores, how do I debug it in complex DAGs, as the error does not state explicitly that the problem could be due to a low number of cores? If my understanding is incorrect, then kindly explain the reasons for the error in this case.

Thanks and Regards
Aniruddh
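For reference, a minimal sketch of the kind of setup being discussed: controlling the partition count of the Rating RDD before handing it to MLlib ALS so that per-partition buffers do not exhaust executor memory. The file path, field layout and all numbers are assumptions for illustration only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val conf = new SparkConf()
  .setAppName("ALSPartitioningTest")
  .set("spark.executor.memory", "4g")   // raise gradually while measuring, rather than all at once

val sc = new SparkContext(conf)

// "ratings.csv" and its comma-separated user,product,rating layout are assumptions
val ratings = sc.textFile("hdfs:///data/ratings.csv")
  .map(_.split(","))
  .map(a => Rating(a(0).toInt, a(1).toInt, a(2).toDouble))
  .repartition(20)                      // try 20, 30, 40 ... instead of jumping straight to 100

val model = ALS.train(ratings, /* rank = */ 10, /* iterations = */ 10, /* lambda = */ 0.01)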
RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data
Are you sure you have actually increased the RAM (how exactly did you do that, and does it show in the Spark UI)? Also use the Spark UI and the driver console to check the RAM allocated for each RDD and RDD partition in each of the scenarios.

Re b) the general rule is num of partitions = 2 x num of CPU cores. All partitions are operated on in parallel (by independently running JVM threads); however, if you have a substantially higher number of partitions (JVM threads) than number of cores, then you will get what happens in any JVM or OS – there will be switching between the threads and some of them will be in a suspended mode waiting for a free core (thread contexts also occupy additional RAM).

From: Aniruddh Sharma [mailto:asharma...@gmail.com]
Sent: Wednesday, July 8, 2015 12:52 PM
To: Evo Eftimov
Subject: Re: Out of Memory Errors on less number of cores in proportion to Partitions in Data

Thanks for your reply. I increased executor memory from 4 GB to 35 GB and the out of memory error still happens. So it seems it may not be entirely due to more buffers from more partitions.

Query a) Is there a way to debug at a more granular level, from a user code perspective, where things could go wrong?

Query b) In general, my query is: suppose it is not ALS (or some iterative algorithm). Let's say it is some sample RDD where each executor has 50 partitions and each machine has 4 physical cores. Do the 4 physical cores try to process these 50 partitions in parallel (multitasking), or will it work in a way that the 4 cores first process the first 4 partitions, then the next 4 partitions, and so on?

Thanks and Regards
Aniruddh
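As a rough illustration of the "partitions ≈ 2 x cores" rule of thumb mentioned above, a sketch reusing the hypothetical ratings RDD from the earlier snippet; the property names are standard Spark settings, but the numbers assume the 10-executor, 4-cores-per-node cluster described in the thread.

import org.apache.spark.SparkConf

// 10 executors x 4 cores = 40 cores in total; aim for roughly 2 partitions per core
val conf = new SparkConf()
  .set("spark.executor.memory", "35g")   // the value tried above; verify it in the Executors tab of the UI
  .set("spark.executor.cores", "4")

val targetPartitions = 10 * 4 * 2        // executors * cores per executor * 2
val repartitioned = ratings.repartition(targetPartitions)
println(s"partitions = ${repartitioned.partitions.length}")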
RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data
Also try to increase the number of partitions gradually – not in one big jump from 20 to 100, but adding e.g. 10 at a time – and see whether there is a correlation with adding more RAM to the executors.
RE: foreachRDD vs. forearchPartition ?
That was a) fuzzy b) insufficient – one can certainly use foreach (only) on DStream RDDs – it works, as an empirical observation. As another empirical observation: foreachPartition results in having one instance of the lambda/closure per partition when e.g. publishing to output systems like message brokers, databases and file systems – that increases the level of parallelism of your output processing.

As an architect I deal with gazillions of products and don't have time to read the source code of all of them to make up for documentation deficiencies. On the other hand I believe you have been involved in writing some of the code, so be a good boy and either answer this question properly or enhance the product documentation of that area of the system.

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, July 8, 2015 2:52 PM
To: dgoldenberg; user@spark.apache.org
Subject: Re: foreachRDD vs. forearchPartition ?

These are quite different operations. One operates on RDDs in a DStream and one operates on partitions of an RDD. They are not alternatives.

On Wed, Jul 8, 2015, 2:43 PM dgoldenberg dgoldenberg...@gmail.com wrote:

Is there a set of best practices for when to use foreachPartition vs. foreachRDD? Is it generally true that using foreachPartition avoids some of the over-network data shuffling overhead? When would I definitely want to use one method vs. the other?

Thanks.
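A minimal sketch of how the two operations compose when publishing to an external system: foreachRDD hands you each batch's RDD, and foreachPartition then runs once per partition of that RDD, so one connection is created per partition rather than per record. ExternalSink and SinkConnection are hypothetical stand-ins for whatever client your output system provides.

import org.apache.spark.streaming.dstream.DStream

class SinkConnection {
  def send(s: String): Unit = println(s)   // stand-in for a real network send
  def close(): Unit = ()
}
object ExternalSink { def connect(): SinkConnection = new SinkConnection }

def publish(records: DStream[String]): Unit = {
  records.foreachRDD { rdd =>
    rdd.foreachPartition { iter =>
      val sink = ExternalSink.connect()            // one connection per partition
      iter.foreach(record => sink.send(record))
      sink.close()
    }
  }
}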
RE:
spark.streaming.unpersist = false // in order for Spark Streaming not to drop the raw RDD data
spark.cleaner.ttl = some reasonable value in seconds

Why is the above suggested, provided the persist/cache operation on the constantly unionized Batch RDD will have to be invoked anyway (after every union with a DStream RDD)? Besides, it will result in DStream RDDs accumulating in RAM unnecessarily for the duration of the TTL.

Re "A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform." – I think the actual requirement here is picking up and adding Specific Messages from EVERY DStream RDD to the Batch RDD, rather than "preserving" messages from a specific sliding window and adding them to the Batch RDD. This should be defined as the Frequency of Updates to the Batch RDD, and then dstream.window() can be made equal to that frequency.

Can you also elaborate on why you consider the dstream.window approach more "reliable"?

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Tuesday, July 7, 2015 12:56 PM
To: Anand Nalya
Cc: spark users
Subject: Re:

Anand,

AFAIK, you will need to change two settings:

spark.streaming.unpersist = false // in order for Spark Streaming not to drop the raw RDD data
spark.cleaner.ttl = some reasonable value in seconds

Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up. You will probably be in some tricky ground with this approach.

A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform. Something like:

dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)...

If you need an unbounded number of dstream batch intervals, consider writing the data to secondary storage instead.

-kr, Gerard.

On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote:

Hi,

Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this:

var myRDD: RDD[(String, Long)] = sc.fromText...
dstream.foreachRDD{ rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter))
}

My question is: for how long will Spark keep the RDDs underlying the dstream around? Is there some configuration knob that can control that?

Regards,
Anand
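A slightly fuller sketch of the window-plus-transform approach being debated here, with the window length set equal to the slide so that each element is seen exactly once per window (i.e. the window interval acts as the "frequency of updates"); the 15-minute duration, the filter and the static RDD are assumptions, only the pattern itself comes from the thread.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

def joinWithStatic(dstream: DStream[(String, Long)],
                   staticRdd: RDD[(String, Long)]): DStream[(String, Long)] = {
  dstream
    .window(Seconds(900), Seconds(900))            // window length == slide == chosen update frequency
    .filter { case (k, _) => k.nonEmpty }          // stand-in for the real "myfilter"
    .transform(windowed => windowed.union(staticRdd))
}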
RE:
Requirements – then see my abstracted interpretation – what else do you need in terms of requirements: "Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this:"

A formal requirements spec derived from the above: I think the actual requirement here is picking up and adding Specific (filtered) Messages from EVERY DStream RDD to the Batch RDD, rather than "preserving" (on top of that) all messages from a sliding window and adding them to the Batch RDD. Such a requirement should be defined as the Frequency of Updates to the Batch RDD and what these updates are, e.g. specific filtered messages, and then dstream.window() can be made equal to that frequency. Essentially the update frequency can range from the filtered messages of Every Single DStream RDD to the filtered messages of a SLIDING WINDOW.

Secondly, what do you call "mutable unioning"? This was his initial code:

var myRDD: RDD[(String, Long)] = sc.fromText...
dstream.foreachRDD{ rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter))
}

Here is how it looks when persisting the result of every union – supposed to produce a NEW PERSISTENT IMMUTABLE Batch RDD – why is that supposed to be less "stable/reliable", and what are the exact technical reasons for that?

var myRDD: RDD[(String, Long)] = sc.fromText...
dstream.foreachRDD{ rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter)).cache()
}

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Tuesday, July 7, 2015 1:55 PM
To: Evo Eftimov
Cc: Anand Nalya; spark users
Subject: Re:

Evo,

I'd let the OP clarify the question. I'm not in a position to clarify his requirements beyond what's written in the question.

Regarding window vs mutable union: window is a well-supported feature that accumulates messages over time. The mutable unioning of RDDs is bound to operational trouble as there are no warranties tied to data preservation and it's unclear how one can produce 'cuts' of that union ready to be served for some process/computation. Intuitively, it will 'explode' at some point.

-kr, Gerard.
R on spark
I had a look at the new R on Spark API / feature in Spark 1.4.0.

For those skilled in the art (of R and distributed computing) it will be immediately clear that "ON" is a marketing ploy, and what it actually is is "TO" – i.e. Spark 1.4.0 offers an INTERFACE from R TO DATA stored in Spark in a distributed fashion, plus some distributed queries which can be initiated FROM R and run on that data within Spark – these are essentially certain types of SQL-style queries.

In order to deserve the ON label, SparkR would have to be able to run ON Spark most of the statistical analysis and machine learning algorithms found in the R engine. This is absolutely not the case at the moment.

As an example of the type of solution/architecture I am referring to, you can review Revolution Analytics (recently acquired by Microsoft) and some other open source frameworks for running R ON distributed clusters.
RE: Spark Streaming: limit number of nodes
Ok, so you are running Spark in Standalone Mode then. Then for every Worker process on every node (you can run more than one Worker per node) you will have an Executor waiting for jobs.

As far as I am concerned, I think there are only two ways to achieve what you need:

1. Simply shut down the Spark worker processes / daemons on the nodes you want to keep free from Spark workloads, OR run two separate Spark clusters, one with e.g. 2 workers and one with e.g. 5 workers – small jobs go to cluster 1 and big jobs to cluster 2.

2. Try to set spark.executor.cores, BUT that limits the number of cores per Executor rather than the total cores for the job and hence will probably not yield the effect you need.

From: Wojciech Pituła [mailto:w.pit...@gmail.com]
Sent: Wednesday, June 24, 2015 10:49 AM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Spark Streaming: limit number of nodes

Ok, thanks. I have 1 worker process on each machine but I would like to run my app on only 3 of them. Is it possible?
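One more knob worth mentioning, which is not raised in the thread and should be verified against your Spark version: in standalone mode the spark.cores.max property caps the total cores a single application takes across the whole cluster, which indirectly limits how many nodes it spreads over. A minimal sketch, with illustrative numbers:

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: 8 cores per node; capping the app at 24 cores means it can be
// satisfied by roughly 3 workers, leaving the other nodes' cores for other apps.
val conf = new SparkConf()
  .setAppName("small-streaming-app")
  .set("spark.cores.max", "24")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)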
RE: Spark Streaming: limit number of nodes
There is no direct one-to-one mapping between Executor and Node.

An Executor is simply the Spark framework term for a JVM instance with some Spark framework system code running in it. A Node is a physical server machine. You can have more than one JVM per node, and vice versa you can have nodes without any JVM running on them. How? By specifying the number of executors to be less than the number of nodes. So if you specify the number of executors to be 1 and you have 5 nodes, ONE executor will run on only one of them.

The above is valid for Spark on YARN. For Spark in standalone mode, the number of executors is equal to the number of Spark worker processes (daemons) running on each node.

From: Wojciech Pituła [mailto:w.pit...@gmail.com]
Sent: Tuesday, June 23, 2015 12:38 PM
To: user@spark.apache.org
Subject: Spark Streaming: limit number of nodes

I have set up a small standalone cluster: 5 nodes, every node has 5GB of memory and 8 cores. As you can see, a node doesn't have much RAM. I have 2 streaming apps; the first one is configured to use 3GB of memory per node and the second one uses 2GB per node. My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, so I could launch a third app. Is it possible to limit the number of nodes (executors) that an app will get from the standalone cluster?
RE: Web UI vs History Server Bugs
Probably your application has crashed or was terminated without invoking the stop method of the Spark context – in such cases it doesn't create the empty flag file which apparently tells the history server that it can safely show the log data. Simply go to some of the other dirs of the history server to see what the name of the flag file is, and then create it manually in the dirs of the missing apps – then they will appear in the history server UI.

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Monday, June 22, 2015 7:22 PM
To: Jonathon Cai
Cc: user@spark.apache.org
Subject: Re: Web UI vs History Server Bugs

well, I'm afraid you've reached the limits of my knowledge ... hopefully someone else can answer

On 22 Jun 2015, at 16:37, Jonathon Cai jonathon@yale.edu wrote:

No, what I'm seeing is that while the cluster is running, I can't see the app info after the app is completed. That is to say, when I click on the application name on master:8080, no info is shown. However, when I examine the same file on the History Server, the application information opens fine.

On Sat, Jun 20, 2015 at 6:47 AM, Steve Loughran ste...@hortonworks.com wrote:

On 17 Jun 2015, at 19:10, jcai jonathon@yale.edu wrote:

Hi,

I am running this on Spark stand-alone mode. I find that when I examine the web UI, a couple of bugs arise:

1. There is a discrepancy between the number denoting the duration of the application when I run the history server and the number given by the web UI (default address is master:8080). I checked more specific details, including task and stage durations (when clicking on the application), and these appear to be the same for both avenues.

2. Sometimes the web UI on master:8080 is unable to display more specific information for an application that has finished (when clicking on the application), even when there is a log file in the appropriate directory. But when the history server is opened, it is able to read this file and output information.

There's a JIRA open on the history server caching incomplete work... if you click on the link to a job while it's in progress, you don't get any updates later. Does this sound like what you are seeing?
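For reference, a sketch of creating such a flag file by hand through the Hadoop FileSystem API. The file name APPLICATION_COMPLETE is an assumption based on what Spark 1.x event-log directories typically contain, and the directory path is hypothetical, so check an existing healthy application directory first, as suggested above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed layout: one event-log directory per application under spark.eventLog.dir
val eventLogDir = "hdfs:///user/spark/applicationHistory/app-20150622-0001"  // hypothetical path
val flagName = "APPLICATION_COMPLETE"                                        // verify against a healthy app dir

val dir = new Path(eventLogDir)
val fs = dir.getFileSystem(new Configuration())
val flag = new Path(dir, flagName)
if (!fs.exists(flag)) {
  fs.create(flag).close()   // create the empty marker so the history server lists the app
}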
Spark Streaming 1.3.0 ERROR LiveListenerBus
Spark Streaming 1.3.0 on YARN keeps generating the following error during job execution, while the application is running:

ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
etc etc
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)

This exception does NOT make the streaming job fail. Secondly, the streaming job does NOT perform ANY operations with HDFS and is really a basic streaming job to test a new Spark 1.3.0 deployment on CDH 5.4. The above exception is thrown by the Spark framework itself.

I have seen some posts related to that exception which are about Spark batch jobs and confirmed as a bug in Spark, which however is not critical. And by the way, this error appears well before any attempted stop of the Spark streaming context and hence is not related to trying to stop the context.

Does anybody know how to get rid of this in Spark Streaming 1.3.0 on YARN on CDH 5.4?
RE: Machine Learning on GraphX
What is GraphX:

- It can be viewed as a kind of distributed, parallel graph database
- It can be viewed as a graph data structure (Data Structures 101 from your CS course)
- It features some off-the-shelf algorithms for graph processing and navigation (Algorithms and Data Structures 101), and the implementation of these takes advantage of the distributed, parallel nature of GraphX

Any of the MLlib algorithms can be applied to ANY data structure, from time series to graph to matrix/tabular etc. – it is up to your needs and imagination. As an example – clustering – you can apply it to a graph data structure, BUT you may also leverage the graph's inherent connection/clustering properties and graph algorithms taking advantage of that, instead of e.g. the run-of-the-mill K-Means, which is OK for e.g. time series, matrix etc. data structures.

From: Timothée Rebours [mailto:t.rebo...@gmail.com]
Sent: Thursday, June 18, 2015 10:44 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Machine Learning on GraphX

Thanks for the quick answer. I've already followed this tutorial but it doesn't use GraphX at all. My goal would be to work directly on the graph, and not extract edges and vertices from the graph as standard RDDs and then work on those with the standard MLlib ALS, which has no interest. That's why I tried with the other implementation, but it's not optimized at all. I might have gone in the wrong direction with ALS, but I'd like to see what's possible to do with MLlib on GraphX. Any idea?

2015-06-18 11:19 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:

This might give you a good start: http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html It's a bit old though.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 2:33 PM, texol t.rebo...@gmail.com wrote:

Hi,

I'm new to GraphX and I'd like to use Machine Learning algorithms on top of it. I wanted to write a simple program implementing MLlib's ALS on a bipartite graph (a simple movie recommendation), but didn't succeed. I found an implementation on Spark 1.1.x (https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala) of ALS on GraphX, but it is painfully slow compared to the standard implementation, and uses the PregelVertex class, which is deprecated in the current version. Do we expect a new implementation? Is there a smarter solution to do so?

Thanks,
Regards,
Timothée Rebours.
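To make the "apply an MLlib algorithm to graph-shaped data" point concrete, a minimal sketch that builds a small bipartite user–movie graph in GraphX and feeds its edges to MLlib's standard ALS. The vertex/edge layout and values are assumptions, and note this is the extract-and-train route discussed in the thread, not an ALS that runs natively on GraphX.

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

def trainFromGraph(sc: SparkContext): Unit = {
  // Bipartite graph: user vertices and movie vertices, edge attribute = rating
  val vertices = sc.parallelize(Seq((1L, "user"), (2L, "user"), (101L, "movie"), (102L, "movie")))
  val edges = sc.parallelize(Seq(Edge(1L, 101L, 5.0), Edge(1L, 102L, 3.0), Edge(2L, 101L, 4.0)))
  val graph = Graph(vertices, edges)

  // Project the graph's edges into MLlib Ratings and run the standard ALS on them
  val ratings = graph.edges.map(e => Rating(e.srcId.toInt, e.dstId.toInt, e.attr))
  val model = ALS.train(ratings, /* rank = */ 10, /* iterations = */ 10, /* lambda = */ 0.01)
  println(model.predict(1, 102))
}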
RE: Spark or Storm
The only thing which doesn't make much sense in Spark Streaming (and I am not saying it is done better in Storm) is the iterative and redundant shipping of essentially the same tasks (closures/lambdas/functions) to the cluster nodes AND re-launching them there again and again. This is a legacy from Spark Batch, where such an approach DOES make sense.

So to recap – in Spark Streaming, the driver keeps serializing and transmitting the same Tasks (comprising a Job) for every new DStream RDD, which then get re-launched, i.e. new JVM threads launched within each Executor (JVM), and then the tasks report their final execution status to the driver (only the last has real functional purpose).

An optimization (provided Spark Streaming were implemented from scratch) could be to launch the Stages/Tasks of a Streaming Job as constantly running threads (daemons/agents) within the Executors and let the DStream RDDs stream through them, so that only the final execution status for each DStream RDD and some periodic heartbeats (of the agents) are reported to the driver. Essentially this gives you a Pipeline Architecture (of stringed agents), which is a well-known parallel programming pattern especially suitable for streaming data.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Wednesday, June 17, 2015 7:14 PM
To: Enno Shioji
Cc: Ashish Soni; ayan guha; Sabarish Sasidharan; Spark Enthusiast; Will Briggs; user; Sateesh Kavuri
Subject: Re: Spark or Storm

This documentation is only for writes to an external system, but all the counting you do within your streaming app (e.g. if you use reduceByKeyAndWindow to keep track of a running count) is exactly-once. When you write to a storage system, no matter which streaming framework you use, you'll have to make sure the writes are idempotent, because the storage system can't know whether you meant to write the same data again or not. But the place where Spark Streaming helps over Storm, etc. is for tracking state within your computation. Without that facility, you'd not only have to make sure that writes are idempotent, but you'd have to make sure that updates to your own internal state (e.g. reduceByKeyAndWindow) are exactly-once too.

Matei

On Jun 17, 2015, at 8:26 AM, Enno Shioji eshi...@gmail.com wrote:

The thing is, even with that improvement, you still have to make updates idempotent or transactional yourself. If you read http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics which refers to the latest version, it says:

Semantics of output operations

Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches.

- Idempotent updates: Multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files.
- Transactional updates: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
  - Use the batch time (available in foreachRDD) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
  - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else if this was already committed, skip the update.

So either you make the update idempotent, or you have to make it transactional yourself, and the suggested mechanism is very similar to what Storm does.

On Wed, Jun 17, 2015 at 3:51 PM, Ashish Soni asoni.le...@gmail.com wrote:

@Enno
As per the latest version and documentation, Spark Streaming does offer exactly-once semantics using the improved Kafka integration. Note I have not tested it yet. Any feedback will be helpful if anyone has tried the same.

http://koeninger.github.io/kafka-exactly-once/#7
https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

On Wed, Jun 17, 2015 at 10:33 AM, Enno Shioji eshi...@gmail.com wrote:

AFAIK KCL is *supposed* to provide fault tolerance and load balancing (plus additionally, elastic scaling unlike Storm), Kinesis providing the coordination. My understanding is that it's like a naked Storm worker process that can consequently only do map. I haven't really used it though, so can't really comment on how it compares to Spark/Storm. Maybe somebody else will be able to comment.

On Wed, Jun 17, 2015 at 3:13 PM, ayan guha
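A minimal sketch of the "transactional updates" recipe quoted above, using the batch time plus partition index as the unique identifier. ExternalStore and its commitIfAbsent method are hypothetical stand-ins for whatever atomic check-and-write your storage system provides, so treat this as an assumption-laden outline rather than a tested recipe.

import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.DStream

def exactlyOnceOutput(dstream: DStream[String]): Unit = {
  dstream.foreachRDD { (rdd, batchTime) =>
    rdd.foreachPartition { records =>
      // Identifier built from batch time + partition index, as the quoted docs describe
      val uniqueId = s"${batchTime.milliseconds}-${TaskContext.get.partitionId}"
      // Commit the blob only if uniqueId has not been committed before
      ExternalStore.commitIfAbsent(uniqueId, records.toSeq)
    }
  }
}

// Hypothetical stand-in so the sketch compiles; a real implementation must be atomic
// against the external system, not an in-JVM set like this one.
object ExternalStore {
  private val committed = scala.collection.mutable.Set[String]()
  def commitIfAbsent(id: String, blob: Seq[String]): Unit =
    if (committed.add(id)) println(s"committed $id with ${blob.size} records")
}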
RE: stop streaming context of job failure
https://spark.apache.org/docs/latest/monitoring.html

Also subscribe to the various Listeners for the various metric types, e.g. job stats/statuses – this will allow you (in the driver) to decide when to stop the context gracefully (the listening and stopping can be done from a completely separate thread in the driver).

https://spark.apache.org/docs/latest/api/java/

org.apache.spark.ui.jobs
Class JobProgressListener
extends Object
implements SparkListener, Logging

:: DeveloperApi ::
Tracks task-level information to be displayed in the UI. All access to the data structures in this class must be synchronized on the class, since the UI thread and the EventBus loop may otherwise be reading and updating the internal data structures concurrently.

From: Krot Viacheslav [mailto:krot.vyaches...@gmail.com]
Sent: Tuesday, June 16, 2015 2:35 PM
To: user@spark.apache.org
Subject: stop streaming context of job failure

Hi all,

Is there a way to stop the streaming context when some batch processing fails? I want to set a reasonable retries count, say 10, and if it still fails – stop the context completely. Is that possible?
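A sketch of what that listener-plus-stop wiring could look like. The "stop after any failed job" policy, the stop flags and the registration point are assumptions to adapt to your retry logic and Spark version, not a recipe from the thread.

import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}
import org.apache.spark.streaming.StreamingContext

// Stops the StreamingContext from the driver when a job reports a failure
class StopOnFailureListener(ssc: StreamingContext) extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(e) =>
      // Stop from a separate thread so the listener bus is not blocked
      new Thread(new Runnable {
        def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = false)
      }).start()
    case _ => // job succeeded, nothing to do
  }
}

// Driver-side registration (verify against your Spark version):
// ssc.sparkContext.addSparkListener(new StopOnFailureListener(ssc))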
RE: How does one decide no of executors/cores/memory allocation?
Best is by measuring and recording how the performance of your solution scales as the workload scales – recording as in data points – and then you can do some time-series statistical analysis and visualizations.

For example, you can start with a single box with e.g. 8 CPU cores. Use e.g. 1 or 2 partitions and 1 executor, which would correspond to 1 CPU core (JVM thread) processing your workload – scale the workload, see how the performance scales, and record all data points. Then repeat the same for more CPU cores, RAM and boxes – you get the idea? Then analyze your performance datasets in the way explained.

Basically this stuff is known as Performance Engineering and has nothing to do with a specific product – read something on PE as well.

From: shreesh [mailto:shreesh.la...@mail.com]
Sent: Tuesday, June 16, 2015 4:22 PM
To: user@spark.apache.org
Subject: Re: How does one decide no of executors/cores/memory allocation?

I realize that there are a lot of ways to configure my application in Spark. The part that is not clear is how do I decide, say for example, into how many partitions should I divide my data, how much RAM should I have, or how many workers should one initialize?
RE: Join between DStream and Periodically-Changing-RDD
"turn (keep turning) your HDFS file (Batch RDD) into a stream of messages (outside spark streaming)" – what I meant by that was "turn the Updates to your HDFS dataset into Messages" and send them as such to Spark Streaming.

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Monday, June 15, 2015 8:38 AM
To: 'Ilove Data'; 'Tathagata Das'
Cc: 'Akhil Das'; 'user'
Subject: RE: Join between DStream and Periodically-Changing-RDD

Then go for the second option I suggested – simply turn (keep turning) your HDFS file (Batch RDD) into a stream of messages (outside Spark Streaming) – then Spark Streaming consumes and aggregates the messages FOR THE RUNTIME LIFETIME of your application in some of the following ways:

1. A continuous union of DStream RDDs, as you also persist the result (so it does not get discarded, which is what happens to DStream RDDs by default in Spark Streaming)
2. Apply one of the window operations, e.g. aggregation, on the DStream RDD – as the window is the runtime lifetime of the app

And at the same time you join the DStream RDDs of your actual streaming data with the above continuously updated DStream RDD representing your HDFS file.

From: Ilove Data [mailto:data4...@gmail.com]
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

@Akhil Das
Joining two DStreams might not be an option since I want to join the stream with historical data in an HDFS folder.

@Tathagata Das @Evo Eftimov
The Batch RDD to be reloaded is considerably huge compared to the DStream data, since it is historical data. To be more specific, most of the joins from the RDD stream to the HDFS folder (~90% of RDD stream data) will hit recent data (the last 1-2 days of data) in the HDFS folder. So it is important to get the most updated data. Is there a workaround for that specific case? Since RDDs are not mutable, do I need a K-V database for this join with historical data?

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das t...@databricks.com wrote:

Another approach not mentioned is to use a function to get the RDD that is to be joined. Something like this:

kvDstream.foreachRDD(rdd => {
  val rdd = getOrUpdateRDD(params...)
  rdd.join(kvFile)
})

The getOrUpdateRDD() function that you implement will get called every batch interval. And you can decide to return the same RDD or an updated RDD when you want to. Once updated, if the RDD is going to be used in multiple batch intervals, you should cache it. Furthermore, if you are going to join it, you should partition it by a partitioner, then cache it, and make sure that the same partitioner is used for joining. That would be more efficient, as the RDD will stay partitioned in memory, minimizing the cost of the join.

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:

It depends on how big the Batch RDD requiring reloading is. Reloading it for EVERY single DStream RDD would slow down the stream processing in line with the total time required to reload the Batch RDD ... But if the Batch RDD is not that big, then that might not be an issue, especially in the context of the latency requirements for your streaming app.

Another, more efficient and real-time approach may be to represent your Batch RDD as DStream RDDs and keep aggregating them over the lifetime of the Spark Streaming app instance and keep joining with the actual DStream RDDs. You can feed your HDFS file into a Message Broker topic and consume it from there in the form of DStream RDDs which you keep aggregating over the lifetime of the Spark Streaming app instance.

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

RDDs are immutable, why not join two DStreams?

Not sure, but you can try something like this also:

kvDstream.foreachRDD(rdd => {
  val file = ssc.sparkContext.textFile("/sigmoid/")
  val kvFile = file.map(x => (x.split(",")(0), x))
  rdd.join(kvFile)
})

Thanks
Best Regards

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote:

Hi,

I'm trying to join a DStream with an interval of, let's say, 20s with an RDD loaded from an HDFS folder which is changing periodically – let's say a new file is coming to the folder every 10 minutes. How should it be done, considering the HDFS files in the folder are periodically changing/adding new files? Do RDDs automatically detect changes in the HDFS folder as the RDD source and automatically reload the RDD?

Thanks!
Rendy
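A sketch of the getOrUpdateRDD idea from the reply above, with a time-based refresh. The refresh interval, the HDFS path and the key/value layout are assumptions; only the overall pattern (reload occasionally, cache, partition, then join every batch) comes from the thread.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object HistoricalData {
  private var cached: RDD[(String, String)] = null
  private var loadedAt = 0L
  private val refreshMs = 10 * 60 * 1000L   // assumed: reload at most every 10 minutes

  def getOrUpdateRDD(ssc: StreamingContext): RDD[(String, String)] = synchronized {
    val now = System.currentTimeMillis()
    if (cached == null || now - loadedAt > refreshMs) {
      if (cached != null) cached.unpersist()
      cached = ssc.sparkContext.textFile("hdfs:///data/historical/")   // hypothetical path
        .map(line => (line.split(",")(0), line))
        .partitionBy(new HashPartitioner(100))   // keep it partitioned so the join stays cheap
        .cache()
      loadedAt = now
    }
    cached
  }
}

// Called on the driver at every batch interval via transform
def joinWithHistory(ssc: StreamingContext, kvDstream: DStream[(String, String)]) =
  kvDstream.transform(rdd => rdd.join(HistoricalData.getOrUpdateRDD(ssc)))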
RE: Join between DStream and Periodically-Changing-RDD
Then go for the second option I suggested – simply turn (keep turning) your HDFS file (Batch RDD) into a stream of messages (outside Spark Streaming) – then Spark Streaming consumes and aggregates the messages FOR THE RUNTIME LIFETIME of your application in some of the following ways:

1. A continuous union of DStream RDDs, as you also persist the result (so it does not get discarded, which is what happens to DStream RDDs by default in Spark Streaming)
2. Apply one of the window operations, e.g. aggregation, on the DStream RDD – as the window is the runtime lifetime of the app

And at the same time you join the DStream RDDs of your actual streaming data with the above continuously updated DStream RDD representing your HDFS file.
RE: Join between DStream and Periodically-Changing-RDD
It depends on how big the Batch RDD requiring reloading is. Reloading it for EVERY single DStream RDD would slow down the stream processing in line with the total time required to reload the Batch RDD ... But if the Batch RDD is not that big, then that might not be an issue, especially in the context of the latency requirements for your streaming app.

Another, more efficient and real-time approach may be to represent your Batch RDD as DStream RDDs and keep aggregating them over the lifetime of the Spark Streaming app instance and keep joining with the actual DStream RDDs. You can feed your HDFS file into a Message Broker topic and consume it from there in the form of DStream RDDs which you keep aggregating over the lifetime of the Spark Streaming app instance.
Re: Determining number of executors within RDD
Yes, I think it is ONE worker, ONE executor, as an executor is nothing but a JVM instance spawned by the worker. To run more executors, i.e. JVM instances, on the same physical cluster node, you need to run more than one worker on that node and then allocate only part of the system resources to each worker/executor.

Sent from Samsung Mobile

Original message
From: maxdml max...@cs.duke.edu
Date: 2015/06/10 19:56 (GMT+00:00)
To: user@spark.apache.org
Subject: Re: Determining number of executors within RDD

Actually this is somehow confusing for two reasons:

- First, the option 'spark.executor.instances', which seems to be only dealt with in the case of YARN in the source code of SparkSubmit.scala, is also present in the conf/spark-env.sh file under the standalone section, which would indicate that it is also available for this mode.

- Second, a post from Andrew Or states that this property defines the number of workers in the cluster, not the number of executors on a given worker. (http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html)

Could anyone clarify this? :-)

Thanks.
RE: How to share large resources like dictionaries while processing data with Spark ?
Oops, @Yiannis, sorry to be a party pooper, but the Job Server is for Spark Batch Jobs (besides, anyone can put something like that together in 5 min), while I am under the impression that Dmitry is working on a Spark Streaming app. Besides, the Job Server is essentially for sharing the Spark Context between multiple threads.

Re Dmitry's initial question – you can load large data sets as a Batch (Static) RDD from any Spark Streaming app and then join DStream RDDs against them to emulate "lookups"; you can also try the "Lookup RDD" – there is a GitHub project.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Friday, June 5, 2015 12:12 AM
To: Yiannis Gkoufas
Cc: Olivier Girardot; user@spark.apache.org
Subject: Re: How to share large resources like dictionaries while processing data with Spark ?

Thanks so much, Yiannis, Olivier, Huang!

On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com wrote:

Hi there,

I would recommend checking out https://github.com/spark-jobserver/spark-jobserver which I think gives the functionality you are looking for. I haven't tested it though.

BR

On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote:

You can use it as a broadcast variable, but if it's too large (more than 1GB I guess), you may need to share it by joining it, using some kind of key, to the other RDDs. But this is the kind of thing broadcast variables were designed for.

Regards,
Olivier.

Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit :

We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDDs?

I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer...

Thanks.
RE: How to share large resources like dictionaries while processing data with Spark ?
And RDD.lookup() cannot be invoked from transformations, e.g. maps. lookup() is an action which can be invoked only from the driver – if you want functionality like that from within transformations executed on the cluster nodes, try IndexedRDD.

Other options are: load a Batch / Static RDD once in your Spark Streaming app and then keep joining, and then e.g. filtering, every incoming DStream RDD with the (big, static) Batch RDD.
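A small sketch contrasting the two routes mentioned in this thread: a broadcast dictionary read inside a transformation, versus joining the stream against a cached static pair RDD. The dictionary contents, the key extraction and the cache() placement are assumptions for illustration.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Route 1: broadcast the dictionary once and read it inside transformations
def enrichWithBroadcast(sc: SparkContext, events: DStream[String]): DStream[(String, String)] = {
  val dict = Map("ERR" -> "error", "WRN" -> "warning")        // assumed small lookup table
  val dictBc = sc.broadcast(dict)
  events.map(e => (e, dictBc.value.getOrElse(e.take(3), "unknown")))
}

// Route 2: keep the large resource as a static pair RDD and join each batch against it
def enrichWithJoin(staticDict: RDD[(String, String)], events: DStream[(String, String)]) = {
  val cachedDict = staticDict.cache()                          // loaded once, reused every batch
  events.transform(rdd => rdd.join(cachedDict))
}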
RE: How to share large resources like dictionaries while processing data with Spark ?
It is called Indexed RDD https://github.com/amplab/spark-indexedrdd From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Friday, June 5, 2015 3:15 PM To: Evo Eftimov Cc: Yiannis Gkoufas; Olivier Girardot; user@spark.apache.org Subject: Re: How to share large resources like dictionaries while processing data with Spark ? Thanks everyone. Evo, could you provide a link to the Lookup RDD project? I can't seem to locate it exactly on Github. (Yes, to your point, our project is Spark streaming based). Thank you. On Fri, Jun 5, 2015 at 6:04 AM, Evo Eftimov evo.efti...@isecc.com wrote: Oops, @Yiannis, sorry to be a party pooper but the Job Server is for Spark Batch Jobs (besides anyone can put something like that in 5 min), while I am under the impression that Dmytiy is working on Spark Streaming app Besides the Job Server is essentially for sharing the Spark Context between multiple threads Re Dmytiis intial question – you can load large data sets as Batch (Static) RDD from any Spark Streaming App and then join DStream RDDs against them to emulate “lookups” , you can also try the “Lookup RDD” – there is a git hub project From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Friday, June 5, 2015 12:12 AM To: Yiannis Gkoufas Cc: Olivier Girardot; user@spark.apache.org Subject: Re: How to share large resources like dictionaries while processing data with Spark ? Thanks so much, Yiannis, Olivier, Huang! On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I would recommend checking out https://github.com/spark-jobserver/spark-jobserver which I think gives the functionality you are looking for. I haven't tested it though. BR On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote: You can use it as a broadcast variable, but if it's too large (more than 1Gb I guess), you may need to share it joining this using some kind of key to the other RDDs. But this is the kind of thing broadcast variables were designed for. Regards, Olivier. Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit : We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
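For the broadcast-variable route Olivier mentions, a small hedged Scala sketch; sc is an existing SparkContext, and the dictionary path and someRdd are illustrative.

// Build the dictionary once on the driver (small enough to collect), then broadcast it.
val dict = sc.textFile("hdfs:///dict")                             // hypothetical path
  .map { line => val Array(k, v) = line.split("\t"); (k, v) }
  .collectAsMap()

val dictBc = sc.broadcast(dict)                                    // shipped once per executor

// Any transformation can now read it locally, without re-serializing the map per task.
val annotated = someRdd.map { record =>
  val key = record.split(",")(0)
  (record, dictBc.value.getOrElse(key, "UNKNOWN"))
}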
RE: Spark Streaming for Each RDD - Exception on Empty
Foreachpartition callback is provided with an Iterator by the Spark Framework – iterate it with while (iterator.hasNext()) { … } Also check whether this is not some sort of Python Spark API bug – Python seems to be the foster child here – Scala and Java are the darlings From: John Omernik [mailto:j...@omernik.com] Sent: Friday, June 5, 2015 4:08 PM To: user Subject: Spark Streaming for Each RDD - Exception on Empty Is there a pythonic/sparkonic way to test for an empty RDD before using the foreachRDD? Basically I am using the Python example https://spark.apache.org/docs/latest/streaming-programming-guide.html to put records somewhere. When I have data, it works fine; when I don't, I get an exception. I am not sure about the performance implications of just throwing an exception every time there is no data, but can I just test before sending it? I did see one post mentioning look for take(1) from the stream to test for data, but I am not sure where I put that in this example... Is that in the lambda function? or somewhere else? Looking for pointers! Thanks! mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(parseRDD)) Using this example code from the link above:

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
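A hedged sketch of the take(1) guard mentioned in the question, written in Scala for brevity; in PySpark the same test works because rdd.take(1) returns an empty list when there is no data (newer releases also offer rdd.isEmpty()). dstream and createNewConnection() are stand-ins, as in the docs example.

dstream.foreachRDD { rdd =>
  // take(1) is a cheap emptiness test: it stops as soon as one element is found.
  if (rdd.take(1).nonEmpty) {
    rdd.foreachPartition { records =>
      val connection = createNewConnection()                       // hypothetical helper
      records.foreach(record => connection.send(record))
      connection.close()
    }
  }
}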
RE: How to increase the number of tasks
It may be that your system runs out of resources (ie 174 is the ceiling) due to the following 1. RDD Partition = (Spark) Task 2. RDD Partition != (Spark) Executor 3. (Spark) Task != (Spark) Executor 4. (Spark) Task = JVM Thread 5. (Spark) Executor = JVM instance From: ÐΞ€ρ@Ҝ (๏̯͡๏) [mailto:deepuj...@gmail.com] Sent: Friday, June 5, 2015 10:48 AM To: user Subject: How to increase the number of tasks I have a stage that spawns 174 tasks when i run repartition on avro data. Tasks read between 512/317/316/214/173 MB of data. Even if i increase number of executors/ number of partitions (when calling repartition) the number of tasks launched remains fixed to 174. 1) I want to speed up this task. How do i do it ? 2) Few tasks finish in 20 mins, few in 15 and few in less than 10. Why is this behavior ? Since this is a repartition stage, it should not depend on the nature of data. Its taking more than 30 mins and i want to speed it up by throwing more executors at it. Please suggest Deepak
RE: How to increase the number of tasks
The param is for “Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when NOT set by user.” While Deepak is setting the number of partitions EXPLICITLY From: 李铖 [mailto:lidali...@gmail.com] Sent: Friday, June 5, 2015 11:08 AM To: ÐΞ€ρ@Ҝ (๏̯͡๏) Cc: Evo Eftimov; user Subject: Re: How to increase the number of tasks just multiply 2-4 with the cpu core number of the node . 2015-06-05 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: I did not change spark.default.parallelism, What is recommended value for it. On Fri, Jun 5, 2015 at 3:31 PM, 李铖 lidali...@gmail.com wrote: Did you have a change of the value of 'spark.default.parallelism'?be a bigger number. 2015-06-05 17:56 GMT+08:00 Evo Eftimov evo.efti...@isecc.com: It may be that your system runs out of resources (ie 174 is the ceiling) due to the following 1. RDD Partition = (Spark) Task 2. RDD Partition != (Spark) Executor 3. (Spark) Task != (Spark) Executor 4. (Spark) Task = JVM Thread 5. (Spark) Executor = JVM instance From: ÐΞ€ρ@Ҝ (๏̯͡๏) [mailto:deepuj...@gmail.com] Sent: Friday, June 5, 2015 10:48 AM To: user Subject: How to increase the number of tasks I have a stage that spawns 174 tasks when i run repartition on avro data. Tasks read between 512/317/316/214/173 MB of data. Even if i increase number of executors/ number of partitions (when calling repartition) the number of tasks launched remains fixed to 174. 1) I want to speed up this task. How do i do it ? 2) Few tasks finish in 20 mins, few in 15 and few in less than 10. Why is this behavior ? Since this is a repartition stage, it should not depend on the nature of data. Its taking more than 30 mins and i want to speed it up by throwing more executors at it. Please suggest Deepak -- Deepak
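To make the partition/task relationship concrete, a hedged Scala sketch: the stage that reads the source still gets one task per input split (the 174 above), while the count passed to repartition() drives the task count of the stages after the shuffle. The path and the numbers are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("repartition-example")
  .set("spark.default.parallelism", "64")        // used only where no explicit count is given
val sc = new SparkContext(conf)

val raw = sc.textFile("hdfs:///input/avro-data") // one read task per input split, e.g. 174
val wider = raw.repartition(696)                 // shuffle into an explicit number of partitions
println(wider.partitions.length)                 // 696: the post-shuffle stage runs 696 tasks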
RE: How to share large resources like dictionaries while processing data with Spark ?
Spark uses Tachyon internally ie all SERIALIZED IN-MEMORY RDDs are kept there – so if you have a BATCH RDD which is SERIALIZED IN_MEMORY then you are using Tachyon implicitly – the only difference is that if you are using Tachyon explicitly ie as a distributed, in-memory file system you can share data between Jobs, while an RDD is ALWAYS visible within Jobs using the same Spark Context From: Charles Earl [mailto:charles.ce...@gmail.com] Sent: Friday, June 5, 2015 12:10 PM To: Evo Eftimov Cc: Dmitry Goldenberg; Yiannis Gkoufas; Olivier Girardot; user@spark.apache.org Subject: Re: How to share large resources like dictionaries while processing data with Spark ? Would tachyon be appropriate here? On Friday, June 5, 2015, Evo Eftimov evo.efti...@isecc.com wrote: Oops, @Yiannis, sorry to be a party pooper but the Job Server is for Spark Batch Jobs (besides anyone can put something like that in 5 min), while I am under the impression that Dmytiy is working on Spark Streaming app Besides the Job Server is essentially for sharing the Spark Context between multiple threads Re Dmytiis intial question – you can load large data sets as Batch (Static) RDD from any Spark Streaming App and then join DStream RDDs against them to emulate “lookups” , you can also try the “Lookup RDD” – there is a git hub project From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Friday, June 5, 2015 12:12 AM To: Yiannis Gkoufas Cc: Olivier Girardot; user@spark.apache.org Subject: Re: How to share large resources like dictionaries while processing data with Spark ? Thanks so much, Yiannis, Olivier, Huang! On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I would recommend checking out https://github.com/spark-jobserver/spark-jobserver which I think gives the functionality you are looking for. I haven't tested it though. BR On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote: You can use it as a broadcast variable, but if it's too large (more than 1Gb I guess), you may need to share it joining this using some kind of key to the other RDDs. But this is the kind of thing broadcast variables were designed for. Regards, Olivier. Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit : We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDD's? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- - Charles
RE: Objects serialized before foreachRDD/foreachPartition ?
Dmitry was concerned about the “serialization cost” NOT the “memory footprint – hence option a) is still viable since a Broadcast is performed only ONCE for the lifetime of Driver instance From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, June 3, 2015 2:44 PM To: Evo Eftimov Cc: dgoldenberg; user Subject: Re: Objects serialized before foreachRDD/foreachPartition ? Considering memory footprint of param as mentioned by Dmitry, option b seems better. Cheers On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: Hmmm a spark streaming app code doesn't execute in the linear fashion assumed in your previous code snippet - to achieve your objectives you should do something like the following in terms of your second objective - saving the initialization and serialization of the params you can: a) broadcast them b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor messageBodies.foreachRDD(new FunctionJavaRDDlt;String, Void() { Param param = new Param(); param.initialize(); @Override public Void call(JavaRDDString rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); //put this in e.g. the object destructor param.deinitialize(); -Original Message- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 1:56 PM To: user@spark.apache.org Subject: Objects serialized before foreachRDD/foreachPartition ? I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically the takeaway is that all objects passed into the code processing RDD's must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDD's, I'd need to think twice about the costs of serializing such objects, it would seem. In the below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition? Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new FunctionJavaRDDlt;String, Void() { @Override public Void call(JavaRDDString rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction() or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-befor http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html e-foreachRDD-foreachPartition-tp23134.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Objects serialized before foreachRDD/foreachPartition ?
Hmmm a spark streaming app code doesn't execute in the linear fashion assumed in your previous code snippet - to achieve your objectives you should do something like the following in terms of your second objective - saving the initialization and serialization of the params you can: a) broadcast them b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor messageBodies.foreachRDD(new FunctionJavaRDDlt;String, Void() { Param param = new Param(); param.initialize(); @Override public Void call(JavaRDDString rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); //put this in e.g. the object destructor param.deinitialize(); -Original Message- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 1:56 PM To: user@spark.apache.org Subject: Objects serialized before foreachRDD/foreachPartition ? I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically the takeaway is that all objects passed into the code processing RDD's must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDD's, I'd need to think twice about the costs of serializing such objects, it would seem. In the below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition? Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new FunctionJavaRDDlt;String, Void() { @Override public Void call(JavaRDDString rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction() or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-befor e-foreachRDD-foreachPartition-tp23134.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
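A hedged Scala rendering of the two options, (a) broadcast and (b) per-executor singleton; Param, messageBodies and process() are the hypothetical names from the thread, and ssc is the StreamingContext.

// Option (a): initialize once on the driver and broadcast (Param must be Serializable).
val param = new Param()
param.initialize()
val paramBc = ssc.sparkContext.broadcast(param)

messageBodies.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val p = paramBc.value                         // local copy on the executor
    records.foreach(r => process(r, p))
  }
}

// Option (b): a per-executor singleton, built lazily by the first task that touches it.
object ParamHolder {
  lazy val param: Param = {
    val p = new Param()
    p.initialize()                                // e.g. from a config file on HDFS
    p
  }
}

messageBodies.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach(r => process(r, ParamHolder.param))
  }
}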
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Makes sense especially if you have a cloud with “infinite” resources / nodes which allows you to double, triple etc in the background/parallel the resources of the currently running cluster I was thinking more about the scenario where you have e.g. 100 boxes and want to / can add e.g. 20 more From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:46 PM To: Evo Eftimov Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, One of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down of the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, insuring no downtime or extra latency. Thoughts? On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: You should monitor vital performance / job clogging stats of the Spark Streaming Runtime not “kafka topics” You should be able to bring new worker nodes online and make them contact and register with the Master without bringing down the Master (or any of the currently running worker nodes) Then just shutdown your currently running spark streaming job/app and restart it with new params to take advantage of the larger cluster From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:14 PM To: Cody Koeninger Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Would it be possible to implement Spark autoscaling somewhat along these lines? -- 1. If we sense that a new machine is needed, by watching the data load in Kafka topic(s), then 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine); 3. Create a shadow/mirror Spark master running alongside the initial version which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes). 4. The previous version of the Spark runtime is acquiesced/decommissioned. We possibly get both clusters working on the same data which may actually be OK (at least for our specific use-cases). 5. Now the new Spark cluster is running. Similarly, the decommissioning of M unused boxes would happen, via this notion of a mirror Spark runtime. How feasible would it be for such a mirrorlike setup to be created, especially created programmatically? Especially point #3. The other idea we'd entertained was to bring in a new machine, acquiesce down all currently running workers by telling them to process their current batch then shut down, then restart the consumers now that Spark is aware of a modified cluster. This has the drawback of a downtime that may not be tolerable in terms of latency, by the system's clients waiting for their responses in a synchronous fashion. Thanks. On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote: I'm not sure that points 1 and 2 really apply to the kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is. On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote: Hi all, As the author of the dynamic allocation feature I can offer a few insights here. 
Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark streaming at the moment (1.4 or before). This is because of two things: (1) Number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors. (2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors. Long answer short, with Spark streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. take into account the batch queue instead. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future. -Andrew 2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com: Probably you should ALWAYS keep the RDD storage policy to MEMORY
RE: Value for SPARK_EXECUTOR_CORES
I don’t think the number of CPU cores controls the “number of parallel tasks”. The number of Tasks corresponds first and foremost to the number of (Dstream) RDD Partitions The Spark documentation doesn’t mention what is meant by “Task” in terms of Standard Multithreading Terminology ie a Thread or Process so your point is good Ps: time and time again every product and dev team and company invent their own terminology so 50% of the time using the product is spent on deciphering and reinventing the wheel From: Mulugeta Mammo [mailto:mulugeta.abe...@gmail.com] Sent: Thursday, May 28, 2015 7:24 PM To: Ruslan Dautkhanov Cc: user Subject: Re: Value for SPARK_EXECUTOR_CORES Thanks for the valuable information. The blog states: The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. So, I guess the max number of executor-cores I can assign is the CPU count (which includes the number of threads per core), not just the number of cores. I just want to be sure the cores term Spark is using. Thanks On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com wrote: It's not only about cores. Keep in mind spark.executor.cores also affects available memeory for each task: From http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction *spark.shuffle.safetyFraction)/spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively. I'd test spark.executor.cores with 2,4,8 and 16 and see what makes your job run faster.. -- Ruslan Dautkhanov On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote: My executor has the following spec (lscpu): CPU(s): 16 Core(s) per socket: 4 Socket(s): 2 Thread(s) per code: 2 The CPU count is obviously 4*2*2 = 16. My question is what value is Spark expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores (2 * 2 = 4) ? Thanks
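A small worked example of the formula quoted from the Cloudera post, with purely illustrative numbers (8 GB executors, 8 cores) and the default shuffle fractions; it only shows the arithmetic, not a recommendation.

// Defaults assumed: spark.shuffle.memoryFraction = 0.2, spark.shuffle.safetyFraction = 0.8
val executorMemoryMb = 8 * 1024       // e.g. --executor-memory 8g
val executorCores    = 8              // e.g. --executor-cores 8 on the 16-thread box above

val perTaskShuffleMb = executorMemoryMb * 0.2 * 0.8 / executorCores
println(f"$perTaskShuffleMb%.0f MB of shuffle memory per concurrently running task")
// roughly 164 MB here; halving --executor-cores doubles it, at the cost of concurrency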
RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
@DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me You can even go one step further and periodically issue “get/check free memory” to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold that should be your third metric Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you can implement one on your own without waiting for Jiras and new features whenever they might be implemented by the Spark dev team – moreover you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning – BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution In terms of how to scale the Spark Framework Dynamically – even though this is not supported at the moment out of the box I guess you can have a sys management framework spin dynamically a few more boxes (spark worker nodes), stop dynamically your currently running Spark Streaming Job, relaunch it with new params e.g. more Receivers, larger number of Partitions (hence tasks), more RAM per executor etc. Obviously this will cause some temporary delay in fact interruption in your processing but if the business use case can tolerate that then go for it From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, May 28, 2015 12:36 PM To: dgoldenberg Cc: spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Hi, tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark streaming processes is not supported. Longer version. I assume that you are talking about Spark Streaming as the discussion is about handing Kafka streaming data. Then you have two things to consider: the Streaming receivers and the Spark processing cluster. Currently, the receiving topology is static. One receiver is allocated with each DStream instantiated and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed, therefore the number of Kafka receivers is fixed for the lifetime of your DStream. What we do is to calculate the cluster capacity and use that as a fixed upper bound (with a margin) for the receiver throughput. There's work in progress to add a reactive model to the receiver, where backpressure can be applied to handle overload conditions. See https://issues.apache.org/jira/browse/SPARK-7398 Once the data is received, it will be processed in a 'classical' Spark pipeline, so previous posts on spark resource scheduling might apply. Regarding metrics, the standard metrics subsystem of spark will report streaming job performance. Check the driver's metrics endpoint to peruse the available metrics: driver:ui-port/metrics/json -kr, Gerard. (*) Spark is a project that moves so fast that statements might be invalidated by new work every minute. 
On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm trying to understand if there are design patterns for autoscaling Spark (add/remove slave machines to the cluster) based on the throughput. Assuming we can throttle Spark consumers, the respective Kafka topics we stream data from would start growing. What are some of the ways to generate the metrics on the number of new messages and the rate they are piling up? This perhaps is more of a Kafka question; I see a pretty sparse javadoc with the Metric interface and not much else... What are some of the ways to expand/contract the Spark cluster? Someone has mentioned Mesos... I see some info on Spark metrics in the Spark monitoring guide https://spark.apache.org/docs/latest/monitoring.html . Do we want to perhaps implement a custom sink that would help us autoscale up or down based on the throughput? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail:
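A hedged Scala sketch of watching the two metrics described above (scheduling delay and processing time) from inside the application via the StreamingListener API; batchWindowMs and the 10-second value are illustrative.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BacklogWatcher(batchWindowMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val processing = info.processingDelay.getOrElse(0L)
    val scheduling = info.schedulingDelay.getOrElse(0L)
    if (processing > batchWindowMs || scheduling > batchWindowMs) {
      // the place to raise an alert or notify an external autoscaler
      println(s"WARN batch ${info.batchTime}: processing=${processing}ms scheduling=${scheduling}ms")
    }
  }
}

ssc.addStreamingListener(new BacklogWatcher(batchWindowMs = 10000))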
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change. Kafka offsets are managed automatically for you by the Kafka clients which keep them in ZooKeeper, so don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg dgoldenberg...@gmail.com Date: 2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov evo.efti...@isecc.com Cc: Gerard Maas gerard.m...@gmail.com, spark users user@spark.apache.org Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark...
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me You can even go one step further and periodically issue “get/check free memory” to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold that should be your third metric Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you can implement one on your own without waiting for Jiras and new features whenever they might be implemented by the Spark dev team – moreover you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning – BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution In terms of how to scale the Spark Framework Dynamically – even though this is not supported at the moment out of the box I guess you can have a sys management framework spin dynamically a few more boxes (spark worker nodes), stop dynamically your currently running Spark Streaming Job, relaunch it with new params e.g. more Receivers, larger number of Partitions (hence tasks), more RAM per executor etc. Obviously this will cause some temporary delay in fact interruption in your processing but if the business use case can tolerate that then go for it From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, May 28, 2015 12:36 PM To: dgoldenberg Cc: spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Hi, tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark streaming processes is not supported. Longer version. I assume that you are talking about Spark Streaming as the discussion is about handing Kafka streaming data. Then you have two things to consider: the Streaming receivers and the Spark processing cluster. Currently, the receiving topology is static. One receiver is allocated with each DStream instantiated and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed, therefore the number of Kafka receivers
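For the "shut down your job gracefully" step above, a minimal sketch of the stop call that lets already-received batches finish before the receivers are torn down; ssc is the running StreamingContext.

sys.addShutdownHook {
  // stopGracefully = true waits for queued batches to be processed before the receivers stop,
  // which also gives the Kafka client a chance to record its final offsets.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}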
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it will be your insurance policy against sys crashes due to memory leaks. Until there is free RAM, spark streaming (spark) will NOT resort to disk – and of course resorting to disk from time to time (ie when there is no free RAM ) and taking a performance hit from that, BUT only until there is no free RAM From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Thursday, May 28, 2015 2:34 PM To: Evo Eftimov Cc: Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good and hopefully it'd take *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers; implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implement the cluster autoscale. On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote: You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence loosing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are mabaged automatically for you by the kafka clients which keep them in zoomeeper dont worry about that ad long as you shut down your job gracefuly. Besides msnaging the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date:2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. 
The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described
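A hedged sketch of the MEMORY AND DISK insurance policy described above, applied both to the receiver's input blocks and to a derived DStream; the ZooKeeper host, consumer group, topic and parse() are illustrative.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver input blocks: replicated, serialized, allowed to spill to disk instead of being lost.
val lines = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-consumer-group", Map("events" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2)

// A derived stream can be given the same insurance policy.
val parsed = lines.map(_._2).map(parse)           // parse() is a stand-in
parsed.persist(StorageLevel.MEMORY_AND_DISK)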
RE: How does spark manage the memory of executor with multiple tasks
An Executor is a JVM instance spawned and running on a Cluster Node (Server machine). Task is essentially a JVM Thread – you can have as many Threads as you want per JVM. You will also hear about “Executor Slots” – these are essentially the CPU Cores available on the machine and granted for use to the Executor Ps: what creates ongoing confusion here is that the Spark folks have “invented” their own terms to describe the design of their what is essentially a Distributed OO Framework facilitating Parallel Programming and Data Management in a Distributed Environment, BUT have not provided clear dictionary/explanations linking these “inventions” with standard concepts familiar to every Java, Scala etc developer From: canan chen [mailto:ccn...@gmail.com] Sent: Tuesday, May 26, 2015 9:02 AM To: user@spark.apache.org Subject: How does spark manage the memory of executor with multiple tasks Since spark can run multiple tasks in one executor, so I am curious to know how does spark manage memory across these tasks. Say if one executor takes 1GB memory, then if this executor can run 10 tasks simultaneously, then each task can consume 100MB on average. Do I understand it correctly ? It doesn't make sense to me that spark run multiple tasks in one executor.
RE: How does spark manage the memory of executor with multiple tasks
This is the first time I hear that “one can specify the RAM per task” – the RAM is granted per Executor (JVM). On the other hand each Task operates on ONE RDD Partition – so you can say that this is “the RAM allocated to the Task to process” – but it is still within the boundaries allocated to the Executor (JVM) within which the Task is running. Also while running, any Task like any JVM Thread can request as much additional RAM e.g. for new Object instances as there is available in the Executor aka JVM Heap From: canan chen [mailto:ccn...@gmail.com] Sent: Tuesday, May 26, 2015 9:30 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: How does spark manage the memory of executor with multiple tasks Yes, I know that one task represent a JVM thread. This is what I confused. Usually users want to specify the memory on task level, so how can I do it if task if thread level and multiple tasks runs in the same executor. And even I don't know how many threads there will be. Besides that, if one task cause OOM, it would cause other tasks in the same executor fail too. There's no isolation between tasks. On Tue, May 26, 2015 at 4:15 PM, Evo Eftimov evo.efti...@isecc.com wrote: An Executor is a JVM instance spawned and running on a Cluster Node (Server machine). Task is essentially a JVM Thread – you can have as many Threads as you want per JVM. You will also hear about “Executor Slots” – these are essentially the CPU Cores available on the machine and granted for use to the Executor Ps: what creates ongoing confusion here is that the Spark folks have “invented” their own terms to describe the design of their what is essentially a Distributed OO Framework facilitating Parallel Programming and Data Management in a Distributed Environment, BUT have not provided clear dictionary/explanations linking these “inventions” with standard concepts familiar to every Java, Scala etc developer From: canan chen [mailto:ccn...@gmail.com] Sent: Tuesday, May 26, 2015 9:02 AM To: user@spark.apache.org Subject: How does spark manage the memory of executor with multiple tasks Since spark can run multiple tasks in one executor, so I am curious to know how does spark manage memory across these tasks. Say if one executor takes 1GB memory, then if this executor can run 10 tasks simultaneously, then each task can consume 100MB on average. Do I understand it correctly ? It doesn't make sense to me that spark run multiple tasks in one executor.
Re: How does spark manage the memory of executor with multiple tasks
the link you sent says multiple executors per node. Worker is just a daemon process launching Executors / JVMs so it can execute tasks - it does that by cooperating with the master and the driver. There is a one-to-one mapping between Executor and JVM Sent from Samsung Mobile Original message From: Arush Kharbanda ar...@sigmoidanalytics.com Date: 2015/05/26 10:55 (GMT+00:00) To: canan chen ccn...@gmail.com Cc: Evo Eftimov evo.efti...@isecc.com, user@spark.apache.org Subject: Re: How does spark manage the memory of executor with multiple tasks Hi Evo, Worker is the JVM and an executor runs on the JVM. And after Spark 1.4 you would be able to run multiple executors on the same JVM/worker. https://issues.apache.org/jira/browse/SPARK-1706. Thanks Arush On Tue, May 26, 2015 at 2:54 PM, canan chen ccn...@gmail.com wrote: I think the concept of task in spark should be on the same level of task in MR. Usually in MR, we need to specify the memory the each mapper/reducer task. And I believe executor is not a user-facing concept, it's a spark internal concept. For spark users they don't need to know the concept of executor, but need to know the concept of task. On Tue, May 26, 2015 at 5:09 PM, Evo Eftimov evo.efti...@isecc.com wrote: This is the first time I hear that “one can specify the RAM per task” – the RAM is granted per Executor (JVM). On the other hand each Task operates on ONE RDD Partition – so you can say that this is “the RAM allocated to the Task to process” – but it is still within the boundaries allocated to the Executor (JVM) within which the Task is running. Also while running, any Task like any JVM Thread can request as much additional RAM e.g. for new Object instances as there is available in the Executor aka JVM Heap From: canan chen [mailto:ccn...@gmail.com] Sent: Tuesday, May 26, 2015 9:30 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: How does spark manage the memory of executor with multiple tasks Yes, I know that one task represent a JVM thread. This is what I confused. Usually users want to specify the memory on task level, so how can I do it if task if thread level and multiple tasks runs in the same executor. And even I don't know how many threads there will be. Besides that, if one task cause OOM, it would cause other tasks in the same executor fail too. There's no isolation between tasks. On Tue, May 26, 2015 at 4:15 PM, Evo Eftimov evo.efti...@isecc.com wrote: An Executor is a JVM instance spawned and running on a Cluster Node (Server machine). Task is essentially a JVM Thread – you can have as many Threads as you want per JVM. You will also hear about “Executor Slots” – these are essentially the CPU Cores available on the machine and granted for use to the Executor Ps: what creates ongoing confusion here is that the Spark folks have “invented” their own terms to describe the design of their what is essentially a Distributed OO Framework facilitating Parallel Programming and Data Management in a Distributed Environment, BUT have not provided clear dictionary/explanations linking these “inventions” with standard concepts familiar to every Java, Scala etc developer From: canan chen [mailto:ccn...@gmail.com] Sent: Tuesday, May 26, 2015 9:02 AM To: user@spark.apache.org Subject: How does spark manage the memory of executor with multiple tasks Since spark can run multiple tasks in one executor, so I am curious to know how does spark manage memory across these tasks.
Say if one executor takes 1GB memory, then if this executor can run 10 tasks simultaneously, then each task can consume 100MB on average. Do I understand it correctly ? It doesn't make sense to me that spark run multiple tasks in one executor. -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
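A small hedged sketch of how those grants are expressed in configuration: memory and cores are per executor (JVM), and the concurrently running task threads simply share that heap; the values are illustrative.

import org.apache.spark.SparkConf

// Memory and cores are granted per executor (JVM), not per task: with these settings up to
// 4 task threads run at once inside each 4 GB heap and simply share it, with no isolation.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "4")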
RE: Storing spark processed output to Database asynchronously.
If the message consumption rate is higher than the time required to process ALL data for a micro batch (ie the next RDD produced for your stream) the following happens – lets say that e.g. your micro batch time is 3 sec: 1. Based on your message streaming and consumption rate, you get e.g. a 500 MB RDD to be processed during the next 3 sec micro batch 2. However the work performed on the RDD by your streaming job takes more than 3 sec 3. In the meantime the next RDD comes in and occupies another 500MB and so on and so forth until bm the current iteration of the job crashes due to what is essentially a memory exhaustion (no more free ram for the next RDD) due to what is essentially a memory leak The above can be called a design a flaw because Spark Streaming seems to rely on the default behavior of Spark Batch which is to remove In Memory Only RDDs when there is no more free memory in the system, however in a batch context Spark Batch can always recreate a removed RDD from e.g. the file system, while in a streaming context the data is gone for ever You can check whether the above behavior is the reason for your lost messages by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to see whether your streaming app has any LOST JOBS and how many – each lost job is a lost RDD is a lost messages The above can be overcome by using one of the following measures: 1. Set the Receiver rate to a level which will allow your job to complete within the time for micro-batch (obviously you are limiting voluntarily your performance in this way) 2. Throw more boxes/cores/ram at the problem and also improve the performance of your tasks performing the work on the messages (e.g. review and refactor the code) 3. Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep using the RAM until there is free space and then switch to disk rather than crashing miserably and losing the affected job iteration and all its messages – obviously every time it has to resort to the disk your performance will get hit From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 22, 2015 8:55 PM To: Gautam Bajaj Cc: user Subject: Re: Storing spark processed output to Database asynchronously. Something does not make sense. Receivers (currently) does not get blocked (unless rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory and until it is processed. So I am still not sure how the data loss is happening. Unless you are sending data at a faster rate than the receiver can handle (that more than the max rate the receiver can save data in memory and replicate to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can try using TCP, try it. It would at least eliminate the possibility that I mentioned above. Ultimately if you try sending data faster that the receiver can handle (independent of whether processing can handle), then you will loose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate in the receiver, without dropping data. On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example on the usage of AsyncRDDActions in Java ? 
On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am received data at UDP port 8060 and doing processing on it using Spark and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored doesn't match probably because Neo4j API takes too long to push the data into database. Meanwhile, Spark is unable to receive data probably because the process is blocked. On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring? On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the works get done. My major concern is, the data drop. Will using async stop data loss? On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isnt going to help either. The work is just going to keep piling up as many many async jobs even though your batch processing times will be low as that processing time is not going to reflect how much of overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a spark entry point, for continuous UDP data, using: SparkConf conf = new
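A hedged sketch of measure 1 above, capping the receiver rate so each micro-batch can be processed within its window; the rate value is illustrative.

import org.apache.spark.SparkConf

// Cap how many records per second each receiver stores, so a batch can always be
// processed within its micro-batch window.
val conf = new SparkConf()
  .setAppName("rate-limited-stream")
  .set("spark.streaming.receiver.maxRate", "5000")   // records/sec per receiver, illustrative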
RE: Storing spark processed output to Database asynchronously.
… and measure 4 is to implement a custom Feedback Loop to e.g.to monitor the amount of free RAM and number of queued jobs and automatically decrease the message consumption rate of the Receiver until the number of clogged RDDs and Jobs subsides (again here you artificially decrease your performance in the name of the reliability/integrity of your system ie not loosing messages) From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Friday, May 22, 2015 9:39 PM To: 'Tathagata Das'; 'Gautam Bajaj' Cc: 'user' Subject: RE: Storing spark processed output to Database asynchronously. If the message consumption rate is higher than the time required to process ALL data for a micro batch (ie the next RDD produced for your stream) the following happens – lets say that e.g. your micro batch time is 3 sec: 1. Based on your message streaming and consumption rate, you get e.g. a 500 MB RDD to be processed during the next 3 sec micro batch 2. However the work performed on the RDD by your streaming job takes more than 3 sec 3. In the meantime the next RDD comes in and occupies another 500MB and so on and so forth until bm the current iteration of the job crashes due to what is essentially a memory exhaustion (no more free ram for the next RDD) due to what is essentially a memory leak The above can be called a design a flaw because Spark Streaming seems to rely on the default behavior of Spark Batch which is to remove In Memory Only RDDs when there is no more free memory in the system, however in a batch context Spark Batch can always recreate a removed RDD from e.g. the file system, while in a streaming context the data is gone for ever You can check whether the above behavior is the reason for your lost messages by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to see whether your streaming app has any LOST JOBS and how many – each lost job is a lost RDD is a lost messages The above can be overcome by using one of the following measures: 1. Set the Receiver rate to a level which will allow your job to complete within the time for micro-batch (obviously you are limiting voluntarily your performance in this way) 2. Throw more boxes/cores/ram at the problem and also improve the performance of your tasks performing the work on the messages (e.g. review and refactor the code) 3. Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep using the RAM until there is free space and then switch to disk rather than crashing miserably and losing the affected job iteration and all its messages – obviously every time it has to resort to the disk your performance will get hit From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 22, 2015 8:55 PM To: Gautam Bajaj Cc: user Subject: Re: Storing spark processed output to Database asynchronously. Something does not make sense. Receivers (currently) does not get blocked (unless rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory and until it is processed. So I am still not sure how the data loss is happening. Unless you are sending data at a faster rate than the receiver can handle (that more than the max rate the receiver can save data in memory and replicate to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can try using TCP, try it. It would at least eliminate the possibility that I mentioned above. 
Ultimately if you try sending data faster that the receiver can handle (independent of whether processing can handle), then you will loose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate in the receiver, without dropping data. On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example on the usage of AsyncRDDActions in Java ? On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am received data at UDP port 8060 and doing processing on it using Spark and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored doesn't match probably because Neo4j API takes too long to push the data into database. Meanwhile, Spark is unable to receive data probably because the process is blocked. On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring? On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the works get done. My major concern is, the data drop. Will using async stop data loss? On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
A receiver occupies a CPU core; an executor is simply a JVM instance and as such it can be granted any number of cores and RAM. So check how many cores you have per executor Sent from Samsung Mobile Original message From: Mike Trienis mike.trie...@orcsol.com Date: 2015/05/22 21:51 (GMT+00:00) To: user@spark.apache.org Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb) I guess each receiver occupies an executor. So there was only one executor available for processing the job. On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have a cluster of four nodes (three workers and one master, with one core each) which consumes data from Kinesis at 15 second intervals using two streams (i.e. receivers). The job simply grabs the latest batch and pushes it to MongoDB. I believe that the problem is that all tasks are executed on a single worker node and never distributed to the others. This is true even after I set the number of concurrentJobs to 3. Overall, I would really like to increase throughput (i.e. more than 500 records / second) and understand why all executors are not being utilized. Here are some parameters I have set: spark.streaming.blockInterval 200, spark.locality.wait 500, spark.streaming.concurrentJobs 3. This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i => i.foreach(record => connection.insert(record)) }
}

def doSomething(rdd: RDD[Data]): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput? Thanks, Mike.
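A hedged Scala sketch of working around the pinned receiver cores: union the two receiver streams and repartition before writing, so the MongoDB inserts fan out over whatever cores remain. kinesisStream() stands in for however the streams are created today, write() is the function from the post, and the partition count is illustrative.

val streams = (1 to 2).map(_ => kinesisStream())      // the two receivers from the post
val unified = ssc.union(streams)
val spread  = unified.repartition(6)                  // illustrative: roughly 2-4 x remaining cores
spread.foreachRDD { (rdd, time) => write(rdd, time) } // write() as defined in the post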
RE: Spark Streaming and Drools
OR you can run Drools in a Central Server Mode ie as a common/shared service, but that would slowdown your Spark Streaming job due to the remote network call which will have to be generated for every single message From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Friday, May 22, 2015 11:22 AM To: 'Evo Eftimov'; 'Antonio Giambanco' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and Drools The only “tricky” bit would be when you want to manage/update the Rule Base in your Drools Engines already running as Singletons in Executor JVMs on Worker Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already loaded in Drools is not a problem. From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Friday, May 22, 2015 11:20 AM To: 'Antonio Giambanco' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and Drools I am not aware of existing examples but you can always “ask” Google Basically from Spark Streaming perspective, Drools is a third-party Software Library, you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 11:07 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Spark Streaming and Drools Thanks a lot Evo, do you know where I can find some examples? Have a great one A G 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM You can invoke it from e.g. map, filter etc and use the result from the Rule to make decision how to transform/filter an event/message From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools Hi All, I'm deploying and architecture that uses flume for sending log information in a sink. Spark streaming read from this sink (pull strategy) e process al this information, during this process I would like to make some event processing. . . for example: Log appender writes information about all transactions in my trading platforms, if a platform user sells more than buy during a week I need to receive an alert on an event dashboard. How can I realize it? Is it possible with drools? Thanks so much
RE: Spark Streaming and Drools
I am not aware of existing examples but you can always “ask” Google Basically from Spark Streaming perspective, Drools is a third-party Software Library, you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 11:07 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Spark Streaming and Drools Thanks a lot Evo, do you know where I can find some examples? Have a great one A G 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM You can invoke it from e.g. map, filter etc and use the result from the Rule to make decision how to transform/filter an event/message From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools Hi All, I'm deploying and architecture that uses flume for sending log information in a sink. Spark streaming read from this sink (pull strategy) e process al this information, during this process I would like to make some event processing. . . for example: Log appender writes information about all transactions in my trading platforms, if a platform user sells more than buy during a week I need to receive an alert on an event dashboard. How can I realize it? Is it possible with drools? Thanks so much
RE: Spark Streaming and Drools
You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM. You can invoke it from e.g. map, filter etc. and use the result from the Rule to decide how to transform/filter an event/message From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this processing I would like to do some event processing . . . for example: a log appender writes information about all transactions on my trading platforms; if a platform user sells more than he buys during a week, I need to receive an alert on an event dashboard. How can I realize it? Is it possible with Drools? Thanks so much
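A rough Scala sketch of the singleton pattern suggested above, assuming Drools 6 with the rule base packaged on the classpath (the Event type is a placeholder, not something from the thread):

import org.kie.api.KieServices
import org.kie.api.runtime.KieContainer

// Placeholder event type for the facts inserted into the rule session.
case class Event(user: String, action: String, amount: Double)

// One Drools container per executor JVM: the lazy val is initialized the first
// time a task on that executor touches it, then reused by all later tasks.
object DroolsSingleton {
  lazy val container: KieContainer = KieServices.Factory.get().getKieClasspathContainer
}

// Inside the streaming job, evaluate the rules per partition, e.g.:
// dstream.foreachRDD { rdd =>
//   rdd.foreachPartition { events =>
//     val session = DroolsSingleton.container.newKieSession()
//     events.foreach(session.insert(_))
//     session.fireAllRules()   // rule consequences can e.g. raise alert events
//     session.dispose()
//   }
// }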
RE: Spark Streaming and Drools
The only “tricky” bit would be when you want to manage/update the Rule Base in your Drools Engines already running as Singletons in Executor JVMs on Worker Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already loaded in Drools is not a problem. From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Friday, May 22, 2015 11:20 AM To: 'Antonio Giambanco' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and Drools I am not aware of existing examples but you can always “ask” Google Basically from Spark Streaming perspective, Drools is a third-party Software Library, you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 11:07 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Spark Streaming and Drools Thanks a lot Evo, do you know where I can find some examples? Have a great one A G 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM You can invoke it from e.g. map, filter etc and use the result from the Rule to make decision how to transform/filter an event/message From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools Hi All, I'm deploying and architecture that uses flume for sending log information in a sink. Spark streaming read from this sink (pull strategy) e process al this information, during this process I would like to make some event processing. . . for example: Log appender writes information about all transactions in my trading platforms, if a platform user sells more than buy during a week I need to receive an alert on an event dashboard. How can I realize it? Is it possible with drools? Thanks so much
RE: Intermittent difficulties for Worker to contact Master on same machine in standalone
Check whether the name can be resolved in the /etc/hosts file (or DNS) of the worker (the same btw applies for the Node where you run the driver app – all other nodes must be able to resolve its name) From: Stephen Boesch [mailto:java...@gmail.com] Sent: Wednesday, May 20, 2015 10:07 AM To: user Subject: Intermittent difficulties for Worker to contact Master on same machine in standalone What conditions would cause the following delays / failure for a standalone machine/cluster to have the Worker contact the Master? 15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081 15/05/20 02:02:53 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077 15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1) .. .. 15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3) 15/05/20 02:03:26 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
RE: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs
Is that a Spark or Spark Streaming application? Re the map transformation which is required – you can also try flatMap. Finally, an Executor is essentially a JVM spawned by a Spark Worker Node or YARN – giving 60 GB RAM to a single JVM will certainly result in “off the charts” GC. I would suggest to experiment with the following two things: 1. Give less RAM to each Executor but have more Executors, including more than one Executor per Node, especially if the RAM-to-CPU-core ratio is favorable 2. Use serialized RDD storage – MEMORY_ONLY_SER still keeps the data in RAM but as serialized bytes, giving the GC far fewer objects to track; the OFF_HEAP storage level goes further and stores the blocks in Tachyon – a distributed In-Memory File System – off the JVM heap, which avoids GC entirely From: Night Wolf [mailto:nightwolf...@gmail.com] Sent: Tuesday, May 19, 2015 9:36 AM To: user@spark.apache.org Subject: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs Hi all, I have a job that, for every row, creates about 20 new objects (i.e. RDD of 100 rows in = RDD 2000 rows out). The reason for this is each row is tagged with a list of the 'buckets' or 'windows' it belongs to. The actual data is about 10 billion rows. Each executor has 60GB of memory. Currently I have a mapPartitions task that is doing this object creation in a Scala Map and then returning the HashMap as an iterator via .toIterator. Is there a more efficient way to do this (assuming I can't use something like flatMap)? The job runs (assuming each task size is small enough). But the GC time is understandably off the charts. I've reduced the spark cache memory percentage to 0.05 (as I just need space for a few broadcasts and this is a data churn task). I've left the shuffle memory percent unchanged. What kinds of settings should I be tuning with regards to GC? Looking at https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf (slide 125 recommends some settings but I'm not sure what would be best here). I tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the executors die). Are there any tips with respect to the ratio of new gen and old gen space when creating lots of objects which will live in a data structure until the entire partition is processed? Any tips for tuning these kinds of jobs would be helpful! Thanks, ~N
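One concrete way to ease the GC pressure described above, sketched in Scala under assumptions about the data (Row, Tagged and the buckets function are placeholders): instead of materializing all ~20x objects for a partition into a HashMap and calling .toIterator, keep the mapPartitions body lazy so only a small window of objects is alive at any moment:

import org.apache.spark.rdd.RDD

// Placeholder types standing in for the real row and bucket/window objects.
case class Row(key: String, value: Double)
case class Tagged(row: Row, bucket: Int)

// Hypothetical expansion: each row belongs to ~20 buckets/windows.
def buckets(row: Row): Seq[Int] = 0 until 20

def expand(rows: RDD[Row]): RDD[Tagged] =
  rows.mapPartitions { it =>
    // flatMap over the iterator stays lazy: objects are produced as the
    // downstream consumer pulls them, instead of all living in a HashMap
    // until the whole partition has been processed.
    it.flatMap(row => buckets(row).iterator.map(b => Tagged(row, b)))
  }

Whether this applies depends on why flatMap was ruled out in the original job; if the per-partition data structure really is required, the GC-tuning route discussed above is the one left.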
RE: Spark Streaming and reducing latency
Who are “we” and what is the mysterious “back-pressuring mechanism” and is it part of the Spark Distribution (are you talking about implementation of the custom feedback loop mentioned in my previous emails below)- asking these because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says Spark Streaming DOES crash in “unceremonious way” when the free RAM available for In Memory Cashed RDDs gets exhausted From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, May 18, 2015 2:03 PM To: Evo Eftimov Cc: Dmitry Goldenberg; user@spark.apache.org Subject: Re: Spark Streaming and reducing latency We fix the receivers rate at which it should consume at any given point of time. Also we have a back-pressuring mechanism attached to the receivers so it won't simply crashes in the unceremonious way like Evo said. Mesos has some sort of auto-scaling (read it somewhere), may be you can look into that also. Thanks Best Regards On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote: And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch) THEN you need to design and finely tune the Parallel Programming / Execution Model of your application. The objective/metric here is: a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits b) The above will result in a certain size of Dstream RDD per micro-batch. c) The objective now is to Process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spike etc which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture d) You achieve the objective stated in c by designing, varying and experimenting with various aspects of the Spark Streaming Parallel Programming and Execution Model – e.g. number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions which correspond to the number of parallel threads operating on the RDD etc etc Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to high message rate and which crashes your (hence clogged up) application the name of the condition is: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency You can use spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programing guide for mode details. 
Another way is to implement a feedback loop in your receivers monitoring the performance metrics of your application/job and based on that adjusting automatically the receiving rate – BUT all these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up – the nastier effect of which is when S[ark Streaming starts removing In Memory RDDs from RAM before they are processed by the job – that works fine in Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming when done in this “unceremonious way” it simply Crashes the application From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Monday, May 18, 2015 11:46 AM To: Akhil Das Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwheling the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with sparkstreaming when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence
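For the "feedback loop in your receivers" idea above, a minimal Scala sketch of a custom receiver that throttles its own consumption against a per-second limit (pollSource is a placeholder for reading from the real source; a genuine feedback loop would adjust maxPerSecond from external metrics rather than leave it fixed):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ThrottledReceiver(host: String, port: Int, maxPerSecond: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("throttled-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive loop exits when isStopped() is true */ }

  private def receive(): Unit = {
    var sentThisSecond = 0L
    var windowStart = System.currentTimeMillis()
    while (!isStopped()) {
      store(pollSource())                 // hand one message to Spark
      sentThisSecond += 1
      if (sentThisSecond >= maxPerSecond) {
        val elapsed = System.currentTimeMillis() - windowStart
        if (elapsed < 1000) Thread.sleep(1000 - elapsed)   // crude throttle
        sentThisSecond = 0
        windowStart = System.currentTimeMillis()
      }
    }
  }

  // Placeholder: a real receiver would read from the socket/queue here.
  private def pollSource(): String = ""
}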
RE: Spark Streaming and reducing latency
Ooow – that is essentially the custom feedback loop mentioned in my previous emails in generic Architecture Terms and what you have done is only one of the possible implementations moreover based on Zookeeper – there are other possible designs not using things like zookeeper at all and hence achieving much lower latency and responsiveness Can I also give you a friendly advice – there is a long way FROM “we=Sigmoid and our custom sigmoid solution”, TO your earlier statement that Spark Streaming does “NOT” crash UNCEREMNOUSLY – please maintain responsible and objective communication and facts From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, May 18, 2015 2:28 PM To: Evo Eftimov Cc: Dmitry Goldenberg; user@spark.apache.org Subject: Re: Spark Streaming and reducing latency we = Sigmoid back-pressuring mechanism = Stoping the receiver from receiving more messages when its about to exhaust the worker memory. Here's a similar https://issues.apache.org/jira/browse/SPARK-7398 kind of proposal if you haven't seen already. Thanks Best Regards On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote: Who are “we” and what is the mysterious “back-pressuring mechanism” and is it part of the Spark Distribution (are you talking about implementation of the custom feedback loop mentioned in my previous emails below)- asking these because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says Spark Streaming DOES crash in “unceremonious way” when the free RAM available for In Memory Cashed RDDs gets exhausted From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, May 18, 2015 2:03 PM To: Evo Eftimov Cc: Dmitry Goldenberg; user@spark.apache.org Subject: Re: Spark Streaming and reducing latency We fix the receivers rate at which it should consume at any given point of time. Also we have a back-pressuring mechanism attached to the receivers so it won't simply crashes in the unceremonious way like Evo said. Mesos has some sort of auto-scaling (read it somewhere), may be you can look into that also. Thanks Best Regards On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote: And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch) THEN you need to design and finely tune the Parallel Programming / Execution Model of your application. The objective/metric here is: a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits b) The above will result in a certain size of Dstream RDD per micro-batch. c) The objective now is to Process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spike etc which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture d) You achieve the objective stated in c by designing, varying and experimenting with various aspects of the Spark Streaming Parallel Programming and Execution Model – e.g. 
number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions which correspond to the number of parallel threads operating on the RDD etc etc Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to high message rate and which crashes your (hence clogged up) application the name of the condition is: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency You can use spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programing guide for mode details. Another way is to implement a feedback loop in your receivers monitoring the performance metrics of your application/job and based on that adjusting automatically the receiving rate – BUT all these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up – the nastier effect of which is when S[ark Streaming starts removing In Memory RDDs from RAM before they are processed by the job – that works fine in Spark Batch (ie removing RDDs from RAM based on LRU
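On the monitoring side of such a feedback loop, Spark Streaming exposes batch timings through StreamingListener; a small Scala sketch follows (the reaction to a growing delay — e.g. signalling the receivers to slow down — is deliberately left as a placeholder):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BackPressureListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)   // ms the batch waited in the queue
    val processingDelay = info.processingDelay.getOrElse(0L)   // ms the batch took to process
    if (schedulingDelay > 5000) {
      // Placeholder: signal the receivers (e.g. via a shared flag or external store)
      // to lower their consumption rate until the backlog drains.
    }
  }
}

// Registration, given an existing StreamingContext `ssc`:
// ssc.addStreamingListener(new BackPressureListener)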
RE: Spark Streaming and reducing latency
My pleasure young man, i will even go beynd the so called heads up and send you a solution design for Feedback Loop preventing spark streaming app clogging and resource depletion and featuring machine learning based self-tunning AND which is not zookeeper based and hence offers lower latency Ps: ultimately though remember that none of this stuff is part of spark streming as of yet Sent from Samsung Mobile div Original message /divdivFrom: Akhil Das ak...@sigmoidanalytics.com /divdivDate:2015/05/18 16:56 (GMT+00:00) /divdivTo: Evo Eftimov evo.efti...@isecc.com /divdivCc: user@spark.apache.org /divdivSubject: RE: Spark Streaming and reducing latency /divdiv /divThanks for the heads up mate. On 18 May 2015 19:08, Evo Eftimov evo.efti...@isecc.com wrote: Ooow – that is essentially the custom feedback loop mentioned in my previous emails in generic Architecture Terms and what you have done is only one of the possible implementations moreover based on Zookeeper – there are other possible designs not using things like zookeeper at all and hence achieving much lower latency and responsiveness Can I also give you a friendly advice – there is a long way FROM “we=Sigmoid and our custom sigmoid solution”, TO your earlier statement that Spark Streaming does “NOT” crash UNCEREMNOUSLY – please maintain responsible and objective communication and facts From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, May 18, 2015 2:28 PM To: Evo Eftimov Cc: Dmitry Goldenberg; user@spark.apache.org Subject: Re: Spark Streaming and reducing latency we = Sigmoid back-pressuring mechanism = Stoping the receiver from receiving more messages when its about to exhaust the worker memory. Here's a similar kind of proposal if you haven't seen already. Thanks Best Regards On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote: Who are “we” and what is the mysterious “back-pressuring mechanism” and is it part of the Spark Distribution (are you talking about implementation of the custom feedback loop mentioned in my previous emails below)- asking these because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says Spark Streaming DOES crash in “unceremonious way” when the free RAM available for In Memory Cashed RDDs gets exhausted From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, May 18, 2015 2:03 PM To: Evo Eftimov Cc: Dmitry Goldenberg; user@spark.apache.org Subject: Re: Spark Streaming and reducing latency We fix the receivers rate at which it should consume at any given point of time. Also we have a back-pressuring mechanism attached to the receivers so it won't simply crashes in the unceremonious way like Evo said. Mesos has some sort of auto-scaling (read it somewhere), may be you can look into that also. Thanks Best Regards On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote: And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch) THEN you need to design and finely tune the Parallel Programming / Execution Model of your application. The objective/metric here is: a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits b) The above will result in a certain size of Dstream RDD per micro-batch. 
c) The objective now is to Process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spike etc which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture d) You achieve the objective stated in c by designing, varying and experimenting with various aspects of the Spark Streaming Parallel Programming and Execution Model – e.g. number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions which correspond to the number of parallel threads operating on the RDD etc etc Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to high message rate and which crashes your (hence clogged up) application the name of the condition is: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency You can use spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number
RE: Spark Streaming and reducing latency
You can use spark.streaming.receiver.maxRate (default: not set) – the maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programming guide for more details. Another way is to implement a feedback loop in your receivers, monitoring the performance metrics of your application/job and based on that automatically adjusting the receiving rate – BUT all these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up – the nastier effect of which is when Spark Streaming starts removing In-Memory RDDs from RAM before they are processed by the job – that works fine in Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming, when done in this “unceremonious way”, it simply crashes the application From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Monday, May 18, 2015 11:46 AM To: Akhil Das Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwhelming the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with Spark Streaming, when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence will throw up block not found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? 
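A minimal Scala sketch pulling together the knobs discussed in this thread — the fixed receiver rate limit, the block interval, and the spill-to-disk storage level — with placeholder host, port and rate values; these are ordinary Spark configuration properties set before the StreamingContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-stream")
  // At most 1000 records/second per receiver; 0 or negative means no limit.
  .set("spark.streaming.receiver.maxRate", "1000")
  // With a 1 second batch and a 200 ms block interval, each receiver
  // produces 5 blocks (and hence 5 tasks) per batch.
  .set("spark.streaming.blockInterval", "200")

val ssc = new StreamingContext(conf, Seconds(1))

// MEMORY_AND_DISK_SER lets blocks spill to disk instead of the job failing
// with "Could not compute split, block ... not found" when RAM runs out.
val lines = ssc.socketTextStream("source-host", 9999, StorageLevel.MEMORY_AND_DISK_SER)

For what it's worth, the proposal referenced later in this thread (SPARK-7398) eventually became spark.streaming.backpressure.enabled in Spark 1.5, which automates this rate adjustment; it postdates the discussion above.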
RE: Spark Streaming and reducing latency
And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch) THEN you need to design and finely tune the Parallel Programming / Execution Model of your application. The objective/metric here is: a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits b) The above will result in a certain size of Dstream RDD per micro-batch. c) The objective now is to Process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spike etc which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture d) You achieve the objective stated in c by designing, varying and experimenting with various aspects of the Spark Streaming Parallel Programming and Execution Model – e.g. number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions which correspond to the number of parallel threads operating on the RDD etc etc Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to high message rate and which crashes your (hence clogged up) application the name of the condition is: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency You can use spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programing guide for mode details. Another way is to implement a feedback loop in your receivers monitoring the performance metrics of your application/job and based on that adjusting automatically the receiving rate – BUT all these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up – the nastier effect of which is when S[ark Streaming starts removing In Memory RDDs from RAM before they are processed by the job – that works fine in Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming when done in this “unceremonious way” it simply Crashes the application From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Monday, May 18, 2015 11:46 AM To: Akhil Das Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwheling the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. 
Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with sparkstreaming when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence will throw up block not found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing
RE: Spark Streaming and reducing latency
This is the nature of Spark Streaming as a System Architecture: 1. It is a batch processing system architecture (Spark Batch) optimized for Streaming Data 2. In terms of sources of Latency in such System Architecture, bear in mind that besides “batching”, there is also the Central “Driver” function/module, which is essentially a Central Job/Task Manager (ie running on a dedicated node, which doesn’t sit on the Path of the Messages), which even in a Streaming Data scenario, FOR EACH Streaming BATCH schedules tasks (as per the DAG for the streaming job), sends them to the workers, receives the results, then schedules and sends more tasks (as per the DAG for the job) and so on and so forth In terms of Parallel Programming Patterns/Architecture, the above is known as Data Parallel Architecture with Central Job/Task Manager. There are other alternatives for achieving lower latency and in terms of Parallel Programming Patterns they are known as Pipelines or Task Parallel Architecture – essentially every messages streams individually through an assembly line of Tasks. As the tasks can be run on multiple cores of one box or in a distributed environment. Storm for example implements this pattern or you can just put together your own solution From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Sunday, May 17, 2015 4:04 PM To: dgoldenberg Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency With receiver based streaming, you can actually specify spark.streaming.blockInterval which is the interval at which the receiver will fetch data from the source. Default value is 200ms and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with sparkstreaming when your processing time goes beyond your batch duration and you are having a higher data consumption then you will overwhelm the receiver's memory and hence will throw up block not found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals i.e. the consumer has to wait the interval to be able to get at the new items. If one wants to reduce latency it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
You can make ANY standard receiver sleep by implementing a custom Message Deserializer class with a sleep method inside it. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Sunday, May 17, 2015 4:29 PM To: Haopu Wang Cc: user Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application? Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep till the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after a special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreamingContext has been started. Is it possible to delay the start of a specific DStream? Thank you very much!
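A rough Scala sketch of the "receiver that sleeps until the event has happened" suggestion, where readyCheck and readOneRecord are placeholders for however the application detects that its reference data is initialized and reads from its real source:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class DelayedStartReceiver(readyCheck: () => Boolean)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    new Thread("delayed-start-receiver") {
      override def run(): Unit = {
        // Poll until the external precondition holds before producing any data.
        while (!isStopped() && !readyCheck()) Thread.sleep(1000)
        while (!isStopped()) {
          store(readOneRecord())   // placeholder for the real source read
        }
      }
    }.start()
  }

  def onStop(): Unit = {}

  private def readOneRecord(): String = ""   // placeholder
}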
RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 15, 2015 6:45 PM To: Evo Eftimov Cc: user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
Ok thanks a lot for clarifying that – btw was your application a Spark Streaming App – I am also looking for confirmation that FAIR scheduling is supported for Spark Streaming Apps From: Richard Marscher [mailto:rmarsc...@localytics.com] Sent: Friday, May 15, 2015 7:20 PM To: Evo Eftimov Cc: Tathagata Das; user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 15, 2015 6:45 PM To: Evo Eftimov Cc: user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
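For reference, the pool wiring Richard describes looks roughly like this in Scala (pool name, weight and file path are placeholders): spark.scheduler.mode selects FAIR, spark.scheduler.allocation.file points at the pool definitions, and each job is assigned to a pool via a thread-local property:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduled-app")
  .set("spark.scheduler.mode", "FAIR")
  // XML file defining the pools, e.g.:
  // <allocations>
  //   <pool name="streamingPool">
  //     <schedulingMode>FAIR</schedulingMode>
  //     <weight>2</weight>
  //     <minShare>1</minShare>
  //   </pool>
  // </allocations>
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)

// Jobs submitted from this thread (including those triggered by a streaming
// output operation) go into the named pool.
sc.setLocalProperty("spark.scheduler.pool", "streamingPool")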
RE: swap tuple
Where is the “Tuple” supposed to be in <String, String> – you could refer to a “Tuple” if it were e.g. <String, Tuple2<String, String>> From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of Holden Karau Sent: Thursday, May 14, 2015 5:56 PM To: Yasemin Kaya Cc: user@spark.apache.org Subject: Re: swap tuple Can you paste your code? Transformations return a new RDD rather than modifying an existing one, so if you were to swap the values of the tuple using a map you would get back a new RDD, and then you would want to try and print this new RDD instead of the original one. On Thursday, May 14, 2015, Yasemin Kaya godo...@gmail.com wrote: Hi, I have a JavaPairRDD<String, String> and I want to swap tuple._1() with tuple._2(). I use tuple.swap() but the JavaPairRDD is not actually changed. When I print the JavaPairRDD, the values are the same. Can anyone help me with that? Thank you. Have a nice day. yasemin -- hiç ender hiç -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
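The point about transformations returning a new RDD, as a small sketch (shown in Scala to match the rest of this archive; the Java equivalent would go through JavaPairRDD.mapToPair, but the idea is the same): the swapped RDD has to be captured and used — the original pair RDD is never modified in place:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("swap-example").setMaster("local[2]"))

val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))

// map produces a NEW RDD with the elements swapped; `pairs` itself is unchanged.
val swapped = pairs.map { case (k, v) => (v, k) }

pairs.collect().foreach(println)    // still (a,1), (b,2)
swapped.collect().foreach(println)  // (1,a), (2,b)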
RE: SPARKTA: a real-time aggregation engine based on Spark Streaming
I do not intend to provide comments on the actual “product” since my time is engaged elsewhere My comments were on the “process” for commenting which looked as self-indulgent, self patting on the back communication (between members of the party and its party leader) – that bs used to be inherent to the “commercial” vendors, but I can confirm as fact it is also in effect to the “open source movement” (because human nature remains the same) From: David Morales [mailto:dmora...@stratio.com] Sent: Thursday, May 14, 2015 4:30 PM To: Paolo Platter Cc: Evo Eftimov; Matei Zaharia; user@spark.apache.org Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming Thank you Paolo. Don't hesitate to contact us. Evo, we will be glad to hear from you and we are happy to see some kind of fast feedback from the main thought leader of spark, for sure. 2015-05-14 17:24 GMT+02:00 Paolo Platter paolo.plat...@agilelab.it: Nice Job! we are developing something very similar… I will contact you to understand if we can contribute to you with some piece ! Best Paolo Da: Evo Eftimov mailto:evo.efti...@isecc.com Data invio: giovedì 14 maggio 2015 17:21 A: 'David Morales' mailto:dmora...@stratio.com , Matei Zaharia mailto:matei.zaha...@gmail.com Cc: user@spark.apache.org That has been a really rapid “evaluation” of the “work” and its “direction” From: David Morales [mailto:dmora...@stratio.com] Sent: Thursday, May 14, 2015 4:12 PM To: Matei Zaharia Cc: user@spark.apache.org Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming Thanks for your kind words Matei, happy to see that our work is in the right way. 2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: (Sorry, for non-English people: that means it's a good thing.) Matei On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote: ...This is madness! On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote: Hi there, We have released our real-time aggregation engine based on Spark Streaming. SPARKTA is fully open source (Apache2) You can checkout the slides showed up at the Strata past week: http://www.slideshare.net/Stratio/strata-sparkta Source code: https://github.com/Stratio/sparkta And documentation http://docs.stratio.com/modules/sparkta/development/ We are open to your ideas and contributors are welcomed. Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKTA-a-real-time-aggregation-engine-based-on-Spark-Streaming-tp22883.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- David Morales de Frías :: +34 607 010 411 :: https://twitter.com/dmoralesdf @dmoralesdf http://www.stratio.com/ Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // https://twitter.com/StratioBD @stratiobd -- David Morales de Frías :: +34 607 010 411 :: https://twitter.com/dmoralesdf @dmoralesdf http://www.stratio.com/ Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // https://twitter.com/StratioBD @stratiobd
RE: SPARKTA: a real-time aggregation engine based on Spark Streaming
That has been a really rapid “evaluation” of the “work” and its “direction” From: David Morales [mailto:dmora...@stratio.com] Sent: Thursday, May 14, 2015 4:12 PM To: Matei Zaharia Cc: user@spark.apache.org Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming Thanks for your kind words Matei, happy to see that our work is in the right way. 2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: (Sorry, for non-English people: that means it's a good thing.) Matei On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote: ...This is madness! On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote: Hi there, We have released our real-time aggregation engine based on Spark Streaming. SPARKTA is fully open source (Apache2) You can checkout the slides showed up at the Strata past week: http://www.slideshare.net/Stratio/strata-sparkta Source code: https://github.com/Stratio/sparkta And documentation http://docs.stratio.com/modules/sparkta/development/ We are open to your ideas and contributors are welcomed. Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKTA-a-real-time-aggregation-engine-based-on-Spark-Streaming-tp22883.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- David Morales de Frías :: +34 607 010 411 :: https://twitter.com/dmoralesdf @dmoralesdf http://www.stratio.com/ Vía de las dos Castillas, 33, Ática 4, 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 828 6473 // www.stratio.com // https://twitter.com/StratioBD @stratiobd
RE: DStream Union vs. StreamingContext Union
I can confirm it does work in Java From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Tuesday, May 12, 2015 5:53 PM To: Evo Eftimov Cc: Saisai Shao; user@spark.apache.org Subject: Re: DStream Union vs. StreamingContext Union Thanks Evo. I tried chaining DStream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW. On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DStream RDDs in this way DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3) etc etc Ps: the API is not “redundant” – it offers several ways of achieving the same thing as a convenience depending on the situation From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Tuesday, May 12, 2015 5:37 PM To: Saisai Shao Cc: user@spark.apache.org Subject: Re: DStream Union vs. StreamingContext Union Thanks Saisai. That makes sense. Just seems redundant to have both. On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStreams, one being itself. StreamingContext.union can union an array of DStreams; internally DStream.union is a special case of StreamingContext.union: def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) So there's no difference: if you want to union more than two DStreams, just use the one in StreamingContext; otherwise, both APIs are fine. 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs the other? Thanks, Vadim
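Both forms side by side, as a small Scala sketch (stream1/stream2/stream3 stand for any three DStreams of the same element type):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Given an existing StreamingContext `ssc` and three DStream[String]s:
def unionAll(ssc: StreamingContext,
             stream1: DStream[String],
             stream2: DStream[String],
             stream3: DStream[String]): Unit = {

  // Pairwise chaining: each union() call wraps exactly two streams.
  val chained = stream1.union(stream2).union(stream3)

  // StreamingContext.union takes the whole sequence in one call.
  val combined = ssc.union(Seq(stream1, stream2, stream3))

  // The two results are equivalent DStreams; use whichever reads better.
  chained.print()
  combined.print()
}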
RE: Spark streaming closes with Cassandra Conector
And one other suggestion in relation to the connection pool line of enquiry - check whether your Cassandra service is configured to allow only one session per e.g. user. I think the error is generated inside the connection pool when it tries to initialize a connection after the first one Sent from Samsung Mobile. Original message — From: Evo Eftimov evo.efti...@isecc.com Date: 2015/05/10 12:02 (GMT+00:00) To: 'Gerard Maas' gerard.m...@gmail.com Cc: 'Sergio Jiménez Barrio' drarse.a...@gmail.com, 'spark users' user@spark.apache.org Subject: RE: Spark streaming closes with Cassandra Conector Hmm, there is also a Connection Pool involved, and such things (especially while still rough on the edges) may behave erratically in a distributed multithreaded environment. Can you try forEachPartition and foreach together – this will create a slightly different multithreading execution and distribution profile which may skip a potential error in the Connection Pool code From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:56 AM To: Evo Eftimov Cc: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector I'm familiar with the TableWriter code and that log only appears if the write actually succeeded. (See https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala) Thinking infrastructure, we see that it's always trying to reach 'localhost'. Are you running a 1-node test in local mode? Otherwise, there's something wrong with the way you're configuring Cassandra or the connection to it (always tempted to say her :-) ). -kr, Gerard. On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote: I think the message that it has written 2 rows is misleading. If you look further down you will see that it could not initialize a connection pool for Cassandra (presumably while trying to write the previously mentioned 2 rows). Another confirmation of this hypothesis is the phrase “error during Transport Initialization” – so all this stuff points in the direction of Infrastructure or Configuration issues – check your Cassandra service and how you connect to it etc mate From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:33 AM To: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector It successfully writes some data and fails afterwards, like the host or connection goes down. Weird. Maybe you should post this question on the Spark-Cassandra connector group: https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user -kr, Gerard. On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: This is: 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s. 
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 127.0.0.1 (datacenter1) 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042 com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel) at com.datastax.driver.core.Connection.initializeTransport(Connection.java:186) at com.datastax.driver.core.Connection.init(Connection.java:116) at com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32) at com.datastax.driver.core.Connection$Factory.open(Connection.java:586) at com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74) at com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61) at com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224) at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469) at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144) at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562) at com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145
RE: Spark streaming closes with Cassandra Conector
And in case you are running in local mode try giving more cores to spark with e.g. [5] – low number could be interfering with the tuning params which you can try to play with as well – all this is in the context of how those params interact with the Connection Pool and what that pool is doing in terms of Multithreading https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md Tuning The following properties set in SparkConf can be used to fine-tune the saving process, These values have been set to achieve stability and not performance. Changing these values may increase your performance based on your workload: * spark.cassandra.output.batch.size.rows: number of rows per single batch; default is 'auto' which means the connector will adjust the number of rows based on the amount of data in each row * spark.cassandra.output.batch.size.bytes: maximum total size of the batch in bytes; defaults to 1 kB. * spark.cassandra.output.batch.grouping.key: determines how insert statements are grouped into batches; available values are: * none: a batch may contain any statements * replica_set: a batch may contain only statements to be written to the same replica set * partition (default): a batch may contain only statements for rows sharing the same partition key value * spark.cassandra.output.batch.buffer.size: how many batches per single Spark task can be stored in memory before sending to Cassandra; default 1000 * spark.cassandra.output.concurrent.writes: maximum number of batches executed in parallel by a single Spark task; defaults to 5 * spark.cassandra.output.consistency.level: consistency level for writing; defaults to LOCAL_ONE. * spark.cassandra.output.throughput_mb_per_sec: maximum write throughput allowed per single core in MB/s limit this on long (+8 hour) runs to 70% of your max throughput as seen on a smaller job for stability From: Sergio Jiménez Barrio [mailto:drarse.a...@gmail.com] Sent: Sunday, May 10, 2015 12:59 PM To: Evo Eftimov Subject: Re: Spark streaming closes with Cassandra Conector How Can I see this? Thanks Evo 2015-05-10 13:36 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: And one other suggestion in relation to the connection pool line of enquiry - check whether your cassandra service is configured to allow only one session per e.g. User I think the error is generated inside thr connection pool when it tries to initialize a connection after the first one Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/10 12:02 (GMT+00:00) To: 'Gerard Maas' Cc: 'Sergio Jiménez Barrio' ,'spark users' Subject: RE: Spark streaming closes with Cassandra Conector Hmm there is also a Connection Pool involved and such things (especially while still rough on the edges) may behave erratically in a distributed multithreaded environment Can you try forEachPartition and foreach together – this will create a slightly different multithreading execution and distribution profile which may skip a potential error in the Connection Pool code From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:56 AM To: Evo Eftimov Cc: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector I'm familiar with the TableWriter code and that log only appears if the write actually succeeded. 
(See https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala) Thinking infrastructure, we see that it's always trying to reach 'localhost'. Are you running a 1-node test in local mode? Otherwise, there's something wrong with the way you're configuring Cassandra or the connection to it (always tempted to say her :-) ). -kr, Gerard. On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote: I think the message that it has written 2 rows is misleading. If you look further down you will see that it could not initialize a connection pool for Cassandra (presumably while trying to write the previously mentioned 2 rows). Another confirmation of this hypothesis is the phrase “error during Transport Initialization” – so all this points in the direction of Infrastructure or Configuration issues – check your Cassandra service and how you connect to it etc mate From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:33 AM To: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Connector It successfully writes some data and fails afterwards, like the host or connection goes down. Weird. Maybe you should post this question on the Spark-Cassandra connector group: https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user -kr
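For reference, a minimal sketch of where the connector saving properties mentioned above would be set – the host name, app name and the concrete values below are placeholder assumptions, not recommendations:

import org.apache.spark.SparkConf

// Hypothetical values, only to show where the connector tuning knobs go
val conf = new SparkConf()
  .setAppName("streaming-to-cassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1")           // assumed single-node test setup
  .set("spark.cassandra.output.concurrent.writes", "2")          // fewer parallel batches per task
  .set("spark.cassandra.output.batch.size.bytes", "1024")        // keep batches small while debugging
  .set("spark.cassandra.output.batch.grouping.key", "partition")
  .set("spark.cassandra.output.throughput_mb_per_sec", "5")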
RE: Spark streaming closes with Cassandra Connector
I think the message that it has written 2 rows is misleading If you look further down you will see that it could not initialize a connection pool for Casandra (presumably while trying to write the previously mentioned 2 rows) Another confirmation of this hypothesis is the phrase “error during Transport Initialization” – so all these stuff points out in the direction of Infrastructure or Configuration issues – check you Casandra service and how you connect to it etc mate From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:33 AM To: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector It successfully writes some data and fails afterwards, like the host or connection goes down. Weird. Maybe you should post this question on the Spark-Cassandra connector group: https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user -kr, Gerard. On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: This is: 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s. 15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 127.0.0.1 (datacenter1) 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042 com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel) at com.datastax.driver.core.Connection.initializeTransport(Connection.java:186) at com.datastax.driver.core.Connection.init(Connection.java:116) at com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32) at com.datastax.driver.core.Connection$Factory.open(Connection.java:586) at com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74) at com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61) at com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224) at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469) at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144) at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562) at com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432) at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427) at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413) at 
org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151) at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ... 3 more 15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds Thanks! 2015-05-10 0:58 GMT+02:00 Gerard Maas gerard.m...@gmail.com: Hola Sergio, It would help if you added the error message + stack trace. -kr, Gerard. On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio
RE: Spark streaming closes with Cassandra Connector
Hmm there is also a Connection Pool involved and such things (especially while still rough on the edges) may behave erratically in a distributed multithreaded environment Can you try forEachPartition and foreach together – this will create a slightly different multithreading execution and distribution profile which may skip a potential error in the Connection Pool code From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:56 AM To: Evo Eftimov Cc: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector I'm familiar with the TableWriter code and that log only appears if the write actually succeeded. (See https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala) Thinking infrastructure, we see that it's always trying to reach 'localhost'. Are you running 1 node test in local mode? Otherwise, there's something wrong with the way you're configuring Cassandra or the connection to it (always tempted to say her :-) ). -kr, Gerard. On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote: I think the message that it has written 2 rows is misleading If you look further down you will see that it could not initialize a connection pool for Casandra (presumably while trying to write the previously mentioned 2 rows) Another confirmation of this hypothesis is the phrase “error during Transport Initialization” – so all these stuff points out in the direction of Infrastructure or Configuration issues – check you Casandra service and how you connect to it etc mate From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Sunday, May 10, 2015 11:33 AM To: Sergio Jiménez Barrio; spark users Subject: Re: Spark streaming closes with Cassandra Conector It successfully writes some data and fails afterwards, like the host or connection goes down. Weird. Maybe you should post this question on the Spark-Cassandra connector group: https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user -kr, Gerard. On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: This is: 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s. 
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 127.0.0.1 (datacenter1) 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042 com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel) at com.datastax.driver.core.Connection.initializeTransport(Connection.java:186) at com.datastax.driver.core.Connection.init(Connection.java:116) at com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32) at com.datastax.driver.core.Connection$Factory.open(Connection.java:586) at com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74) at com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231) at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61) at com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224) at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469) at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144) at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562) at com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432) at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427) at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413) at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380
RE: Map one RDD into two RDD
Scala is a language; Spark is an OO/Functional, Distributed Framework facilitating Parallel Programming in a distributed environment. Any “Scala parallelism” occurs within the Parallel Model imposed by the Spark OO Framework – i.e. it is limited in what it can achieve in terms of influencing the Spark Framework behavior – that is the nature of programming with/for frameworks. When RDD1 and RDD2 are partitioned and different Actions are applied to them, this will result in Parallel Pipelines / DAGs within the Spark Framework: RDD1 = RDD.filter() RDD2 = RDD.filter() From: Bill Q [mailto:bill.q@gmail.com] Sent: Thursday, May 7, 2015 4:55 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Map one RDD into two RDD Thanks for the replies. We decided to use concurrency in Scala to do the two mappings using the same source RDD in parallel. So far, it seems to be working. Any comments? On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote: RDD1 = RDD.filter() RDD2 = RDD.filter() From: Bill Q [mailto:bill.q@gmail.com] Sent: Tuesday, May 5, 2015 10:42 PM To: user@spark.apache.org Subject: Map one RDD into two RDD Hi all, I have a large RDD to which I map a function. Based on the nature of each record in the input RDD, I will generate two types of data. I would like to save each type into its own RDD. But I can't seem to find an efficient way to do it. Any suggestions? Many thanks. Bill
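A minimal sketch of the two-filter approach above, assuming a SparkContext sc as in the spark-shell; the Record type and the "A"/"B" predicates are made up purely for illustration:

// A hypothetical record type, just to make the two filters concrete
case class Record(kind: String, payload: String)
val source = sc.parallelize(Seq(Record("A", "a1"), Record("B", "b1"), Record("A", "a2")))

source.cache()                              // compute/read the parent RDD only once
val rdd1 = source.filter(_.kind == "A")     // first "type" of records
val rdd2 = source.filter(_.kind == "B")     // second "type" of records

// each branch is now the start of its own DAG / pipeline
println(rdd1.count() + " / " + rdd2.count())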
RE: Creating topology in spark streaming
What is called Bolt in Storm is essentially a combination of [Transformation/Action and DStream RDD] in Spark – so to achieve higher parallelism for a specific Transformation/Action on a specific DStream RDD, simply repartition it to the required number of partitions, which directly relates to the corresponding number of Threads. From: anshu shukla [mailto:anshushuk...@gmail.com] Sent: Wednesday, May 6, 2015 9:33 AM To: ayan guha Cc: user@spark.apache.org; d...@spark.apache.org Subject: Re: Creating topology in spark streaming But the main problem is how to increase the level of parallelism for any particular bolt logic. Suppose I want this type of topology: https://storm.apache.org/documentation/images/topology.png How can we manage it? On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote: Every transformation on a dstream will create another dstream. You may want to take a look at foreachrdd? Also, kindly share your code so people can help better. On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote: Please help guys, even after going through all the examples given I have not understood how to pass the D-streams from one bolt/logic to another (without writing them to HDFS etc.), just like the emit function in storm. Suppose I have a topology with 3 bolts (say): BOLT1 (parse the tweets and emit tweets using given hashtags) => BOLT2 (complex logic for sentiment analysis over tweets) => BOLT3 (submit tweets to the SQL database using Spark SQL). Now since sentiment analysis will take most of the time, we have to increase its level of parallelism for tuning latency. How to increase the level of parallelism since the logic of the topology is not clear? -- Thanks Regards, Anshu Shukla Indian Institute of Sciences -- Thanks Regards, Anshu Shukla
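A minimal Spark Streaming sketch of that idea – repartitioning only the DStream that feeds the heavy "bolt" so that stage gets more tasks. The source, the partition count and the placeholder logic below are assumptions for illustration only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("topology-like").setMaster("local[4]"), Seconds(5))
val tweets = ssc.socketTextStream("localhost", 9999)        // stand-in for the real tweet source

val parsed   = tweets.filter(_.contains("#"))               // "BOLT1": parse / select tweets
val analysed = parsed
  .repartition(12)                                          // more partitions => more parallel tasks for the heavy stage
  .map(t => (t, t.length))                                  // "BOLT2": placeholder for the sentiment logic
analysed.foreachRDD(rdd => rdd.take(10).foreach(println))   // "BOLT3": placeholder sink

ssc.start()
ssc.awaitTermination()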
RE: Map one RDD into two RDD
RDD1 = RDD.filter() RDD2 = RDD.filter() From: Bill Q [mailto:bill.q@gmail.com] Sent: Tuesday, May 5, 2015 10:42 PM To: user@spark.apache.org Subject: Map one RDD into two RDD Hi all, I have a large RDD to which I map a function. Based on the nature of each record in the input RDD, I will generate two types of data. I would like to save each type into its own RDD. But I can't seem to find an efficient way to do it. Any suggestions? Many thanks. Bill
RE: Creating topology in spark streaming
The “abstraction level” of Storm or shall we call it Architecture, is effectively Pipelines of Nodes/Agents – Pipelines is one of the standard Parallel Programming Patterns which you can use on multicore CPUs as well as Distributed Systems – the chaps from Storm simply implemented it as a reusable framework for distributed systems and offered it for free. Effectively it you have a set of independent Agents chained in a pipeline as the output from the previous Agent feeds into the Input of the next Agent Spark Streaming (which is essentially Batch Spark but with some optimizations for Streaming) on the other hand is more like a Map Reduce framework where you always have to have a Central Job/Task Manager scheduling and submitting tasks to remote distributed nodes, collecting the results / statuses and then scheduling and sending some more tasks and so on “Map Reduce” is simply another Parallel Programming pattern known as Data Parallelism or Data Parallel Programming. Although you can also have Data Parallelism without a Central Scheduler From: Juan Rodríguez Hortalá [mailto:juan.rodriguez.hort...@gmail.com] Sent: Wednesday, May 6, 2015 11:20 AM To: Evo Eftimov Cc: anshu shukla; ayan guha; user@spark.apache.org Subject: Re: Creating topology in spark streaming Hi, I agree with Evo, Spark works at a different abstraction level than Storm, and there is not a direct translation from Storm topologies to Spark Streaming jobs. I think something remotely close is the notion of lineage of DStreams or RDDs, which is similar to a logical plan of an engine like Apache Pig. Here https://github.com/JerryLead/SparkInternals/blob/master/pdf/2-JobLogicalPlan.pdf is a diagram of a spark logical plan by a third party. I would suggest you reading the book Learning Spark https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/foreword01.html for more on this. But in general I think that Storm has an abstraction level closer to MapReduce, and Spark has an abstraction level closer to Pig, so the correspondence between Storm and Spark notions cannot be perfect. Greetings, Juan 2015-05-06 11:37 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: What is called Bolt in Storm is essentially a combination of [Transformation/Action and DStream RDD] in Spark – so to achieve a higher parallelism for specific Transformation/Action on specific Dstream RDD simply repartition it to the required number of partitions which directly relates to the corresponding number of Threads From: anshu shukla [mailto:anshushuk...@gmail.com] Sent: Wednesday, May 6, 2015 9:33 AM To: ayan guha Cc: user@spark.apache.org; d...@spark.apache.org Subject: Re: Creating topology in spark streaming But main problem is how to increase the level of parallelism for any particular bolt logic . suppose i want this type of topology . https://storm.apache.org/documentation/images/topology.png How we can manage it . On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote: Every transformation on a dstream will create another dstream. You may want to take a look at foreachrdd? Also, kindly share your code so people can help better On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote: Please help guys, Even After going through all the examples given i have not understood how to pass the D-streams from one bolt/logic to other (without writing it on HDFS etc.) just like emit function in storm . 
Suppose I have a topology with 3 bolts (say): BOLT1 (parse the tweets and emit tweets using given hashtags) => BOLT2 (complex logic for sentiment analysis over tweets) => BOLT3 (submit tweets to the SQL database using Spark SQL). Now since sentiment analysis will take most of the time, we have to increase its level of parallelism for tuning latency. How to increase the level of parallelism since the logic of the topology is not clear? -- Thanks Regards, Anshu Shukla Indian Institute of Sciences -- Thanks Regards, Anshu Shukla
RE: Receiver Fault Tolerance
This is about the Kafka Receiver IF you are using Spark Streaming. Ps: that book is now behind the curve in quite a few areas since the release of 1.3.1 – read the documentation and forums. From: James King [mailto:jakwebin...@gmail.com] Sent: Wednesday, May 6, 2015 1:09 PM To: user Subject: Receiver Fault Tolerance In the O'Reilly book Learning Spark, Chapter 10, section 24/7 Operation, it talks about 'Receiver Fault Tolerance'. I'm unsure of what a Receiver is here; from reading, it sounds like when you submit an application to the cluster in cluster mode, i.e. --deploy-mode cluster, the driver program will run on a Worker and in this case this Worker is seen as a Receiver because it is consuming messages from the source. Is the above understanding correct? Or is there more to it?
spark filestream problem
It seems that on Spark Streaming 1.2 the filestream API may have a bug - it doesn't detect new files when moving or renaming them on HDFS - only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming filestream throw an exception.
spark filestream problem
It seems that on Spark Streaming 1.2 the filestream API may have a bug - it doesn't detect new files when moving or renaming them on HDFS - only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming filestream throw an exception. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22742.html
spark filestream problem
It seems that on Spark Streaming 1.2 the filestream API may have a bug - it doesn't detect new files when moving or renaming them on HDFS - only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming filestream throw an exception. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22743.html
RE: spark filestream problem
I have figured it out in the meantime - simply, when moving a file on HDFS it preserves its timestamp, and the Spark filestream adapter seems to care about timestamps rather than filenames - hence NEW files with OLD timestamps will NOT be processed - yuk. The hack you can use is to a) copy the required file to a temp location and then b) move it from there to the dir monitored by the Spark filestream - this ensures it has a recent timestamp. -Original Message- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Saturday, May 2, 2015 5:09 PM To: user@spark.apache.org Subject: spark filestream problem It seems that on Spark Streaming 1.2 the filestream API may have a bug - it doesn't detect new files when moving or renaming them on HDFS - only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming filestream throw an exception. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22743.html
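A sketch of that copy-then-move workaround using the Hadoop FileSystem API; the directory and file names here are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val fs = FileSystem.get(new Configuration())
val src     = new Path("/data/incoming/events.log")   // original file, possibly with an old timestamp
val staging = new Path("/data/staging/events.log")    // temp location outside the monitored dir
val watched = new Path("/data/watched/events.log")    // dir monitored by fileStream/textFileStream

// 1) copy => the staging copy gets a fresh modification time
FileUtil.copy(fs, src, fs, staging, false, fs.getConf)
// 2) rename (move) into the monitored dir => no partial/.tmp file ever becomes visible there
fs.rename(staging, watched)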
RE: spark filestream problem
I have figured it out in the meantime - simply, when moving a file on HDFS it preserves its timestamp, and the Spark filestream adapter seems to care about timestamps rather than filenames - hence NEW files with OLD timestamps will NOT be processed - yuk. The hack you can use is to a) copy the required file to a temp location and then b) move it from there to the dir monitored by the Spark filestream - this ensures it has a recent timestamp. -Original Message- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Saturday, May 2, 2015 5:07 PM To: user@spark.apache.org Subject: spark filestream problem It seems that on Spark Streaming 1.2 the filestream API may have a bug - it doesn't detect new files when moving or renaming them on HDFS - only when copying them, but that leads to a well-known problem with .tmp files which get removed and make the Spark Streaming filestream throw an exception. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22742.html
RE: Tasks run only on one machine
# of tasks = # of partitions, hence you can provide the desired number of partitions to the textFile API, which should result a) in a better spatial distribution of the RDD and b) in each partition being operated upon by a separate task. You can provide the number of partitions as the second argument to textFile. -Original Message- From: Pat Ferrel [mailto:p...@occamsmachete.com] Sent: Thursday, April 23, 2015 5:51 PM To: user@spark.apache.org Subject: Tasks run only on one machine Using Spark streaming to create a large volume of small nano-batch input files, ~4k per file, thousands of ‘part-x’ files. When reading the nano-batch files and doing a distributed calculation my tasks run only on the machine where it was launched. I’m launching in “yarn-client” mode. The rdd is created using sc.textFile(“list of thousand files”). What would cause the read to occur only on the machine that launched the driver? Do I need to do something to the RDD after reading? Has some partition factor been applied to all derived rdds?
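For example, assuming a SparkContext sc and a made-up path and partition count:

// Ask for at least 64 input partitions so the read (and the downstream tasks) spread across executors
val lines = sc.textFile("hdfs:///data/nano-batches/part-*", 64)
println("partitions: " + lines.partitions.size)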
RE: Slower performance when bigger memory?
You can resort to Serialized storage (still in memory) of your RDDs - this greatly reduces the number of live objects the garbage collector has to track, since the RDD elements are kept as serialized buffers rather than hydrated objects (and with the OFF_HEAP storage level they can even be kept outside the JVM heap, e.g. in Tachyon, the distributed in-memory file system Spark can use). Also review the Object Oriented Model of your RDD to see whether it consists of too many redundant objects and multiple levels of hierarchy - in high performance computing and distributed cluster object oriented frameworks like Spark, some of the OO Patterns represent an unnecessary burden... From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Thursday, April 23, 2015 6:14 PM To: user@spark.apache.org Subject: Slower performance when bigger memory? Hi All, I am running some benchmarks on r3.8xlarge instances. I have a cluster with one master (no executor on it) and one slave (r3.8xlarge). My job has 1000 tasks in stage 0. The r3.8xlarge has 244 GB memory and 32 cores. If I create 4 executors, each with 8 cores + 50 GB memory, each task takes around 320s-380s. And if I only use one big executor with 32 cores and 200 GB memory, each task takes 760s-900s. And I checked the log; it looks like the minor GC takes much longer when using 200 GB memory: 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs] And when it uses 50 GB memory, the minor GC takes less than 1s. I am trying to see what is the best way to configure Spark. For some special reasons, I am tempted to use a bigger memory on a single executor if there is no significant penalty on performance. But now it looks like there is? Anyone have any idea? Regards, Shuai
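A minimal sketch of switching an RDD to serialized in-memory storage (with Kryo registered); the app name and the generated pair RDD are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("serialized-cache")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // compact, fast serialization

val sc = new SparkContext(conf)
val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i.toLong))

// Keep the cached data as serialized bytes instead of live Java objects:
// far fewer objects for the GC to walk, at the cost of extra CPU on access
pairs.persist(StorageLevel.MEMORY_ONLY_SER)
println(pairs.count())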
RE: Custom partitioning of DStream
You can use transform, which yields the RDDs from the DStream, and on each of those RDDs you can then apply partitionBy - transform also returns another DStream, while foreach doesn't. Btw, what do you mean re foreach killing the performance by not distributing the workload - every function (provided it is not an Action) applied to an RDD within foreach is distributed across the cluster, since it gets applied to an RDD. From: davidkl [via Apache Spark User List] [mailto:ml-node+s1001560n22630...@n3.nabble.com] Sent: Thursday, April 23, 2015 10:13 AM To: Evo Eftimov Subject: Re: Custom partitioning of DStream Hello Evo, Ranjitiyer, I am also looking for the same thing. Using foreach is not useful for me as processing the RDD as a whole won't be distributed across workers and that would kill performance in my application :-/ Let me know if you find a solution for this. Regards -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22631.html
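A small sketch of the transform + partitionBy idea; the source, the key extraction and the partition count are assumptions for illustration:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.SparkContext._   // pair-RDD functions such as partitionBy (needed on 1.2)
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("dstream-partitionby").setMaster("local[4]"), Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999)             // stand-in source

val keyed  = lines.map(l => (l.split(",")(0), l))               // hypothetical key extraction
val custom = keyed.transform(rdd => rdd.partitionBy(new HashPartitioner(16)))

// 'custom' is still a DStream, so the rest of the pipeline continues as usual
custom.foreachRDD(rdd => println("partitioner: " + rdd.partitioner))

ssc.start()
ssc.awaitTermination()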
RE: writing to hdfs on master node much faster
Check whether your partitioning results in balanced partitions, i.e. partitions with similar sizes - one of the reasons for the performance differences observed by you may be that after your explicit repartitioning, the partition on your master node is much smaller than the RDD partitions on the other 2 nodes. -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, April 20, 2015 12:57 PM To: jamborta Cc: user@spark.apache.org Subject: Re: writing to hdfs on master node much faster What machines are HDFS data nodes -- just your master? That would explain it. Otherwise, is it actually the write that's slow, or is something else you're doing much faster on the master for other reasons? Maybe you're actually shipping data via the master first in some local computation, so the master's executor has the result much faster? On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote: Hi all, I have a three node cluster with identical hardware. I am trying a workflow where it reads data from hdfs, repartitions it and runs a few map operations, then writes the results back to hdfs. It looks like all the computation, including the repartitioning and the maps, completes within similar time intervals on all the nodes, except when it writes back to HDFS, when the master node does the job much faster than the slaves (15s for each block as opposed to 1.2 min for the slaves). Any suggestion what the reason might be? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on-master-node-much-faster-tp22570.html
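A quick way to check how balanced the partitions actually are, assuming a SparkContext sc; the parallelize/repartition below is only a stand-in for the real repartitioned dataset:

// 'rdd' is a stand-in; in the real job it would be the repartitioned dataset
val rdd = sc.parallelize(1 to 100000).repartition(6)

// Count the records in every partition and print them, so skew is immediately visible
val sizes = rdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
  .sortBy(_._1)
sizes.foreach { case (idx, n) => println(s"partition $idx -> $n records") }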
RE: Equal number of RDD Blocks
What is meant by “streams” here: 1. Two different DSTream Receivers producing two different DSTreams consuming from two different kafka topics, each with different message rate 2. One kafka topic (hence only one message rate to consider) but with two different DStream receivers (ie running in parallel) giving a start of two different DSTreams From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from kafka. How can I make approx. equal number of RDD blocks of on two executors. Please see the attachement, one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
RE: Equal number of RDD Blocks
And what is the message rate of each topic mate – that was the other part of the required clarifications From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DSTream Receivers producing two different DSTreams consuming from two different kafka topics, each with different message rate 2. One kafka topic (hence only one message rate to consider) but with two different DStream receivers (ie running in parallel) giving a start of two different DSTreams From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from kafka. How can I make approx. equal number of RDD blocks of on two executors. Please see the attachement, one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
RE: Equal number of RDD Blocks
Well spark steraming is supposed to create / distribute the Receivers on different cluster nodes. If you are saying that actually your receivers are running on the same node probably that node is getting most of the data to minimize the network transfer costs If you want to distribute your data more evenly you can partition it explicitly Also contact Data Bricks why the Receivers are not being distributed on different cluster nodes From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:57 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks I also see that its creating both receivers on the same executor and that might be the cause of having more RDDs on executor than the other. Can I suggest spark to create each receiver on a each executor Regards, Laeeq On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote: And what is the message rate of each topic mate – that was the other part of the required clarifications From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] Sent: Monday, April 20, 2015 3:38 PM To: Evo Eftimov; user@spark.apache.org Subject: Re: Equal number of RDD Blocks Hi, I have two different topics and two Kafka receivers, one for each topic. Regards, Laeeq On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote: What is meant by “streams” here: 1. Two different DSTream Receivers producing two different DSTreams consuming from two different kafka topics, each with different message rate 2. One kafka topic (hence only one message rate to consider) but with two different DStream receivers (ie running in parallel) giving a start of two different DSTreams From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] Sent: Monday, April 20, 2015 3:15 PM To: user@spark.apache.org Subject: Equal number of RDD Blocks Hi, I have two streams of data from kafka. How can I make approx. equal number of RDD blocks of on two executors. Please see the attachement, one worker has 1785 RDD blocks and the other has 26. Regards, Laeeq
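One way to spread the received data more evenly, regardless of which node hosts the two receivers, is to union the two streams and repartition the result. A rough sketch; the ZooKeeper address, group, topic names and partition count are made up for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("two-topics").setMaster("local[4]"), Seconds(10))

// One receiver per topic, as in the original setup
val topicA = KafkaUtils.createStream(ssc, "zkhost:2181", "mygroup", Map("topicA" -> 1))
val topicB = KafkaUtils.createStream(ssc, "zkhost:2181", "mygroup", Map("topicB" -> 1))

// union + repartition redistributes the blocks across all executors before processing
val balanced = topicA.union(topicB).repartition(8)
balanced.count().print()

ssc.start()
ssc.awaitTermination()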
RE: Super slow caching in 1.3?
Now this is very important: “Normal RDDs” refers to “batch RDDs”. However the default in-memory Serialization of RDDs which are part of DSTream is “Srialized” rather than actual (hydrated) Objects. The Spark documentation states that “Serialization” is required for space and garbage collection efficiency (but creates higher CPU load) – which makes sense consider the large number of RDDs which get discarded in a streaming app So what does Data Bricks actually recommend as Object Oriented model for RDD elements used in Spark Streaming apps – flat or not and can you provide a detailed description / spec of both From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Thursday, April 16, 2015 7:23 PM To: Evo Eftimov Cc: Christian Perez; user Subject: Re: Super slow caching in 1.3? Here are the types that we specialize, other types will be much slower. This is only for Spark SQL, normal RDDs do not serialize data that is cached. I'll also not that until yesterday we were missing FloatType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154 Christian, can you provide the schema of the fast and slow datasets? On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote: Michael what exactly do you mean by flattened version/structure here e.g.: 1. An Object with only primitive data types as attributes 2. An Object with no more than one level of other Objects as attributes 3. An Array/List of primitive types 4. An Array/List of Objects This question is in general about RDDs not necessarily RDDs in the context of SparkSQL When answering can you also score how bad the performance of each of the above options is -Original Message- From: Christian Perez [mailto:christ...@svds.com] Sent: Thursday, April 16, 2015 6:09 PM To: Michael Armbrust Cc: user Subject: Re: Super slow caching in 1.3? Hi Michael, Good question! We checked 1.2 and found that it is also slow cacheing the same flat parquet file. Caching other file formats of the same data were faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (its just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Custom partitioning of DStream
Is the only way to implement custom partitioning of a DStream via the foreach approach, so as to gain access to the actual RDDs comprising the DStream and hence their partitionBy method? DStream has only a repartition method, accepting only the number of partitions, BUT not the method of partitioning. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html
RE: Can a map function return null
In fact you can return “NULL” from your initial map and hence not resort to Optional<String> at all. From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Sunday, April 19, 2015 9:48 PM To: 'Steve Lewis' Cc: 'Olivier Girardot'; 'user@spark.apache.org' Subject: RE: Can a map function return null Well you can do another map to turn Optional<String> into String, as in the cases when Optional is empty you can store e.g. “NULL” as the value of the RDD element. If this is not acceptable (based on the objectives of your architecture) and IF returning plain null instead of Optional does throw a Spark exception THEN, as far as I am concerned, checkmate. From: Steve Lewis [mailto:lordjoe2...@gmail.com] Sent: Sunday, April 19, 2015 8:16 PM To: Evo Eftimov Cc: Olivier Girardot; user@spark.apache.org Subject: Re: Can a map function return null So you imagine something like this:

JavaRDD<String> words = ...
JavaRDD<Optional<String>> wordsFiltered = words.map(new Function<String, Optional<String>>() {
    @Override
    public Optional<String> call(String s) throws Exception {
        if ((s.length()) % 2 == 1) // drop strings of odd length
            return Optional.empty();
        else
            return Optional.of(s);
    }
});

That seems to return the wrong type, a JavaRDD<Optional<String>>, which cannot be used as a JavaRDD<String>, which is what the next step expects. On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov evo.efti...@isecc.com wrote: I am on the move at the moment so I can't try it immediately, but from previous memory / experience I think if you return plain null you will get a Spark exception. Anyway you can try it and see what happens and then ask the question. If you do get an exception, try Optional instead of plain null. Sent from Samsung Mobile Original message From: Olivier Girardot Date: 2015/04/18 22:04 (GMT+00:00) To: Steve Lewis, user@spark.apache.org Subject: Re: Can a map function return null You can return an RDD with null values inside, and afterwards filter on item != null. In Scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Example: sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect() res0: Array[Int] = Array(2, 4, 6, ...) Regards, Olivier. Le sam. 18 avr. 2015 à 20:44, Steve Lewis lordjoe2...@gmail.com a écrit : I find a number of cases where I have a JavaRDD and I wish to transform the data and, depending on a test, return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatMap returning a list with 0 or 1 entries depending on the isUsed function.

JavaRDD<Foo> original = ...
JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
    @Override
    public Iterable<Foo> call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s)) ret.add(transform(s));
        return ret; // contains 0 items if isUsed is false
    }
});

My question is: can I do a map returning the transformed data and null if nothing is to be returned, as shown below - what does Spark do with a map function returning null?

JavaRDD<Foo> words = original.map(new MapFunction<String, String>() {
    @Override
    Foo call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s)) return transform(s);
        return null; // not used - what happens now
    }
});

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
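The null-then-filter variant suggested above, sketched in Scala and assuming a SparkContext sc; the even/odd-length predicate just mirrors the example in the thread:

val words = sc.parallelize(Seq("spark", "rdd", "stream", "dstream"))

// map may emit null for the records we want to drop...
val mapped = words.map(s => if (s.length % 2 == 0) s.toUpperCase else null)

// ...as long as the nulls are filtered out before the next stage uses the values
val cleaned = mapped.filter(_ != null)
cleaned.collect().foreach(println)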
RE: Can a map function return null
Well you can do another map to turn OptionalString into String as in the cases when Optional is empty you can store e.g. “NULL” as the value of the RDD element If this is not acceptable (based on the objectives of your architecture) and IF when returning plain null instead of Optional does throw Spark exception THEN as far as I am concerned, chess-mate From: Steve Lewis [mailto:lordjoe2...@gmail.com] Sent: Sunday, April 19, 2015 8:16 PM To: Evo Eftimov Cc: Olivier Girardot; user@spark.apache.org Subject: Re: Can a map function return null So you imagine something like this: JavaRDDString words = ... JavaRDD OptionalString wordsFiltered = words.map(new FunctionString, OptionalString() { @Override public OptionalString call(String s) throws Exception { if ((s.length()) % 2 == 1) // drop strings of odd length return Optional.empty(); else return Optional.of(s); } }); That seems to return the wrong type a JavaRDD OptionalString which cannot be used as a JavaRDDString which is what the next step expects On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov evo.efti...@isecc.com wrote: I am on the move at the moment so i cant try it immediately but from previous memory / experience i think if you return plain null you will get a spark exception Anyway yiu can try it and see what happens and then ask the question If you do get exception try Optional instead of plain null Sent from Samsung Mobile Original message From: Olivier Girardot Date:2015/04/18 22:04 (GMT+00:00) To: Steve Lewis ,user@spark.apache.org Subject: Re: Can a map function return null You can return an RDD with null values inside, and afterwards filter on item != null In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Exemple : sc.parallelize(1 to 1000).flatMap(item = if (item % 2 ==0) Some(item) else None).collect() res0: Array[Int] = Array(2, 4, 6, ) Regards, Olivier. Le sam. 18 avr. 2015 à 20:44, Steve Lewis lordjoe2...@gmail.com a écrit : I find a number of cases where I have an JavaRDD and I wish to transform the data and depending on a test return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatmap returning a list with 0 or 1 entry depending on the isUsed function. JavaRDDFoo original = ... JavaRDDFoo words = original.flatMap(new FlatMapFunctionFoo, Foo() { @Override public IterableFoo call(final Foo s) throws Exception { ListFoo ret = new ArrayListFoo(); if(isUsed(s)) ret.add(transform(s)); return ret; // contains 0 items if isUsed is false } }); My question is can I do a map returning the transformed data and null if nothing is to be returned. as shown below - what does a Spark do with a map function returning null JavaRDDFoo words = original.map(new MapFunctionString, String() { @Override Foo call(final Foo s) throws Exception { ListFoo ret = new ArrayListFoo(); if(isUsed(s)) return transform(s); return null; // not used - what happens now } }); -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
RE: How to do dispatching in Streaming?
Good use of analogies J Yep friction (or entropy in general) exists in everything – but hey by adding and doing “more work” at the same time (aka more powerful rockets) some people have overcome the friction of the air and even got as far as the moon and beyond It is all about the bottom lime / the big picture – in some models, friction can be a huge factor in the equations in some other it is just part of the landscape From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Friday, April 17, 2015 10:12 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? Evo, In Spark there's a fixed scheduling cost for each task, so more tasks mean an increased bottom line for the same amount of work being done. The number of tasks per batch interval should relate to the CPU resources available for the job following the same 'rule of thumbs' than for Spark, being 2-3 times the #of cores. In that physical model presented before, I think we could consider this scheduling cost as a form of friction. -kr, Gerard. On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com wrote: Ooops – what does “more work” mean in a Parallel Programming paradigm and does it always translate in “inefficiency” Here are a few laws of physics in this space: 1. More Work if done AT THE SAME time AND fully utilizes the cluster resources is a GOOD thing 2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing So the key is whether it is about 1 or 2 and if it is about 1, whether it leads to e.g. Higher Throughput and Lower Latency or not Regards, Evo Eftimov From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, April 16, 2015 10:41 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this: dstream.foreachRDD { rdd = rdd.cache() messageType.foreach (msgTyp = val selection = rdd.filter(msgTyp.match(_)) selection.foreach { ... } } rdd.unpersist() } I would discourage the use of: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Because it will be a lot more work to process on the spark side. Each DSteam will schedule tasks for each partition, resulting in #dstream x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above. -kr, Gerard. On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote: And yet another way is to demultiplex at one point which will yield separate DStreams for each message type which you can then process in independent DAG pipelines in the following way: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Then proceed your processing independently with MessageType1DStream, MessageType2DStream and MessageType3DStream ie each of them is a starting point of a new DAG pipeline running in parallel From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, April 16, 2015 12:52 AM To: Jianshi Huang Cc: user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? It may be worthwhile to do architect the computation in a different way. 
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}
TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
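A runnable version of the foreachRDD-with-per-type-filters pattern discussed in this thread; the Msg type, the wire format, the source and the per-type handling are placeholders, not the original poster's code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Msg(kind: String, body: String)
val messageTypes = Seq("type1", "type2", "type3")

val ssc = new StreamingContext(new SparkConf().setAppName("dispatch").setMaster("local[4]"), Seconds(5))
val raw = ssc.socketTextStream("localhost", 9999)              // stand-in for the Kafka stream
val messages = raw.map { line =>
  val Array(kind, body) = line.split("\\|", 2)                 // hypothetical wire format: kind|body
  Msg(kind, body)
}

messages.foreachRDD { rdd =>
  rdd.cache()                                                  // read the batch once, filter it N times
  messageTypes.foreach { t =>
    val selection = rdd.filter(_.kind == t)
    selection.foreach(m => println(s"[$t] ${m.body}"))         // placeholder per-type handling
  }
  rdd.unpersist()
}

ssc.start()
ssc.awaitTermination()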
RE: General configurations on CDH5 to achieve maximum Spark Performance
And btw if you suspect this is a YARN issue you can always launch and use Spark in a Standalone Mode which uses its own embedded cluster resource manager - this is possible even when Spark has been deployed on CDH under YARN by the pre-canned install scripts of CDH To achieve that: 1. Launch spark in a standalone mode using its shell scripts - you may get some script errors initially because of some mess in the scripts created by the pre-canned CDH YARN install - which you can fix by editing the spark standalone scripts - the error messages will guide you 2. Submit a spark job to the standalone spark master rather than YARN and this is it 3. Measure and compare the performance under YARN, Spark Standalone on Cluster and Spark Standalone on a single machine Bear in mind that running Spark in Standalone mode while using YARN for all other apps would not be very appropriate in production because the two resource managers will be competing for cluster resources - but you can use this for performance tests From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 6:28 PM To: 'Manish Gupta 8'; 'user@spark.apache.org' Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Essentially to change the performance yield of software cluster infrastructure platform like spark you play with different permutations of: - Number of CPU cores used by Spark Executors on every cluster node - Amount of RAM allocated for each executor How disks and network IO is used also plays a role but that is influenced more by app algorithmic aspects rather than YARN / Spark cluster config (except rack awreness etc) When Spark runs under the management of YARN the above is controlled / allocated by YARN https://spark.apache.org/docs/latest/running-on-yarn.html From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:21 PM To: Evo Eftimov; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Thanks Evo. Yes, my concern is only regarding the infrastructure configurations. Basically, configuring Yarn (Node manager) + Spark is must and default setting never works. And what really happens, is we make changes as and when an issue is faced because of one of the numerous default configuration settings. And every time, we have to google a lot to decide on the right values J Again, my issue is very centric to running Spark on Yarn in CDH5 environment. If you know a link that talks about optimum configuration settings for running Spark on Yarn (CDH5), please share the same. Thanks, Manish From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 10:38 PM To: Manish Gupta 8; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Well there are a number of performance tuning guidelines in dedicated sections of the spark documentation - have you read and applied them Secondly any performance problem within a distributed cluster environment has two aspects: 1. Infrastructure 2. App Algorithms You seem to be focusing only on 1, but what you said about the performance differences between single laptop and cluster points to potential algorithmic inefficiency in your app when e.g. distributing and performing parallel processing and data. On a single laptop data moves instantly between workers because all worker instances run in the memory of a single machine .. 
Regards, Evo Eftimov From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did lot of changes (and still doing it) to get decent performance otherwise our 6 node dev cluster with default configurations, lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regards will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient
RE: How to do dispatching in Streaming?
And yet another way is to demultiplex at one point which will yield separate DStreams for each message type which you can then process in independent DAG pipelines in the following way: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Then proceed your processing independently with MessageType1DStream, MessageType2DStream and MessageType3DStream ie each of them is a starting point of a new DAG pipeline running in parallel From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, April 16, 2015 12:52 AM To: Jianshi Huang Cc: user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? It may be worthwhile to do architect the computation in a different way. dstream.foreachRDD { rdd = rdd.foreach { record = // do different things for each record based on filters } } TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream - multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
RE: How to do dispatching in Streaming?
Ooops – what does “more work” mean in a Parallel Programming paradigm and does it always translate in “inefficiency” Here are a few laws of physics in this space: 1. More Work if done AT THE SAME time AND fully utilizes the cluster resources is a GOOD thing 2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing So the key is whether it is about 1 or 2 and if it is about 1, whether it leads to e.g. Higher Throughput and Lower Latency or not Regards, Evo Eftimov From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, April 16, 2015 10:41 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this: dstream.foreachRDD { rdd = rdd.cache() messageType.foreach (msgTyp = val selection = rdd.filter(msgTyp.match(_)) selection.foreach { ... } } rdd.unpersist() } I would discourage the use of: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Because it will be a lot more work to process on the spark side. Each DSteam will schedule tasks for each partition, resulting in #dstream x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above. -kr, Gerard. On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote: And yet another way is to demultiplex at one point which will yield separate DStreams for each message type which you can then process in independent DAG pipelines in the following way: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Then proceed your processing independently with MessageType1DStream, MessageType2DStream and MessageType3DStream ie each of them is a starting point of a new DAG pipeline running in parallel From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, April 16, 2015 12:52 AM To: Jianshi Huang Cc: user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? It may be worthwhile to do architect the computation in a different way. dstream.foreachRDD { rdd = rdd.foreach { record = // do different things for each record based on filters } } TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream - multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
RE: How to do dispatching in Streaming?
Also you can have each message type in a different topic (this needs to be arranged upstream from your Spark Streaming app, i.e. in the publishing systems and the messaging brokers), and then for each topic you can have a dedicated ReceiverInputDStream instance, which will be the start of a dedicated DAG pipeline instance for every message type. Moreover, each such DAG pipeline instance will run in parallel with the others.
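A sketch of the one-input-DStream-per-topic layout, using the Spark 1.x receiver-based Kafka API. The ZooKeeper address, consumer group and topic names are placeholders, and ssc is assumed to be an already-created StreamingContext.

import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver-based input DStream per topic; each is the root of an independent pipeline
val topics = Seq("orders", "payments", "audit")          // placeholder topic names
val streamsByTopic = topics.map { topic =>
  topic -> KafkaUtils.createStream(ssc, "zk-host:2181", "per-topic-group", Map(topic -> 1))
}.toMap

// Each pipeline below is scheduled independently of the others
streamsByTopic("orders").map(_._2).foreachRDD(rdd => rdd.foreach(println))
streamsByTopic("payments").map(_._2).foreachRDD(rdd => rdd.foreach(println))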
RE: Data partitioning and node tracking in Spark-GraphX
How do you intend to fetch the required data - from within Spark, or using an app / code / module outside Spark?

-Original Message- From: mas [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:08 PM To: user@spark.apache.org Subject: Data partitioning and node tracking in Spark-GraphX

I have a big data file; I aim to create an index on the data. I want to partition the data based on a user defined function in Spark-GraphX (Scala). Further, I want to keep track of the node to which a particular data partition is sent and where it is processed, so I could fetch the required data by accessing the right node and data partition. How can I achieve this? Any help in this regard will be highly appreciated.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-node-tracking-in-Spark-GraphX-tp22527.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: How to join RDD keyValuePairs efficiently
Ningjun, to speed up your current design you can do the following:

1. Partition the large doc RDD based on a hash function on the key, i.e. the docid
2. Persist the large dataset in memory so it is available for subsequent queries without reloading and repartitioning for every search query
3. Partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key
4. Run the join - the match is not going to be sequential, it is based on the hash of the key; moreover, RDD elements with the same key will be collocated on the same cluster node

OR simply go for Sean's suggestion - under the hood it works in a slightly different way: the filter is executed in mappers running in parallel on every node, and by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node, so each mapper instance has its own copy and runs with it when filtering.

And finally you can prototype both options described above and measure and compare their performance.

-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 5:02 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this?

From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently

I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find the documents by ids quickly. Currently I use an RDD join as follows. First I save the RDD as an object file:

val allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")

Then I wrote a function to find documents by Ids:

def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains less than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}

I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this? One option I am thinking of is that instead of storing the RDD[Document] as an object file, I store each document in a separate file with the filename equal to the docid. This way I can find a document quickly by docid. However this means I need to save the RDD to 7 million small files, which will take a very long time and may cause IO problems with so many small files. Is there any other way? Ningjun
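For concreteness, a minimal sketch of both options under discussion. It assumes an existing SparkContext sc, a hypothetical Document(id, text) case class, an allDocs: RDD[Document], and an arbitrary illustrative partition count.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class Document(id: String, text: String)   // hypothetical document type

val partitioner = new HashPartitioner(100)

// Option 1: hash-partition the big dataset once, keep it in memory, and join against
// a small RDD partitioned the same way (co-located partitions, no full rescan per query)
val idAndDocs: RDD[(String, Document)] =
  allDocs.keyBy(_.id).partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)

def findByJoin(docids: RDD[String]): RDD[Document] =
  docids.map(id => (id, id)).partitionBy(partitioner).join(idAndDocs).map(_._2._2)

// Option 2 (Sean's suggestion): broadcast the small id set and filter the big RDD
def findByFilter(docids: Set[String]): RDD[Document] = {
  val idsBc = sc.broadcast(docids)
  allDocs.filter(doc => idsBc.value.contains(doc.id))
}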
RE: Data partitioning and node tracking in Spark-GraphX
Well you can use a [Key, Value] RDD and partition it based on a hash function on the Key, and even with a specific number of partitions (and hence cluster nodes). This will a) index the data, b) divide it and send it to multiple nodes. Re your last requirement - in a cluster programming environment/framework, your app code should not need to care on which physical node exactly a partition resides.

Regards Evo Eftimov

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:20 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Data partitioning and node tracking in Spark-GraphX

I want to use Spark functions/APIs to do this task. My basic purpose is to index the data and divide and send it to multiple nodes. Then at the time of accessing I want to reach the right node and data partition. I don't have any clue how to do this. Thanks,

On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: How do you intend to fetch the required data - from within Spark, or using an app / code / module outside Spark?

-- Regards, Muhammad Aamir CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.
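A small sketch of what that looks like in code; the key and value types and the partition count are assumptions, and sc is an existing SparkContext. The point is that the hash partitioner decides placement, so the application never needs to know which physical node holds which partition.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical data: records keyed by some user-defined key (e.g. a spatial cell id)
val records: RDD[(String, Array[Double])] =
  sc.parallelize(Seq("cell-1" -> Array(0.1, 0.2, 0.3), "cell-2" -> Array(0.4, 0.5, 0.6)))

// Partition by hash of the key into a chosen number of partitions
val partitioned = records.partitionBy(new HashPartitioner(8))

// For illustration only: see which partition each key was routed to
partitioned.mapPartitionsWithIndex { (idx, iter) =>
  iter.map { case (key, _) => s"partition $idx holds $key" }
}.collect().foreach(println)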
RE: Data partitioning and node tracking in Spark-GraphX
Well you can have a two level index structure, still without any need for physical cluster node awareness:

Level 1 Index is the previously described partitioned [K,V] RDD – this gets you to the value (RDD element) you need, on the respective cluster node

Level 2 Index – it will be built and reside within the Value of each [K,V] RDD element – so after you retrieve the appropriate element from the appropriate cluster node based on the Level 1 Index, you then query the Value of that element based on the Level 2 Index

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:32 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Data partitioning and node tracking in Spark-GraphX

Thanks a lot for the reply. Indeed it is useful, but to be more precise I have 3D data and want to index it using an octree. Thus I aim to build a two level indexing mechanism, i.e. first at the global level I want to partition and send the data to the nodes, then at the node level I again want to use an octree to index my data locally. Could you please elaborate the solution in this context?
-- Regards, Muhammad Aamir CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.
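A compact sketch of the two-level layout for this octree use case. The coarse cell key, the LocalIndex placeholder (standing in for a per-element octree) and the partition count are all assumptions; only the level-1 / level-2 split comes from the thread.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Level 2: a local structure stored inside each value; a Map stands in for an octree here
case class LocalIndex(points: Map[String, Array[Double]]) {
  def query(pointId: String): Option[Array[Double]] = points.get(pointId)
}

// Level 1: RDD keyed by a coarse spatial cell id and hash-partitioned across the cluster
val cells: RDD[(String, LocalIndex)] = sc.parallelize(Seq(
  "cell-42" -> LocalIndex(Map("point-7" -> Array(1.0, 2.0, 3.0)))
)).partitionBy(new HashPartitioner(32))

// Query: level 1 locates the element (and hence the partition/node), level 2 searches inside it
val result: Seq[Array[Double]] = cells.lookup("cell-42").flatMap(_.query("point-7"))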
RE: General configurations on CDH5 to achieve maximum Spark Performance
Well there are a number of performance tuning guidelines in dedicated sections of the Spark documentation - have you read and applied them? Secondly, any performance problem within a distributed cluster environment has two aspects:

1. Infrastructure
2. App Algorithms

You seem to be focusing only on 1, but what you said about the performance difference between a single laptop and the cluster points to potential algorithmic inefficiency in your app when e.g. distributing data and performing parallel processing. On a single laptop data moves between workers practically instantly, because all worker instances run in the memory of a single machine.

Regards, Evo Eftimov

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance

Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did a lot of changes (and are still doing it) to get decent performance; otherwise our 6 node dev cluster with default configurations lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regard will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.***
RE: Super slow caching in 1.3?
Michael, what exactly do you mean by flattened version/structure here, e.g.:

1. An Object with only primitive data types as attributes
2. An Object with no more than one level of other Objects as attributes
3. An Array/List of primitive types
4. An Array/List of Objects

This question is in general about RDDs, not necessarily RDDs in the context of Spark SQL. When answering, can you also score how bad the performance of each of the above options is?

-Original Message- From: Christian Perez [mailto:christ...@svds.com] Sent: Thursday, April 16, 2015 6:09 PM To: Michael Armbrust Cc: user Subject: Re: Super slow caching in 1.3?

Hi Michael, Good question! We checked 1.2 and found that it is also slow caching the same flat parquet file. Caching other file formats of the same data was faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (it's just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
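As a concrete illustration of Michael's suggestion in Spark 1.3-era Spark SQL: the file path, table name and column names below are hypothetical; the point is caching a projection whose nested fields have been pulled up into flat top-level columns.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Register the (possibly nested) Parquet data as a temp table
sqlContext.parquetFile("/data/events.parquet").registerTempTable("parquetTable")

// Cache a flattened projection: nested fields become top-level primitive columns
sqlContext.sql(
  """CACHE TABLE flattenedTable AS
    |SELECT id, user.name AS user_name, user.country AS user_country
    |FROM parquetTable""".stripMargin)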
RE: General configurations on CDH5 to achieve maximum Spark Performance
Essentially, to change the performance yield of a software cluster infrastructure platform like Spark, you play with different permutations of:

- Number of CPU cores used by the Spark Executors on every cluster node
- Amount of RAM allocated for each executor

How disks and network IO are used also plays a role, but that is influenced more by app algorithmic aspects than by YARN / Spark cluster config (except rack awareness etc). When Spark runs under the management of YARN, the above is controlled / allocated by YARN: https://spark.apache.org/docs/latest/running-on-yarn.html

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:21 PM To: Evo Eftimov; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance

Thanks Evo. Yes, my concern is only regarding the infrastructure configurations. Basically, configuring Yarn (Node Manager) + Spark is a must, and the default settings never work. And what really happens is that we make changes as and when an issue is faced because of one of the numerous default configuration settings, and every time we have to google a lot to decide on the right values :) Again, my issue is very centric to running Spark on Yarn in a CDH5 environment. If you know a link that talks about optimum configuration settings for running Spark on Yarn (CDH5), please share the same. Thanks, Manish
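For illustration, the knobs being discussed map onto a handful of settings. A minimal sketch, assuming YARN and Spark 1.2-era property names; the values are placeholders rather than recommendations (on spark-submit the same things are usually passed as --num-executors, --executor-cores and --executor-memory):

import org.apache.spark.SparkConf

// Placeholder sizing for a 4-core / 16 GB node managed by YARN; real values depend on what
// the YARN NodeManagers actually have available and on the job itself
val conf = new SparkConf()
  .setAppName("tuned-job")
  .set("spark.executor.instances", "6")               // number of executors across the cluster
  .set("spark.executor.cores", "3")                   // CPU cores per executor
  .set("spark.executor.memory", "10g")                // heap per executor
  .set("spark.yarn.executor.memoryOverhead", "1024")  // off-heap headroom per executor (MB)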
RE: saveAsTextFile
The reason for this is as follows:

1. You are saving data on HDFS
2. HDFS, as a cluster/server side Service, has a Single Writer / Multiple Reader multithreading model
3. Hence each thread of execution in Spark has to write to a separate file in HDFS
4. Moreover, the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream

If you want fine / detailed management of the writing to HDFS, you can implement your own HDFS adapter and invoke it in foreachRDD and foreach

Regards Evo Eftimov

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile

I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
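One hedged way to get that finer control in the foreachRDD spirit described above is simply to compute the output path yourself per batch, so every batch lands under one common parent directory. The bucket/path is a placeholder and dstream is assumed to be a DStream[String].

import org.apache.spark.streaming.Time

dstream.foreachRDD { (rdd, time: Time) =>
  // one sub-directory per micro-batch under a single common parent directory
  val path = s"s3n://my-bucket/common-output/batch-${time.milliseconds}"
  rdd.saveAsTextFile(path)
}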
RE: saveAsTextFile
Nope Sir, it is possible - check my reply earlier

-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 6:35 PM To: Vadim Bichutskiy Cc: user@spark.apache.org Subject: Re: saveAsTextFile

You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are.

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
RE: saveAsTextFile
Basically you need to unbundle the elements of the RDD and then store them wherever you want - use foreachPartition and then foreach

-Original Message- From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:39 PM To: Sean Owen Cc: user@spark.apache.org Subject: Re: saveAsTextFile

Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim
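A hedged sketch of that pattern for pushing each micro-batch into Redshift over JDBC. The JDBC URL, credentials and table are placeholders, dstream is assumed to be a DStream[String], and in practice bulk-loading Redshift via its COPY command from S3 is usually much faster than row-by-row inserts.

import java.sql.DriverManager

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // one connection per partition, opened on the executor that holds the partition
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://redshift-host:5439/mydb", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO events (line) VALUES (?)")
    try {
      records.foreach { line =>
        stmt.setString(1, line)
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally {
      stmt.close()
      conn.close()
    }
  }
}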
RE: saveAsTextFile
Also, to juggle even further the multithreading model of both Spark and HDFS, you can publish the data from Spark first to a message broker, e.g. Kafka, from where a predetermined number (from 1 to infinity) of parallel consumers will retrieve it and store it in HDFS in one or more finely controlled files and directories

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:45 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: saveAsTextFile

Thanks Evo for your detailed explanation.
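A sketch of the publish-to-Kafka leg of that approach, using the standard Kafka producer client from inside foreachPartition; the broker address and topic name are placeholders and dstream is again assumed to be a DStream[String].

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // one producer per partition, created on the executor
    val props = new Properties()
    props.put("bootstrap.servers", "kafka-host:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      records.foreach(line => producer.send(new ProducerRecord[String, String]("spark-output", line)))
    } finally {
      producer.close()
    }
  }
}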
RE: How to join RDD keyValuePairs efficiently
You can use:

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

which returns a copy of the RDD partitioned using the specified partitioner.

The https://github.com/amplab/spark-indexedrdd stuff looks pretty cool and is something which adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can be executed from within a function running on worker executors. Can somebody from Databricks shed more light here?

-Original Message- From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: Thursday, April 16, 2015 9:39 PM To: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently

Evo, regarding "partition the large doc RDD based on the hash function on the key ie the docid" - what API do I use to do this? By the way, loading the entire dataset into memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web: https://github.com/amplab/spark-indexedrdd Has anybody used it? Ningjun
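A minimal sketch of the partitionBy answer in context, reusing the hypothetical Document type, allDocs RDD and SparkContext sc from the earlier sketch; the partition count is illustrative. Note also that lookup() on a pair RDD with a known partitioner only scans the single partition the key hashes to, which gives cheap point lookups without bringing in IndexedRDD.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Hash-partition the pair RDD by docid and keep it cached, spilling to disk if it does not fit in RAM
val idAndDocs = allDocs
  .keyBy(_.id)
  .partitionBy(new HashPartitioner(100))
  .persist(StorageLevel.MEMORY_AND_DISK)

// Point lookup: only the partition that "doc-123" hashes to is scanned
val matches: Seq[Document] = idAndDocs.lookup("doc-123")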