Re: [Streaming] Configure executor logging on Mesos
So it sounds like generic support for downloadable URIs would solve this problem: Mesos automatically places them in your sandbox and you can refer to them from there. If so, please file a JIRA; this is a pretty simple fix on the Spark side. Tim

On Sat, May 30, 2015 at 7:34 AM, andy petrella andy.petre...@gmail.com wrote:
Hello, I'm currently exploring DCOS for the Spark Notebook, and while looking at the Spark configuration I found something interesting that converges with what we've discovered: https://github.com/mesosphere/universe/blob/master/repo/packages/S/spark/0/marathon.json So the logging works fine there because the Spark package uses spark-class, which is able to configure the log4j file. But the interesting part is that the `uris` parameter is filled in with a downloadable path to the log4j file! However, this isn't possible when we create the SparkContext ourselves and rely only on the Mesos scheduler backend, unless spark.executor.uri (or another setting) can take more than one downloadable path. my.2¢ andy

On Fri, May 29, 2015 at 5:09 PM Gerard Maas gerard.m...@gmail.com wrote:
Hi Tim, Thanks for the info. We (Andy Petrella and myself) have been diving a bit deeper into this log config. The log line I was referring to is this one (sorry, I provided the others just for context): *Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties* That line comes from Logging.scala [1], where a default config is loaded if none is found on the classpath when the Spark Mesos executor starts up in the Mesos sandbox. At that point, none of the application-specific resources have been shipped yet, as the executor JVM is just starting up. To load a custom configuration file, we would need it to be in the sandbox before the executor JVM starts, and to add it to the classpath in the startup command. Is that correct?
For the classpath customization, it looks like it should be possible to pass a -Dlog4j.configuration property by using 'spark.executor.extraClassPath', which will be picked up at [2] and should be added to the command that starts the executor JVM, but the resource must already be on the host before we can do that. Therefore we also need some means of 'shipping' the log4j configuration file to the allocated executor. This all boils down to your statement about the need to ship extra files to the sandbox. Bottom line: it's currently not possible to specify a log config file for your Mesos executor (ours grows several GB/day). The only workaround I've found so far is to open up the Spark assembly, replace log4j-defaults.properties, and pack it up again. That would work, although it's rather rudimentary, as we use the same assembly for many jobs. Accessing the log4j API programmatically should probably also work (I haven't tried that yet). Should we open a JIRA for this functionality? -kr, Gerard.
[1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128
[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77

On Thu, May 28, 2015 at 7:50 PM, Tim Chen t...@mesosphere.io wrote:
-- Forwarded message -- From: Tim Chen t...@mesosphere.io Date: Thu, May 28, 2015 at 10:49 AM Subject: Re: [Streaming] Configure executor logging on Mesos To: Gerard Maas gerard.m...@gmail.com
Hi Gerard, The log line you referred to is not Spark logging but Mesos's own logging, which uses glog. Our own executor logs should only contain very few lines, though. Most of the log lines you'll see are from Spark, and they can be controlled by specifying a log4j.properties file to be downloaded with your Mesos task. Alternatively, if you are downloading the Spark executor via spark.executor.uri, you can include log4j.properties in that tarball. I think we probably need some more configuration options for the Spark scheduler to pick up extra files to be downloaded into the sandbox. Tim

On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com wrote:
Hi, I'm trying to control the verbosity of the logs on the Mesos executors, with no luck so far. The default behaviour is an INFO-level dump to stderr with unbounded growth that eventually gets too big. I noticed that when the executor is instantiated, it locates a default log configuration in the Spark assembly:
I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave 20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
So nothing I provide in my job jar files takes effect in the executor's configuration (I also tried spark.executor.extraClassPath=log4j.properties). How should I configure the log on the executors? thanks, Gerard.
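A note on the -Dlog4j.configuration idea: JVM system properties are normally passed through spark.executor.extraJavaOptions, while spark.executor.extraClassPath only extends the classpath. The Python sketch below generates a quiet log4j 1.x file (the appender settings follow Spark's conf/log4j.properties.template) and the matching spark-submit --conf argument. It assumes the file already ends up in the executor's working directory (the Mesos sandbox). Two possible routes are bundling it in the spark.executor.uri tarball, as Tim suggests, or the spark.mesos.uris setting documented in later Spark releases. The helper names and the WARN level are illustrative choices.

```python
from typing import Dict, List

LOG4J_FILE = "log4j.properties"  # assumed name of the file in the executor sandbox


def log4j_properties(level: str = "WARN") -> str:
    """log4j 1.x config sending `level` and above to stderr."""
    lines = [
        f"log4j.rootCategory={level}, console",
        "log4j.appender.console=org.apache.log4j.ConsoleAppender",
        "log4j.appender.console.target=System.err",
        "log4j.appender.console.layout=org.apache.log4j.PatternLayout",
        "log4j.appender.console.layout.ConversionPattern="
        "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n",
    ]
    return "\n".join(lines) + "\n"


def executor_logging_conf(log4j_file: str = LOG4J_FILE) -> Dict[str, str]:
    """Point executor JVMs at log4j_file via a system property.

    Assumes log4j_file is already present in the executor's working directory.
    """
    return {"spark.executor.extraJavaOptions": f"-Dlog4j.configuration=file:{log4j_file}"}


def spark_submit_args(conf: Dict[str, str]) -> List[str]:
    """Turn a settings dict into spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(log4j_properties("WARN"))
print(spark_submit_args(executor_logging_conf()))
```

Keeping the file name relative (file:log4j.properties) is a deliberate choice. It resolves against the executor's working directory, so the same argument works on every host where the file has been placed in the sandbox.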
Re: spark-sql errors
Any ideas, guys? How can I solve this? From: Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID To: user user@spark.apache.org Sent: Friday, May 29, 2015 5:29 PM Subject: spark-sql errors https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/6SqGuYemnbc
Why is my performance on local really slow?
Hi, following my previous post http://apache-spark-user-list.1001560.n3.nabble.com/Help-optimizing-some-spark-code-tc23006.html I have been trying to find the best way to intersect an RDD of Longs (ids) with an RDD of (id, value) pairs, so that I end up with just the values for the ids in the first RDD. For example, if I had
rdd1 = [ 1L ]
rdd2 = [ (1L, "one"), (2L, "two"), (3L, "three") ]
then I'd want the result RDD
result = [ "one" ]
(Note that rdd2 is far larger than rdd1: millions vs. thousands of elements.)
What I have been doing is mapping the first RDD to tuples, like so: rdd1.map(_ -> ()), so that rdd1 = [ (1L, ()) ]. Then I have two RDDs of (Long, _) and can use join operations. But the joins seemed really slow, so I tried to optimize them by repartitioning the keys in advance into the same partitions, and also sorting them. That didn't help much (sorting helps a bit; repartitioning didn't seem to do much; I'm currently running locally).
So then I thought I'd use broadcast variables. The first problem I encountered was that, to broadcast a variable, it must first be in the driver's memory so I can call sc.broadcast(...), but the map that results from rdd2.collectAsMap() is too large for my memory. Can I broadcast in parts, or better yet, straight from an RDD?
The second problem: when I reduced the size of the RDD just for testing, sent it as a broadcast variable (as a map of id -> value), and then did rdd1.map(broadcastVariable.value) to create the new RDD I wanted, it was very significantly slower than doing the same operation locally. Yet all those actions should take place on the executor (which is local) without any shuffling, since every partition has the broadcast variable. So why is it so slow, and what can I do about it? I'd love to get any suggestions here! Thanks!
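Because rdd1 is the small side (thousands of ids), one common alternative to both the join and collectAsMap() is to broadcast only the small key set and filter rdd2 against it. In PySpark that would be roughly `keys = sc.broadcast(set(rdd1.collect()))` followed by `rdd2.filter(lambda kv: kv[0] in keys.value).values()`. Separately, in Scala, `rdd1.map(broadcastVariable.value)` evaluates `.value` on the driver, so the Map itself (rather than the broadcast handle) is likely serialized into every task, which may explain the slowdown. Writing `rdd1.map(id => broadcastVariable.value(id))` defers the lookup to the executors. The plain-Python sketch below simulates the filter pattern over a list of partitions so it runs without a cluster; the function name and data layout are illustrative.

```python
from typing import Hashable, Iterable, List, Sequence, Tuple

Pair = Tuple[Hashable, object]


def semi_join_values(small_keys: Iterable[Hashable],
                     big_partitions: Sequence[Sequence[Pair]]) -> List[object]:
    """Keep the values of big-side pairs whose key appears in small_keys."""
    # Driver side: build the small key set once; this is what would be broadcast
    # (thousands of ids), instead of collecting the large id -> value map.
    wanted = set(small_keys)
    result: List[object] = []
    # Executor side: each partition is filtered independently, with no shuffle.
    for partition in big_partitions:
        result.extend(value for key, value in partition if key in wanted)
    return result


rdd1 = [1]
rdd2_partitions = [[(1, "one"), (2, "two")], [(3, "three")]]
print(semi_join_values(rdd1, rdd2_partitions))  # ['one']
```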
import CSV file using read.csv
Hi all, I need to read a CSV file using the read.csv function in igraph from Python, but I don't know where to put the data so that igraph can read it, or the exact syntax for importing a CSV file into igraph. I would appreciate any help.
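read.csv is an R function. In Python, one way to get an edge list out of a CSV file is the standard csv module. The sketch below assumes a file with a header row and columns named source and target (both assumptions). A relative path passed to open() is resolved against the current working directory, so the file can live anywhere as long as the path points at it. The resulting tuples could then be handed to python-igraph, for example via its Graph.TupleList constructor (check the igraph documentation for the exact options).

```python
import csv
import io
from typing import Iterable, List, Tuple


def read_edge_list(lines: Iterable[str], source_col: str = "source",
                   target_col: str = "target") -> List[Tuple[str, str]]:
    """Read (source, target) pairs from CSV text that has a header row."""
    reader = csv.DictReader(lines)
    return [(row[source_col], row[target_col]) for row in reader]


# From a file on disk (the path is hypothetical):
#   with open("edges.csv", newline="") as f:
#       edges = read_edge_list(f)
# In-memory example:
edges = read_edge_list(io.StringIO("source,target\na,b\nb,c\n"))
print(edges)  # [('a', 'b'), ('b', 'c')]
```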
Re: How Broadcast variable works
Can someone help take a look at my questions? Thanks. bit1...@163.com From: bit1...@163.com Date: 2015-05-29 18:57 To: user Subject: How Broadcast variable works Hi, I have a Spark Streaming application. The SparkContext uses broadcast variables to broadcast configuration information that each task will use. I have the following two questions: 1. Will the broadcast variable be broadcast every time the driver sends tasks to workers in each batch interval? 2. If so, and the broadcast variable is modified between batch intervals (the configuration information is updated over time) and the SparkContext broadcasts it again, will tasks see the updated variable? Thanks. bit1...@163.com
Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?
I hope they will come out with 1.4 before Spark Summit in mid-June.

On 31 May 2015 10:07, Joseph Bradley jos...@databricks.com wrote:
Spark 1.4 should be available next month, but I'm not sure about the exact date. Your interpretation of high lambda is reasonable. What counts as a high lambda is really data-dependent. lambda is the same as regParam in Spark, available in all recent Spark versions.

On Fri, May 29, 2015 at 5:35 AM, mélanie gallois melanie.galloi...@gmail.com wrote:
When exactly will Spark 1.4 be available? Regarding "Model selection can be achieved through high lambda resulting in lots of zeros in the coefficients": do you mean that setting a high lambda as a parameter of the logistic regression keeps only a few significant variables and eliminates the others by giving them zero coefficients? What is a high lambda for you? Is lambda a parameter available only in Spark 1.4, or can I use it in Spark 1.3?

2015-05-23 0:04 GMT+02:00 Joseph Bradley jos...@databricks.com:
If you want to select specific variable combinations by hand, then you will need to modify the dataset before passing it to the ML algorithm. The DataFrame API should make that easy to do. If you want an ML algorithm to select variables automatically, then I would recommend using L1 regularization for now, and possibly elastic net after 1.4 is released, per DB's suggestion. If you want detailed model statistics similar to what R provides, I've created a JIRA for discussing how we should add that functionality to MLlib. Those types of stats will be added incrementally, but feedback would be great for prioritization: https://issues.apache.org/jira/browse/SPARK-7674 To answer your question "How are the weights calculated: is there a correlation calculation with the variable of interest?": weights are calculated as with all logistic regression algorithms, by using convex optimization to minimize a regularized log loss. Good luck!
Joseph

On Fri, May 22, 2015 at 1:07 PM, DB Tsai dbt...@dbtsai.com wrote:
In Spark 1.4, logistic regression with elastic net is implemented in the ML pipeline framework. Model selection can be achieved through a high lambda, resulting in lots of zeros in the coefficients. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com

On Fri, May 22, 2015 at 1:19 AM, SparknewUser melanie.galloi...@gmail.com wrote:
I am new to MLlib and Spark (I use Scala). I'm trying to understand how LogisticRegressionWithLBFGS and LogisticRegressionWithSGD work. I usually use R to do logistic regressions, but now I do them on Spark to be able to analyze big data. The model only returns weights and an intercept. My problem is that I have no information about which variables are significant and which variables I should delete to improve my model. I only have the confusion matrix and the AUC to evaluate performance. Is there any way to get information about the variables I put in my model? How can I try different variable combinations: do I have to modify the original dataset (e.g. delete one or several columns)? How are the weights calculated: is there a correlation calculation with the variable of interest?
-- *Mélanie*
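To illustrate the point that a large enough regParam (lambda) drives coefficients to exactly zero, here is a small plain-Python sketch of L1-regularized logistic regression fitted by proximal gradient descent (a gradient step followed by soft-thresholding). It is not MLlib's implementation. In Scala MLlib, a comparable setup would be a LogisticRegressionWithSGD instance whose `optimizer` is configured with `setUpdater(new L1Updater)` and `setRegParam(...)`. The step size, iteration count, and toy data are assumptions.

```python
import math
from typing import List, Sequence, Tuple


def soft_threshold(w: float, t: float) -> float:
    """Proximal operator of t*|w|: shrink towards zero, clip to exactly 0."""
    if w > t:
        return w - t
    if w < -t:
        return w + t
    return 0.0


def fit_l1_logistic(X: Sequence[Sequence[float]], y: Sequence[int],
                    reg_param: float, step: float = 0.1,
                    iters: int = 500) -> Tuple[List[float], float]:
    """Minimize mean log loss + reg_param * ||w||_1 (intercept not penalized)."""
    n, d = len(X), len(X[0])
    w = [0.0] * d
    b = 0.0
    for _ in range(iters):
        grad_w = [0.0] * d
        grad_b = 0.0
        for xi, yi in zip(X, y):
            z = b + sum(wj * xj for wj, xj in zip(w, xi))
            err = 1.0 / (1.0 + math.exp(-z)) - yi  # sigmoid(z) - label
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        # Gradient step, then soft-thresholding: small weights snap to exactly 0
        w = [soft_threshold(wj - step * gj, step * reg_param)
             for wj, gj in zip(w, grad_w)]
        b -= step * grad_b
    return w, b


# Feature 0 separates the classes; feature 1 is symmetric noise.
X = [[1.0, 0.1], [0.8, -0.1], [-1.0, 0.1], [-0.8, -0.1]]
y = [1, 1, 0, 0]
print(fit_l1_logistic(X, y, reg_param=0.05))  # feature 1 gets weight 0.0
print(fit_l1_logistic(X, y, reg_param=10.0))  # every weight is 0.0
```

The zero weights mark variables the regularized model has dropped, which is the selection effect described above. How large lambda must be for this depends on the data, so in practice it is usually chosen by validation.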
Re: How to get the best performance with LogisticRegressionWithSGD?
This is really getting into an understanding of how optimization and GLMs work. I'd recommend reading some introductory ML or statistics literature on how generalized linear models are estimated, as well as on how convex optimization is used in ML. There are some free online texts, as well as MOOCs with good introductions. (There is also the upcoming ML with Spark MOOC!)

On Fri, May 29, 2015 at 3:11 AM, SparknewUser melanie.galloi...@gmail.com wrote:
I've tried several different parameter pairs for my LogisticRegressionWithSGD, and here are my results. My numIterations varies from 100 to 500 in steps of 50, and my stepSize varies from 0.1 to 1 in steps of 0.1. The last row shows the maximum of each column and the last column the maximum of each row; the values rise and fall. What is the logic? My maximum is for the pair (numIter, stepSize) = (200, 0.4).

numIter \ stepSize  0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0   row max
100                 0.67  0.69  0.50  0.48  0.50  0.69  0.70  0.50  0.66  0.55  0.70
150                 0.50  0.51  0.50  0.50  0.50  0.50  0.53  0.50  0.53  0.68  0.68
200                 0.67  0.71  0.64  0.74  0.50  0.70  0.71  0.71  0.50  0.50  0.74
250                 0.50  0.50  0.55  0.50  0.50  0.50  0.73  0.55  0.50  0.50  0.73
300                 0.67  0.50  0.50  0.67  0.50  0.67  0.72  0.48  0.66  0.67  0.72
350                 0.71  0.60  0.66  0.50  0.51  0.50  0.66  0.62  0.66  0.71  0.71
400                 0.51  0.54  0.71  0.67  0.62  0.50  0.50  0.50  0.51  0.50  0.71
450                 0.51  0.50  0.50  0.51  0.50  0.50  0.66  0.51  0.50  0.50  0.66
500                 0.51  0.64  0.50  0.50  0.51  0.49  0.66  0.67  0.54  0.51  0.67
column max          0.71  0.71  0.71  0.74  0.62  0.70  0.73  0.71  0.66  0.71
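A table like the one above is a grid search over (numIterations, stepSize). The sketch below is a minimal harness for that kind of search. `train_and_score` is a hypothetical callback: in Spark it might call `LogisticRegressionWithSGD.train(trainingData, numIterations, stepSize)` and return the AUC measured on a held-out split rather than on the training data. If the numbers look noisy, averaging over a few splits before comparing is one option. The toy scoring function in the usage line exists only so the example runs.

```python
from itertools import product
from typing import Callable, Dict, Iterable, Tuple

Params = Tuple[int, float]


def grid_search(train_and_score: Callable[[int, float], float],
                num_iterations: Iterable[int],
                step_sizes: Iterable[float]) -> Tuple[Params, float, Dict[Params, float]]:
    """Score every (numIterations, stepSize) pair and return the best one."""
    scores: Dict[Params, float] = {}
    for n_iter, step in product(num_iterations, step_sizes):
        scores[(n_iter, step)] = train_and_score(n_iter, step)
    best = max(scores, key=scores.get)  # ties resolve to the first pair tried
    return best, scores[best], scores


NUM_ITERATIONS = range(100, 501, 50)                    # 100, 150, ..., 500
STEP_SIZES = [round(0.1 * k, 1) for k in range(1, 11)]  # 0.1, 0.2, ..., 1.0

# Toy stand-in for "train, then evaluate on held-out data":
best, score, _ = grid_search(lambda n, s: -((n - 200) / 100) ** 2 - (s - 0.4) ** 2,
                             NUM_ITERATIONS, STEP_SIZES)
print(best)  # (200, 0.4)
```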
Re: Spark 1.3.0 - 1.3.1 produces java.lang.NoSuchFieldError: NO_FILTER
I had the same issue on AWS EMR with Spark 1.3.1.e (the AWS build) when passing the '-h' parameter (a bootstrap action parameter for Spark). I don't see the problem with Spark 1.3.1.e when not passing the parameter. I am not sure about your environment. Thanks,
Re: Spark 1.3.0 - 1.3.1 produces java.lang.NoSuchFieldError: NO_FILTER
It looks like your program somehow picked up an older version of Parquet (Spark 1.3.1 uses Parquet 1.6.0rc3, and it seems the NO_FILTER field was introduced in 1.6.0rc2). Could you check the Parquet library version on your classpath? Thanks, Yin

On Sat, May 30, 2015 at 2:44 PM, ogoh oke...@gmail.com wrote:
I had the same issue on AWS EMR with Spark 1.3.1.e (the AWS build) when passing the '-h' parameter (a bootstrap action parameter for Spark). I don't see the problem with Spark 1.3.1.e when not passing the parameter. I am not sure about your environment. Thanks,
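One way to follow Yin's suggestion is to take the classpath string and list the Parquet jars on it, looking for more than one version of the same artifact. For example, the string could be the java.class.path value shown on the Environment tab of the Spark UI. The sketch below assumes jar file names of the form `<artifact>-<version>.jar` (such as parquet-hadoop-1.6.0rc3.jar). That naming pattern is an assumption, so jars repackaged under other names would be missed.

```python
import os
import re
from typing import Dict, List

# artifact name (letters and dashes), then "-", then a version starting with a digit
_PARQUET_JAR = re.compile(r"^(parquet-[a-z-]+?)-(\d[\w.-]*)\.jar$")


def parquet_jars(classpath: str, sep: str = os.pathsep) -> Dict[str, List[str]]:
    """Map each parquet-* artifact on the classpath to the versions found."""
    found: Dict[str, List[str]] = {}
    for entry in classpath.split(sep):
        match = _PARQUET_JAR.match(os.path.basename(entry))
        if match:
            artifact, version = match.groups()
            found.setdefault(artifact, []).append(version)
    return found


cp = "/opt/a/parquet-hadoop-1.6.0rc3.jar:/opt/b/parquet-hadoop-1.5.0.jar:/opt/c/other.jar"
for artifact, versions in parquet_jars(cp, sep=":").items():
    if len(versions) > 1:
        print(f"conflict: {artifact} {versions}")  # conflict: parquet-hadoop ['1.6.0rc3', '1.5.0']
```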
Re: Question regarding spark data partition and coalesce. Need info on my use case.
I had a similar requirement, and I came up with a small algorithm to determine the number of partitions based on cluster size and input data.
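The algorithm itself isn't shown in the thread, so the following is a hypothetical heuristic of the same kind, not the poster's. It takes the larger of two counts. The first is a few tasks per core, in line with the Spark tuning guide's rule of thumb of 2-3 tasks per CPU core. The second is enough partitions to keep each one near a target size; 128 MB here is an assumption borrowed from the common HDFS block size. The result could then be passed to rdd.repartition(n) to increase the count, or to rdd.coalesce(n) to reduce it without a full shuffle.

```python
import math


def suggest_num_partitions(input_bytes: int,
                           total_cores: int,
                           tasks_per_core: int = 3,
                           target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Hypothetical heuristic: keep every core busy and cap partition size."""
    by_cores = total_cores * tasks_per_core                       # parallelism floor
    by_size = math.ceil(input_bytes / target_partition_bytes)     # size ceiling
    return max(1, by_cores, by_size)


print(suggest_num_partitions(10 * 1024**3, total_cores=16))  # 80 (size-bound)
print(suggest_num_partitions(1024**2, total_cores=16))       # 48 (core-bound)
```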
Re: How Broadcast variable works
1. No. That's the purpose of a broadcast variable, i.e. not to be shipped with every task. From the documentation: "Broadcast Variables: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)." 2. See above :) If you need what you are asking for, you are looking for closures. Best, Ayan

On Sat, May 30, 2015 at 4:11 PM, bit1...@163.com bit1...@163.com wrote:
Can someone help take a look at my questions? Thanks. -- bit1...@163.com *From:* bit1...@163.com *Date:* 2015-05-29 18:57 *To:* user user@spark.apache.org *Subject:* How Broadcast variable works Hi, I have a Spark Streaming application. The SparkContext uses broadcast variables to broadcast configuration information that each task will use. I have the following two questions: 1. Will the broadcast variable be broadcast every time the driver sends tasks to workers in each batch interval? 2. If so, and the broadcast variable is modified between batch intervals (the configuration information is updated over time) and the SparkContext broadcasts it again, will tasks see the updated variable? Thanks. -- bit1...@163.com -- Best Regards, Ayan Guha
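A broadcast value is read-only and cached on the executors. Picking up changed configuration in Spark Streaming therefore usually means creating a new broadcast from the driver when the value changes. A natural place is inside a function passed to DStream.transform or foreachRDD, whose body runs on the driver once per batch. The sketch below wraps that idea. `broadcast_fn` stands in for `sc.broadcast` and `load_fn` for however the configuration is reloaded; both are hypothetical hooks. `FakeBroadcast` only mimics the `.value` / `.unpersist()` surface of a PySpark Broadcast so that the example runs without Spark.

```python
from typing import Any, Callable, Optional


class RefreshableBroadcast:
    """Re-broadcast a driver-side value only when it changes (a sketch)."""

    def __init__(self, broadcast_fn: Callable[[Any], Any], load_fn: Callable[[], Any]):
        self._broadcast_fn = broadcast_fn  # e.g. sc.broadcast in PySpark
        self._load_fn = load_fn            # hypothetical: reloads the configuration
        self._value: Any = None
        self._bc: Optional[Any] = None

    def get(self) -> Any:
        """Call on the driver once per batch; use the returned handle in that batch's closures."""
        value = self._load_fn()
        if self._bc is None or value != self._value:
            if self._bc is not None:
                self._bc.unpersist()  # drop the stale copies cached on executors
            self._bc = self._broadcast_fn(value)
            self._value = value
        return self._bc


class FakeBroadcast:
    """Stand-in exposing the .value / .unpersist() surface used above."""

    def __init__(self, value: Any):
        self.value = value
        self.unpersisted = False

    def unpersist(self, blocking: bool = False) -> None:
        self.unpersisted = True


# Hypothetical PySpark usage:
#   refresher = RefreshableBroadcast(sc.broadcast, load_config)
#   def per_batch(rdd):
#       bc = refresher.get()  # runs on the driver for each batch
#       return rdd.map(lambda x: (x, bc.value["level"]))
#   stream.transform(per_batch)
```

Because each batch captures whatever handle `get()` returned, tasks from a given batch see the configuration that was current when that batch was scheduled.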
Re: PySpark with OpenCV causes python worker to crash
Thanks for the advice! The following line causes Spark to crash:
kp, descriptors = sift.detectAndCompute(gray, None)
But I do need this line to be executed, and the code does not crash when run outside of Spark with the same parameters. You're saying that maybe the bytes from the sequence file somehow got transformed and no longer represent an image, causing OpenCV to crash the whole Python executor.

On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote:
Could you try commenting out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes that came from sequenceFile() are broken, it's easy to crash a C library (OpenCV) from Python.

On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
Hi sparkers, I am working on a PySpark application that uses the OpenCV library. It runs fine when I run the code locally, but when I try to run it on Spark on the same machine, it crashes the worker. The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f This is the error message taken from the worker log's STDERR: https://gist.github.com/samos123/3300191684aee7fc8013 I'd like pointers or tips on how to debug further. It would be nice to know why the worker crashed.
Thanks, Sam Stoelinga

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
Re: PySpark with OpenCV causes python worker to crash
I've verified that the issue lies in Spark running the OpenCV code, not in the sequence file's BytesWritable formatting. The following code reproduces the failure without using the sequence file as input at all: it runs the same function with the same input on Spark, and it still fails:

    import cv2
    import numpy as np

    def extract_sift_features_opencv(imgfile_imgbytes):
        imgfilename, discardsequencefile = imgfile_imgbytes
        # Ignore the sequence-file bytes and read a fixed local image instead
        imgbytes = bytearray(open("/tmp/img.jpg", "rb").read())
        nparr = np.frombuffer(imgbytes, np.uint8)
        img = cv2.imdecode(nparr, 1)  # 1 = load as a 3-channel color image
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        sift = cv2.xfeatures2d.SIFT_create()  # needs the opencv-contrib modules
        kp, descriptors = sift.detectAndCompute(gray, None)  # crashes the worker under Spark
        return (imgfilename, "test")

And the corresponding tests.py: https://gist.github.com/samos123/d383c26f6d47d34d32d6

On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com wrote:
Thanks for the advice! The following line causes Spark to crash: kp, descriptors = sift.detectAndCompute(gray, None) But I do need this line to be executed, and the code does not crash when run outside of Spark with the same parameters. You're saying that maybe the bytes from the sequence file somehow got transformed and no longer represent an image, causing OpenCV to crash the whole Python executor.

On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote:
Could you try commenting out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes that came from sequenceFile() are broken, it's easy to crash a C library (OpenCV) from Python.

On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
Hi sparkers, I am working on a PySpark application that uses the OpenCV library. It runs fine when I run the code locally, but when I try to run it on Spark on the same machine, it crashes the worker.
The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f This is the error message taken from the worker log's STDERR: https://gist.github.com/samos123/3300191684aee7fc8013 I'd like pointers or tips on how to debug further. It would be nice to know why the worker crashed. Thanks, Sam Stoelinga

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
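The JVM-side trace (an EOFException while reading from the Python worker) is consistent with the Python process dying abruptly, which is how a segfault inside a native library appears from Spark's side. One way to confirm that outside Spark is to run the suspect call in a fresh interpreter and inspect how it exited. Ideally, use the same Python interpreter and environment that the executors use (for example, whatever PYSPARK_PYTHON points at). Python's `-X faulthandler` flag makes a native crash also print the Python-level traceback to stderr. The snippet in the usage comment is hypothetical and reuses the /tmp/img.jpg path from the thread.

```python
import signal
import subprocess
import sys
from typing import Tuple


def run_isolated(code: str, python: str = sys.executable,
                 timeout: float = 300.0) -> Tuple[int, str]:
    """Run `code` in a fresh interpreter with faulthandler on; return (returncode, stderr)."""
    proc = subprocess.run([python, "-X", "faulthandler", "-c", code],
                          capture_output=True, text=True, timeout=timeout)
    return proc.returncode, proc.stderr


def describe_exit(returncode: int) -> str:
    """On POSIX, subprocess reports death by signal N as returncode -N."""
    if returncode < 0:
        return "killed by signal " + signal.Signals(-returncode).name
    return "exited with status %d" % returncode


# Hypothetical usage, mirroring extract_sift_features_opencv:
# SNIPPET = (
#     "import cv2, numpy as np\n"
#     "img = cv2.imdecode(np.fromfile('/tmp/img.jpg', np.uint8), 1)\n"
#     "gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)\n"
#     "cv2.xfeatures2d.SIFT_create().detectAndCompute(gray, None)\n"
# )
# rc, err = run_isolated(SNIPPET)
# print(describe_exit(rc)); print(err)
```

A result such as "killed by signal SIGSEGV" would point at the native library or its build, rather than at Spark's data handling.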
Re: Batch aggregation by sliding window + join
Yes, I see now. For a 3-day block it's indeed possible; however, if I want to hold a 30-day (or even bigger) block aggregation, it will be a bit slow. For the record, I've found several directions for improving shuffling (from the video https://www.youtube.com/watch?v=Wg2boMqLjCg). For example, since I don't have cached RDDs, I can try increasing spark.shuffle.memoryFraction from the default 0.2 to something bigger (even 0.6). As for my initial question, there is a PR that tries to solve this issue (not yet merged, though): https://github.com/apache/spark/pull/4449, which introduces a custom RDD with a custom InputFormat (based on HadoopRDD). I'll try to do something similar. Anyway, thanks for the ideas and help!

On 29 May 2015 at 18:01, ayan guha guha.a...@gmail.com wrote:
My point is that if you keep the daily aggregates already computed, then you do not reprocess raw data. But yeah, you may decide to recompute the last 3 days every day.

On 29 May 2015 23:52, Igor Berman igor.ber...@gmail.com wrote:
Hi Ayan, thanks for the response. I'm using 1.3.1. I'll check window queries (I don't use Spark SQL, only core; maybe I should?). What do you mean by materialized? I can repartition and sort the daily aggregation by key; however, I don't quite understand how that will help with yesterday's block, which needs to be loaded from file and has no connection to this repartitioning of the daily block.

On 29 May 2015 at 01:51, ayan guha guha.a...@gmail.com wrote:
Which version of Spark? In 1.4, window queries will show up for these kinds of scenarios. One thing I can suggest is to keep the daily aggregates materialised, partitioned by key and sorted by the key-day combination, using the repartitionAndSortWithinPartitions method. It allows you to use a custom partitioner and a custom sort order. Best, Ayan

On 29 May 2015 03:31, igor.berman igor.ber...@gmail.com wrote:
Hi, I have a daily batch job that computes a daily aggregate of several counters represented by some object.
After the daily aggregation is done, I want to compute block aggregations over several days (3, 7, 30, etc.). To do so, I need to add the new daily aggregation to the current block and then subtract from the current block the daily aggregation of the oldest day within it (a sliding window). I've implemented it with something like:
baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
All RDDs are keyed by a unique id (Long). Each RDD is saved in Avro files after the job finishes and loaded when the job starts (on the next day). baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on the size of the block). Unfortunately, the performance is not satisfactory due to many shuffles (I have already set the parallelism, etc.). I was looking for a way to improve performance, to make sure that each task joins the same local keys without reshuffling baseBlockRdd (which is big) every time the job starts (see https://spark-project.atlassian.net/browse/SPARK-1061 for a related issue). So, bottom line: how do I join a big RDD with a smaller RDD without reshuffling the big RDD over and over again? Once I've saved this big RDD and reloaded it from disk, I want every other RDD to be partitioned and co-located by the same partitioner (which is absent for HadoopRDD) somehow, so that only the small RDDs are sent over the network. Another idea I had: somehow split baseBlock into 2 parts by filtering on the keys of the small RDDs and then join; however, I'm not sure it's possible to implement this filter without a join. Any ideas would be appreciated. Thanks in advance, Igor
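When two pair RDDs share the same partitioner, Spark can join them partition by partition without reshuffling either side. In Scala, RDD.zipPartitions can also combine already-aligned partitions directly. The plain-Python sketch below simulates that for the sliding-window update: block minus the dropped day, plus the new day, computed independently per partition. A simple `key % num_partitions` rule stands in for Spark's HashPartitioner. Several details are assumptions: counters are plain integers rather than Igor's counter objects, and ids whose windowed total reaches zero leave the block. As Igor notes, the partitioner is lost when the block is reloaded from Avro. This only pays off if the block keeps its partitioning, or if a custom RDD restores it, which appears to be what the linked PR is after.

```python
from typing import Dict, Iterable, List, Tuple

Partition = Dict[int, int]


def hash_partition(pairs: Iterable[Tuple[int, int]], num_partitions: int) -> List[Partition]:
    """Stand-in for a hash partitioner: key % num_partitions picks the partition."""
    parts: List[Partition] = [dict() for _ in range(num_partitions)]
    for key, value in pairs:
        parts[key % num_partitions][key] = value
    return parts


def slide_partition(block: Partition, dropped_day: Partition, new_day: Partition) -> Partition:
    out = dict(block)
    # Left-join semantics: only subtract for keys already in the block
    for key, value in dropped_day.items():
        if key in out:
            out[key] -= value
    # Full-outer-join semantics: new keys enter the block
    for key, value in new_day.items():
        out[key] = out.get(key, 0) + value
    # Assumption: ids whose windowed total falls to zero leave the block
    return {k: v for k, v in out.items() if v != 0}


def slide_block(block_parts: List[Partition], dropped_parts: List[Partition],
                new_parts: List[Partition]) -> List[Partition]:
    """Combine aligned partitions pairwise; no key moves between partitions."""
    if not (len(block_parts) == len(dropped_parts) == len(new_parts)):
        raise ValueError("all sides must use the same number of partitions")
    return [slide_partition(b, d, n) for b, d, n in zip(block_parts, dropped_parts, new_parts)]


n = 2
result = slide_block(hash_partition([(1, 5), (2, 3), (3, 4)], n),
                     hash_partition([(1, 2), (2, 3)], n),
                     hash_partition([(1, 1), (4, 7)], n))
print(result)  # [{4: 7}, {1: 4, 3: 4}]
```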
Re: [Streaming] Configure executor logging on Mesos
Hello, I'm currently exploring DCOS for the spark notebook, and while looking at the spark configuration I found something interesting which is actually converging to what we've discovered: https://github.com/mesosphere/universe/blob/master/repo/packages/S/spark/0/marathon.json So the logging is working fine here because the spark package is using the spark-class which is able to configure the log4j file. But the interesting part comes with the fact that the `uris` parameter is filled in with a downloadable path to the log4j file! However, it's not possible when creating the spark context ourselfves and relying on the mesos sheduler backend only. Unles the spark.executor.uri (or a another one) can take more than one downloadable path. my.2¢ andy On Fri, May 29, 2015 at 5:09 PM Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, Thanks for the info. We (Andy Petrella and myself) have been diving a bit deeper into this log config: The log line I was referring to is this one (sorry, I provided the others just for context) *Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties* That line comes from Logging.scala [1] where a default config is loaded is none is found in the classpath upon the startup of the Spark Mesos executor in the Mesos sandbox. At that point in time, none of the application-specific resources have been shipped yet as the executor JVM is just starting up. To load a custom configuration file we should have it already on the sandbox before the executor JVM starts and add it to the classpath on the startup command. Is that correct? For the classpath customization, It looks like it should be possible to pass a -Dlog4j.configuration property by using the 'spark.executor.extraClassPath' that will be picked up at [2] and that should be added to the command that starts the executor JVM, but the resource must be already on the host before we can do that. 
Therefore we also need some means of 'shipping' the log4j.configuration file to the allocated executor. This all boils down to your statement about the need to ship extra files to the sandbox.

Bottom line: it's currently not possible to specify a log config file for your Mesos executor (ours grows several GB/day). The only workaround I found so far is to open up the Spark assembly, replace log4j-defaults.properties and pack it up again. That would work, although it's rather rudimentary, as we use the same assembly for many jobs. Accessing the log4j API programmatically should probably also work (I haven't tried that yet).

Should we open a JIRA for this functionality?

-kr, Gerard.

[1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128
[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77

On Thu, May 28, 2015 at 7:50 PM, Tim Chen t...@mesosphere.io wrote:

-- Forwarded message --
From: Tim Chen t...@mesosphere.io
Date: Thu, May 28, 2015 at 10:49 AM
Subject: Re: [Streaming] Configure executor logging on Mesos
To: Gerard Maas gerard.m...@gmail.com

Hi Gerard,

The log line you referred to is not Spark logging but Mesos's own logging, which uses glog. Our own executor logs should only contain very few lines, though. Most of the log lines you'll see are from Spark, and they can be controlled by specifying a log4j.properties to be downloaded with your Mesos task. Alternatively, if you are downloading the Spark executor via spark.executor.uri, you can include log4j.properties in that tarball.

I think we probably need some more configuration options for the Spark scheduler to pick up extra files to be downloaded into the sandbox.

Tim

On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com wrote:

Hi,

I'm trying to control the verbosity of the logs on the Mesos executors, with no luck so far.
The default behaviour is to dump INFO-level logs to stderr, with unbounded growth that gets too big at some point. I noticed that when the executor is instantiated, it locates a default log configuration in the Spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave 20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

So nothing I provide in my job jar files (I also tried spark.executor.extraClassPath=log4j.properties) takes effect in the executor's configuration. How should I configure logging on the executors?

thanks, Gerard.
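One way to prepare the file for Tim's suggestion (bundling log4j.properties into the spark.executor.uri tarball) is sketched below, assuming log4j 1.x as bundled with Spark at the time. The settings are modelled on Spark's log4j-defaults.properties with the root level lowered from INFO to WARN; that level, the object ExecutorLog4jSetup and all of its methods are hypothetical. The file still has to reach the sandbox and be referenced when the executor JVM starts. Spark's documented property for extra executor JVM flags such as -Dlog4j.configuration=... is spark.executor.extraJavaOptions (spark.executor.extraClassPath only prepends classpath entries), with a path that depends on where the file lands in the sandbox; whether a given Spark version's Mesos backend forwards it is something to verify. The describe helper gives a simplified view, from inside a JVM, of which config that JVM can see.

```scala
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper: writes a quieter log4j 1.x config for executors and
// reports which log4j config the current JVM can see.
object ExecutorLog4jSetup {

  // Modelled on Spark's log4j-defaults.properties, but with a configurable
  // root level (assumption: WARN is quiet enough for executors).
  def settings(level: String): Seq[(String, String)] = Seq(
    "log4j.rootCategory" -> s"$level, console",
    "log4j.appender.console" -> "org.apache.log4j.ConsoleAppender",
    "log4j.appender.console.target" -> "System.err",
    "log4j.appender.console.layout" -> "org.apache.log4j.PatternLayout",
    "log4j.appender.console.layout.ConversionPattern" -> "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n"
  )

  // Render the settings as a properties file body, one key=value per line.
  def render(level: String = "WARN"): String =
    settings(level).map { case (k, v) => s"$k=$v" }.mkString("", "\n", "\n")

  // Write dir/log4j.properties, e.g. into the directory that will be packed
  // into the spark.executor.uri tarball.
  def writeTo(dir: Path, level: String = "WARN"): Path =
    Files.write(dir.resolve("log4j.properties"), render(level).getBytes(StandardCharsets.UTF_8))

  // The -Dlog4j.configuration value this JVM was started with, if any.
  def explicitConfig: Option[String] = Option(System.getProperty("log4j.configuration"))

  // Where this JVM's class loader finds a resource, if anywhere.
  def findOnClasspath(resource: String): Option[URL] =
    Option(getClass.getClassLoader).flatMap(cl => Option(cl.getResource(resource)))

  // Simplified summary: checks the system property, then log4j.properties
  // on the classpath (log4j.xml is not checked here).
  def describe(): String = explicitConfig match {
    case Some(cfg) => s"-Dlog4j.configuration=$cfg"
    case None =>
      findOnClasspath("log4j.properties")
        .map(url => s"log4j.properties on classpath at $url")
        .getOrElse("no custom log4j.properties visible")
  }
}
```

To see what the executors picked up, describe() could be called inside a task, for example `sc.parallelize(1 to 4, 4).map(_ => ExecutorLog4jSetup.describe()).collect()`; note that this does not guarantee every executor runs one of the tasks.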