Re: Launching Spark Cluster Application through IDE
From IntelliJ, you can use the remote debugging feature. http://stackoverflow.com/questions/19128264/how-to-remote-debug-in-intellij-12-1-4 For remote debugging, you need to pass the following JVM options:

-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=4000,suspend=n

and configure your IDE to attach on that port (4000) for remote debugging. Thanks Best Regards

On Fri, Mar 20, 2015 at 9:46 AM, raggy raghav0110...@gmail.com wrote: I am trying to debug a Spark application on a cluster using a master and several worker nodes. I have been successful at setting up the master node and worker nodes using the Spark standalone cluster manager. I downloaded the Spark folder with binaries and used the following commands to set up the worker and master nodes. These commands are executed from the Spark directory.

Command for launching the master:
./sbin/start-master.sh

Command for launching a worker node:
./bin/spark-class org.apache.spark.deploy.worker.Worker master-URL

Command for submitting an application:
./bin/spark-submit --class Application --master URL ~/app.jar

Now, I would like to understand the flow of control through the Spark source code on the worker nodes when I submit my application (I just want to use one of the given examples that use reduce()). I am assuming I should set up Spark in Eclipse. The Eclipse setup link on the Apache Spark website seems to be broken. I would appreciate some guidance on setting up Spark and Eclipse to enable stepping through the Spark source code on the worker nodes. If not Eclipse, I would be open to using some other IDE or approach that will enable me to step through the Spark source code after launching my application. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Launching-Spark-Cluster-Application-through-IDE-tp22155.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
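As a sketch of how the JVM options above can be combined with spark-submit (the class name, master URL, and port are the illustrative values from the thread, not prescriptions): use --driver-java-options to debug the driver, or spark.executor.extraJavaOptions to debug executors.

```shell
# Debug the driver: suspend=y makes the JVM wait until the IDE attaches on port 4000.
./bin/spark-submit \
  --class Application \
  --master spark://master-host:7077 \
  --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=4000" \
  ~/app.jar

# To step through Spark source on the workers instead, set the option per executor
# (suspend=n here, since many executor JVMs may start):
#   --conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=4000"
```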
Visualizing Spark Streaming data
I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos over the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results, and gets the appropriate RDDs; the question now is how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Powered by Spark addition
Hello Matei, Could you please also add our company to the Powered By list? Details are as follows:

Name: Act Now
URL: www.actnowib.com
Description: Spark powers NOW APPS, a big data, real-time, predictive analytics platform. We use the Spark SQL, MLlib and GraphX components for both batch ETL and analytics applied to telecommunication data, providing faster and more meaningful insights and actionable data to the operators.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Powered-by-Spark-addition-tp7422p22161.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Are you submitting your application from local to a remote host? If you want to run the Spark application from a remote machine, then you have to at least set the following configurations properly:

- *spark.driver.host* - points to the ip/host from where you are submitting the job (make sure you are able to ping this from the cluster)
- *spark.driver.port* - set it to a port number which is accessible from the Spark cluster.

You can look at more configuration options here: http://spark.apache.org/docs/latest/configuration.html#networking Thanks Best Regards On Fri, Mar 20, 2015 at 4:02 AM, Eason Hu eas...@gmail.com wrote: Hi Akhil, Thank you for your help. I just found that the problem is related to my local Spark application: I ran it in IntelliJ and didn't reload the project after I recompiled the jar via Maven. Without reloading, it uses some locally cached data to run the application, which leads to two different versions. After I reloaded the project and reran, it ran fine on v1.1.1 and I no longer saw those class-incompatibility issues. However, I now encounter a new issue starting from v1.2.0 and above. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT] 15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: hduser,eason.hu 15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: hduser,eason.hu 15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, eason.hu); users with modify permissions: Set(hduser, eason.hu) 15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started 15/03/19 01:10:18 INFO Remoting: Starting remoting 15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@hduser-07:59122] 15/03/19 01:10:18 INFO Utils: Successfully started service 'driverPropsFetcher' on port 59122. 15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://sparkDriver@192.168.1.53:65001]]. 15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] Exception in thread main java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) ... 4 more Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:144) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59) ... 7 more Do you have any clues why it happens only after v1.2.0 and above? Nothing else changes. Thanks, Eason On Tue, Mar 17, 2015 at 8:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Its clearly saying: java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 Version incompatibility, can you double check your version? On 18 Mar 2015 06:08, Eason Hu
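As a sketch of the advice above (the address and port are the ones appearing in the log in this thread; substitute values that match your own network), the two properties can be set at submit time:

```shell
# spark.driver.host must be an address of the submitting machine that the cluster
# nodes can reach; spark.driver.port must be open in any firewall between them.
./bin/spark-submit \
  --master spark://cluster-master:7077 \
  --conf spark.driver.host=192.168.1.53 \
  --conf spark.driver.port=65001 \
  --class Application ~/app.jar
```

The "Futures timed out" error in the log is consistent with the executors being unable to connect back to the driver on that host/port pair.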
Re: Visualizing Spark Streaming data
I'll stay with my recommendation - that's exactly what Kibana is made for ;) 2015-03-20 9:06 GMT+01:00 Harut Martirosyan harut.martiros...@gmail.com: Hey Jeffrey. Thanks for the reply. I already have something similar: I use Grafana and Graphite, and for simple metric streaming we've got it all set up right. My question is about interactive patterns. For instance, dynamically choosing an event to monitor, dynamically choosing a group-by field or any sort of filter, then viewing results. This is easy when you have 1 user, but if you have a team of analysts all specifying their own criteria, it becomes hard to manage them all. On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos over the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results, and gets the appropriate RDDs; the question now is how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
-- RGRDZ Harut
Re: Visualizing Spark Streaming data
Hey Jeffrey. Thanks for the reply. I already have something similar: I use Grafana and Graphite, and for simple metric streaming we've got it all set up right. My question is about interactive patterns. For instance, dynamically choosing an event to monitor, dynamically choosing a group-by field or any sort of filter, then viewing results. This is easy when you have 1 user, but if you have a team of analysts all specifying their own criteria, it becomes hard to manage them all. On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos over the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results, and gets the appropriate RDDs; the question now is how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- RGRDZ Harut
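One common pattern (a sketch, not an established best practice): have the streaming job push each batch's small aggregate into an intermediate store (Elasticsearch, Redis, a database) that the dashboard queries, so the UI never talks to Spark directly; the analysts' interactive filters then become parameters of queries against that store rather than of the Spark job itself. Assuming Spark Streaming's Scala API, with the hypothetical names noted in the comments:

```scala
import org.apache.spark.streaming.dstream.DStream

// tagCounts is assumed to be a DStream[(String, Long)] of (tag, count) pairs
// already produced by the streaming job; saveToStore stands in for a write to
// whatever intermediate store the dashboard reads (both names are hypothetical).
def saveToStore(batchTimeMs: Long, topTags: Seq[(String, Long)]): Unit = {
  // e.g. index into Elasticsearch, or write to Redis/Cassandra keyed by batch time
}

def publish(tagCounts: DStream[(String, Long)]): Unit =
  tagCounts.foreachRDD { (rdd, time) =>
    // Only the small aggregated result reaches the driver, not the raw events.
    val top = rdd.top(20)(Ordering.by[(String, Long), Long](_._2))
    saveToStore(time.milliseconds, top)
  }
```

With the per-batch aggregates in an external store, multiple analysts can slice them concurrently without each one driving a separate Spark job.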
Measure Bytes Read and Peak Memory Usage for Query
Hi All, I would like to measure Bytes Read and Peak Memory Usage for a Spark SQL query. Please clarify: does Bytes Read equal the aggregate size of all RDDs? All my RDDs are in memory, with 0 B spilled to disk. And I am clueless about how to measure Peak Memory Usage. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Measuer-Bytes-READ-and-Peak-Memory-Usage-for-Query-tp22159.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Measure Bytes Read and Peak Memory Usage for Query
You could do a cache and see the memory usage under the Storage tab in the driver UI (runs on port 4040). Thanks Best Regards On Fri, Mar 20, 2015 at 12:02 PM, anu anamika.guo...@gmail.com wrote: Hi All, I would like to measure Bytes Read and Peak Memory Usage for a Spark SQL query. Please clarify: does Bytes Read equal the aggregate size of all RDDs? All my RDDs are in memory, with 0 B spilled to disk. And I am clueless about how to measure Peak Memory Usage. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Measuer-Bytes-READ-and-Peak-Memory-Usage-for-Query-tp22159.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Load balancing
1. If you are consuming data from Kafka or any other receiver-based source, then you can start 1-2 receivers per worker (assuming you'll have a minimum of 4 cores per worker).
2. If you have a single receiver or a fileStream, then what you can do to distribute the data across machines is a repartition.

Thanks Best Regards On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to understand how to load balance the incoming data to multiple Spark Streaming workers. Could somebody help me understand how I can distribute my incoming data from various sources such that the incoming data goes to multiple Spark Streaming nodes? Is it done by the Spark client with the help of the Spark master, similar to a Hadoop client asking the namenode for the list of datanodes?
Re: Visualizing Spark Streaming data
Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos over the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results, and gets the appropriate RDDs; the question now is how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark-submit and multiple files
Hi Davies, I am already using --py-files. The system does use the other file. The error I am getting is not trivial. Please check the error log. On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote: You could submit additional Python source files via --py-files, for example: $ bin/spark-submit --py-files work.py main.py On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com wrote: Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets. Each code snippet is composed of a main.py and a work.py. The code works if I paste work.py and then main.py into a pyspark shell. However, neither snippet works when using spark-submit, and they generate different errors. add_1 function defined outside: http://www.codeshare.io/4ao8B https://justpaste.it/jzvj Embedded add_1 function definition: http://www.codeshare.io/OQJxq https://justpaste.it/jzvn I am trying to find a way to make it work. Thank you for your support. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark 1.2 often loses all executors
Hi, I recently changed from Spark 1.1 to Spark 1.2, and I noticed that it loses all executors whenever I have any Python code bug (like looking up a key in a dictionary that does not exist). In earlier versions, it would raise an exception but would not lose all executors. Anybody with a similar problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Load balancing
Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will, however, be partitioned and processed in a distributed manner like usual Spark RDDs. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff 2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming data from Kafka or any other receiver-based source, then you can start 1-2 receivers per worker (assuming you'll have a minimum of 4 cores per worker). 2. If you have a single receiver or a fileStream, then what you can do to distribute the data across machines is a repartition. Thanks Best Regards On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to understand how to load balance the incoming data to multiple Spark Streaming workers. Could somebody help me understand how I can distribute my incoming data from various sources such that the incoming data goes to multiple Spark Streaming nodes? Is it done by the Spark client with the help of the Spark master, similar to a Hadoop client asking the namenode for the list of datanodes?
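The two suggestions above (multiple receivers, then a repartition) can be sketched with the Spark 1.x receiver-based Kafka API; the ZooKeeper host, group id, and topic name are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: start several receivers so ingestion is spread over the workers,
// union the resulting streams, then repartition before heavy processing.
val conf = new SparkConf().setAppName("BalancedIngest")
val ssc = new StreamingContext(conf, Seconds(10))

val numReceivers = 4 // roughly 1-2 per worker, assuming spare cores on each
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "group01", Map("events" -> 1))
}
val unified = ssc.union(streams)
// Redistribute the received blocks across the whole cluster for processing.
val balanced = unified.repartition(ssc.sparkContext.defaultParallelism * 3)
```

Each receiver occupies one core on the worker where it runs, which is why the advice above assumes spare cores beyond the processing tasks.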
Clean the shuffle data during iteration
Hello, Is it possible to delete the shuffle data of a previous iteration once it is no longer necessary? Alcaid
Re: MLlib Spam example gets stuck in Stage X
Hello Xiangrui, I use spark 1.2.0 on cdh 5.3. Thanks! -Su On Fri, Mar 20, 2015 at 2:27 PM Xiangrui Meng men...@gmail.com wrote: Su, which Spark version did you use? -Xiangrui On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das ak...@sigmoidanalytics.com wrote: To get these metrics out, you need to open the driver ui running on port 4040. And in there you will see Stages information and for each stage you can see how much time it is spending on GC etc. In your case, the parallelism seems 4, the more # of parallelism the more # of tasks you will see. Thanks Best Regards On Thu, Mar 19, 2015 at 1:15 PM, Su She suhsheka...@gmail.com wrote: Hi Akhil, 1) How could I see how much time it is spending on stage 1? Or what if, like above, it doesn't get past stage 1? 2) How could I check if its a GC time? and where would I increase the parallelism for the model? I have a Spark Master and 2 Workers running on CDH 5.3...what would the default spark-shell level of parallelism be...I thought it would be 3? Thank you for the help! -Su On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you see where exactly it is spending time? Like you said it goes to Stage 2, then you will be able to see how much time it spend on Stage 1. See if its a GC time, then try increasing the level of parallelism or repartition it like sc.getDefaultParallelism*3. 
Thanks Best Regards On Thu, Mar 19, 2015 at 12:15 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am trying to run this MLlib example from Learning Spark: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48 Things I'm doing differently: 1) Using the spark shell instead of an application 2) instead of their spam.txt and normal.txt I have text files with 3700 and 2700 words...nothing huge at all and just plain text 3) I've used numFeatures = 100, 1000 and 10,000 Error: I keep getting stuck when I try to run the model: val model = new LogisticRegressionWithSGD().run(trainingData) It will freeze on something like this: [Stage 1:== (1 + 0) / 4] Sometimes it's Stage 1, 2 or 3. I am not sure what I am doing wrong...any help is much appreciated, thank you! -Su
Spark Streaming Not Reading Messages From Multiple Kafka Topics
Hi all, I'm building a Spark Streaming application that will continuously read multiple Kafka topics at the same time. However, I found a weird issue: it reads only hundreds of messages and then stops reading any more. If I change the three topics to only one topic, then it is fine and it continues to consume. Below is the code I have.

val consumerThreadsPerInputDstream = 1
val topics = Map("raw_0" -> consumerThreadsPerInputDstream,
                 "raw_1" -> consumerThreadsPerInputDstream,
                 "raw_2" -> consumerThreadsPerInputDstream)
val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka", "group01", topics).map(_._2)
...

How come it no longer consumes after hundreds of messages when reading three topics? How can I resolve this issue? Thank you for your help, Eason -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Not-Reading-Messages-From-Multiple-Kafka-Topics-tp22170.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: WebUI on yarn through ssh tunnel affected by AmIpfilter
Instead of opening a tunnel to the Spark web UI port, could you open a tunnel to the YARN RM web UI instead? That should allow you to navigate to the Spark application's web UI through the RM proxy, and hopefully that will work better. On Fri, Feb 6, 2015 at 9:08 PM, yangqch davidyang...@gmail.com wrote: Hi folks, I am new to Spark. I just got Spark 1.2 to run on EMR AMI 3.3.1 (Hadoop 2.4). I ssh to the EMR master node and submit the job or start the shell. Everything runs well except the web UI. In order to see the UI, I used an ssh tunnel which forwards my dev machine's port to the EMR master node's web UI port. When I open the web UI at the very beginning of the application (during the Spark launch time), the web UI is as nice as shown in many Spark docs. However, once the YARN AmIpFilter starts to work, the web UI becomes very ugly. No pictures can be displayed; only text can be shown (just like viewing it in lynx). Meanwhile, in the Spark shell, it pops up "amfilter.AmIpFilter (AmIpFilter.java:doFilter(157)) - Could not find proxy-user cookie, so user will not be set". Can anyone give me some help? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
Re: IPython notebook command for Spark needs to be updated?
Feel free to send a pull request to fix the doc (or say which versions it's needed in). Matei On Mar 20, 2015, at 6:49 PM, Krishna Sankar ksanka...@gmail.com wrote: Yep, the command option is gone. No big deal, just add the '%pylab inline' command as part of your notebook. Cheers k/ On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote: Hello: I tried the IPython notebook with the following command in my environment. PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark But it shows that --pylab inline support has been removed from the newest IPython version. The log is as follows: --- $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the command line has been removed. [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or `%matplotlib inline` in the notebook itself. -- I am using IPython 3.0.0, and only IPython works in my environment. -- $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark -- Does somebody have the same issue as mine? How do you solve it? Thanks, Cong
Re: IPython notebook command for Spark needs to be updated?
Let me do it now. I appreciate Spark's clear, easy-to-understand documentation! The updated command will be like: PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark When the IPython notebook server is launched, you can create a new Python 2 notebook from the Files tab. Inside the notebook, you can input the '%pylab inline' command at the start of your notebook before you start to try Spark from the IPython notebook. Cheers, Cong 2015-03-20 16:14 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com: Feel free to send a pull request to fix the doc (or say which versions it's needed in). Matei On Mar 20, 2015, at 6:49 PM, Krishna Sankar ksanka...@gmail.com wrote: Yep, the command option is gone. No big deal, just add the '%pylab inline' command as part of your notebook. Cheers k/ On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote: Hello: I tried the IPython notebook with the following command in my environment. PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark But it shows that --pylab inline support has been removed from the newest IPython version. The log is as follows: --- $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the command line has been removed. [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or `%matplotlib inline` in the notebook itself. -- I am using IPython 3.0.0, and only IPython works in my environment. -- $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark -- Does somebody have the same issue as mine? How do you solve it? Thanks, Cong
Re: WebUI on yarn through ssh tunnel affected by AmIpfilter
I ran into a similar issue. What's happening is that when Spark is running in YARN client mode, YARN automatically launches a Web Application Proxy http://archive.cloudera.com/cdh5/cdh/5/hadoop/hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html to reduce hacking attempts. In doing so, it adds the AmIpFilter to the proxy. You can see this in the example log snippet below: 15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkyar...@ip-172-31-44-228.us-west-2.compute.internal:53028/user/YarnAM#-1897510590] 15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS - 172.31.36.22, PROXY_URI_BASES - http://172.31.36.22:9046/proxy/application_1426881405719_0009), /proxy/application_1426881405719_0009 15/03/20 21:33:14 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/03/20 21:33:15 INFO yarn.Client: Application report for application_1426881405719_0009 (state: RUNNING) 15/03/20 21:33:15 INFO yarn.Client: client token: N/A diagnostics: N/A ApplicationMaster host: ip-172-31-44-228.us-west-2.compute.internal ApplicationMaster RPC port: 0 queue: default start time: 1426887190001 final status: UNDEFINED *tracking URL: http://172.31.36.22:9046/proxy/application_1426881405719_0009/* While I haven't found a way to disable it (the Spark doc http://spark.apache.org/docs/1.2.1/security.html may help), you can view the web UI by forwarding the proxy's port (9046 by default). Then point your browser to the tracking URL with the host IP replaced with localhost, e.g.: http://localhost:9046/proxy/application_1426881405719_0009 Hope that helps. Ben -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540p22169.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
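Putting the two replies together, a sketch of the tunnel (the host, port and application id here are the ones from the log above; the user and master hostname are placeholders for your own):

```shell
# Forward the YARN proxy port (9046 in this cluster) rather than the Spark UI port.
ssh -L 9046:172.31.36.22:9046 hadoop@emr-master-public-dns

# Then open the tracking URL with the host replaced by localhost:
#   http://localhost:9046/proxy/application_1426881405719_0009/
```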
Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Hi, Running Spark 1.3 with secured Hadoop. Spark-shell in YARN client mode runs without issue when not using dynamic allocation. When dynamic allocation is turned on, the shell comes up, but the same SQL etc. causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

The following is the start of the messages; then it keeps looping with "Requesting 0 new executors". 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100) 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new 
desired total will be 1) 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1) 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new executor(s) because tasks are backlogged (new desired total will be 1)
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Forgot to add - the cluster is otherwise idle, so there should be no resource issues. Also, the configuration works when not using dynamic allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com wrote:
Registering UDF from a different package fails
Hi All,

All my UDFs are defined in classes residing in a different package than the one where I instantiate my HiveContext. I have a register function in my UDF class to which I pass the HiveContext, and in that function I call hiveContext.registerFunction(myudf, myudf _). All goes well, but at runtime, when I execute the query

val sqlresult = hiveContext.sql(query)

it doesn't work: sqlresult comes back as null. There is no exception thrown by Spark, and no log output indicating the error. But everything works if I move my UDF class into the same package where I instantiate the HiveContext. I dug into the Spark code and found (I may be wrong here) that ./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala has a class named SimpleFunctionRegistry whose lookupFunction does not throw an error when it fails to find a function.

Kindly help. Appreciate it.

Ravi
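The confusing symptom above (a null result with no exception) is what a silently-failing registry produces. A toy illustration in plain Python (not Spark's actual code, just the pattern): a lookup that returns nothing for a missing name leaves the caller with a null and no error, whereas a strict registry surfaces the problem immediately.

```python
# Toy illustration (not Spark code): a registry whose lookup fails
# silently vs. one that raises a clear error for missing functions.

class SilentRegistry:
    def __init__(self):
        self._funcs = {}

    def register(self, name, fn):
        self._funcs[name] = fn

    def lookup(self, name):
        # Returns None when the function is missing: the caller gets a
        # "null" result with no exception, matching the symptom above.
        return self._funcs.get(name)

class StrictRegistry(SilentRegistry):
    def lookup(self, name):
        if name not in self._funcs:
            raise KeyError("undefined function: %s" % name)
        return self._funcs[name]

silent = SilentRegistry()
result = silent.lookup("myudf")   # None -- no error raised
print(result)
```

The strict variant turns the same mistake into an immediate, debuggable failure, which is why a lookupFunction that never throws makes this class of bug hard to trace.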
about Partition Index
Dear all,

About the index of each partition of an RDD: I am wondering whether we can keep their numbering on each physical machine across a hash partitioning. For example, take a cluster with three physical machines A, B, C (all workers) and an RDD with six partitions, and assume the two partitions with indexes 0 and 3 are on A, the partitions with indexes 1 and 4 are on B, and the ones with indexes 2 and 5 are on C. If I hash-partition the RDD using partitionBy(new HashPartitioner(6)), will the newly created RDD still have the same partition indexes on each machine? Is it possible that the partitions with indexes 0 and 3 are now on B rather than A? If so, is there any method we can use to keep both RDDs numbered the same way on each physical machine?

Thanks in advance.

Long
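For reference, a hash partitioner only fixes which partition index a key maps to; which machine hosts a given partition index is a separate scheduling decision, so the index-to-machine placement is not guaranteed to repeat. A minimal sketch of the index assignment in plain Python (using Python's hash rather than Spark's Java hashCode, so the exact indexes are illustrative, not what Spark would compute):

```python
def hash_partition(key, num_partitions):
    # Non-negative modulo of the key's hash: analogous in spirit to
    # Spark's HashPartitioner.getPartition (which uses Java hashCode).
    return hash(key) % num_partitions

# The key -> partition-index mapping is deterministic for a given
# process and partition count...
indexes = [hash_partition(k, 6) for k in ["a", "b", "c"]]

# ...but which worker ends up hosting, say, partition 0 afterwards is
# decided by the scheduler, so the same index can land on a different
# machine after repartitioning.
print(indexes)
```

In other words, the numbering of partitions is stable per key, but their physical placement is not something partitionBy controls.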
Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)
: 7E6F85873D69D14E, x-amz-id-2: rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=, x-amz-region: eu-central-1, Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT, Connection: close, Server: AmazonS3]

15/03/20 11:25:32 WARN RestStorageService: Retrying request after automatic adjustment of Host endpoint from frankfurt.ingestion.batch.s3.amazonaws.com to frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com following request signing error using AWS request signing version 4: GET https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz&delimiter=/ HTTP/1.1

15/03/20 11:25:32 WARN RestStorageService: Retrying request following error response: GET '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz&delimiter=/' -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date: Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host: frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z, Authorization: AWS4-HMAC-SHA256 Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4], Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2: V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=, Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]

Exception in thread main org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz

Do you have any ideas? Was anybody of you already able to access S3 in Frankfurt, and if so, how?

Cheers
Ralf
Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)
/HotelImageList.gz&delimiter=/' -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date: Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host: frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z, Authorization: AWS4-HMAC-SHA256 Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4], Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2: V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=, Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]

Exception in thread main org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz

Do you have any ideas? Was anybody of you already able to access S3 in Frankfurt, and if so, how?

Cheers
Ralf
How to handle under-performing nodes in the cluster
Hi all,

I have 6 nodes in the cluster, and one of the nodes is clearly under-performing. I was wondering what the impact of having such an issue is. Also, what is the recommended way to work around it?

Thanks a lot,
Yiannis
What is the JVM size when starting spark-submit in local mode
Hi,

I am curious: when I start a Spark program in local mode, which parameter decides the JVM memory size for the executor? In theory it should be:

--executor-memory 20G

But I remember that local mode starts the Spark executor in the same process as the driver, so it should instead be:

--driver-memory 20G

Regards,
Shuai
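If that reading is right, the flag that actually takes effect in local mode is the driver's, since driver and executor share one JVM. A sketch of the two invocations (master URL, memory values, and app.jar are placeholders, not recommendations):

```shell
# Cluster mode: executors are separate JVMs, sized by --executor-memory.
spark-submit --master spark://host:7077 --executor-memory 20G app.jar

# Local mode: one JVM holds driver and executor alike, so size it with
# --driver-memory; there is no separate executor JVM for
# --executor-memory to apply to.
spark-submit --master "local[4]" --driver-memory 20G app.jar
```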
Buffering for Socket streams
Hi all,

We are designing a workflow where we stream local files to a socket streamer, which cleans and processes the files and writes them to HDFS. We have an issue with bigger files, when the streamer cannot keep up with the data and runs out of memory. What would be the best way to implement an approach where the socket stream receiver notifies the stream not to send more data (and stops reading from disk too?) just before it might run out of memory?

thanks,

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
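The "tell the sender to slow down" idea is classic backpressure: a bounded buffer that blocks the producer when full, so the reader naturally stops pulling from disk whenever the consumer falls behind. A minimal single-process sketch in plain Python threads (an illustration of the pattern, not Spark's receiver API):

```python
import queue
import threading

# Bounded buffer: put() blocks when full -- that block IS the
# backpressure signal back to the producer.
buf = queue.Queue(maxsize=4)
consumed = []

def producer(lines):
    for line in lines:
        buf.put(line)      # blocks here if the consumer is behind
    buf.put(None)          # sentinel: end of stream

def consumer():
    while True:
        item = buf.get()
        if item is None:
            break
        consumed.append(item.upper())  # stand-in for clean/process/write

lines = ["line %d" % i for i in range(100)]
t1 = threading.Thread(target=producer, args=(lines,))
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
print(len(consumed))
```

Because the buffer is bounded at 4 items, memory use stays constant no matter how large the input is; the producer simply waits instead of buffering unboundedly.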
Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)
=/' -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date: Fri, 20 Mar 2015 11:25:31 GMT, Authorization: AWS XXX_MY_KEY_XXX:XXX_I_GUESS_SECRET_XXX], Response Headers: [x-amz-request-id: 7E6F85873D69D14E, x-amz-id-2: rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=, x-amz-region: eu-central-1, Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT, Connection: close, Server: AmazonS3]

15/03/20 11:25:32 WARN RestStorageService: Retrying request after automatic adjustment of Host endpoint from frankfurt.ingestion.batch.s3.amazonaws.com to frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com following request signing error using AWS request signing version 4: GET https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz&delimiter=/ HTTP/1.1

15/03/20 11:25:32 WARN RestStorageService: Retrying request following error response: GET '/?max-keys=1&prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz&delimiter=/' -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date: Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host: frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z, Authorization: AWS4-HMAC-SHA256 Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4], Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2: V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=, Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]

Exception in thread main org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz

Do you have any ideas? Was anybody of you already able to access S3 in Frankfurt, and if so, how?

Cheers
Ralf
Re: version conflict common-net
Anyone? Or is this question nonsensical and I am doing something fundamentally wrong?

On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote:

Hi Folks,

I have a situation where I am getting a version conflict between Java libraries used by my application and ones used by Spark. Here are the details: I use the Spark provided by Cloudera, running on a CDH 5.3.2 cluster (Spark 1.2.0-cdh5.3.2). The library causing the conflict is commons-net. In our Spark application we use commons-net version 3.3, but I found out that Spark uses commons-net version 2.2. Hence, when we try to submit our application using spark-submit, I end up getting a NoSuchMethodError:

Error starting receiver 5 - java.lang.NoSuchMethodError: org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
at ZipStream.onStart(ZipStream.java:55)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
.

Now, if I change the commons-net version to 2.2, the job runs fine (except for the fact that some of the features we use from commons-net 3.3 are not there). How does one resolve such an issue, where Spark uses one set of libraries and our application requires the same libraries at a different version (in my case commons-net 2.2 vs 3.3)? I see that there is a setting I can supply, spark.files.userClassPathFirst, but the documentation says it is experimental, and for us it did not work at all.

Thanks in advance.

Regards,
-Jacob
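Besides spark.files.userClassPathFirst, a common workaround for this kind of conflict is to shade the clashing package inside the application's fat jar, so the application's commons-net 3.3 classes live under a renamed package and can never collide with the 2.2 copy on Spark's classpath. A build-config sketch, assuming the sbt-assembly plugin (shading support landed around sbt-assembly 0.14.0; the myshaded prefix is an arbitrary choice):

```scala
// build.sbt fragment (sketch, assumes sbt-assembly >= 0.14.0):
// rename commons-net classes in the assembled jar so they cannot clash
// with the commons-net 2.2 that ships on Spark's classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.commons.net.**" -> "myshaded.commons.net.@1").inAll
)
```

The same idea is available in Maven via the maven-shade-plugin's relocation feature.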
Re: Visualizing Spark Streaming data
Grafana allows pretty slick interactive use patterns, especially with Graphite as the back-end. In a multi-user environment, why not have each user just build their own independent dashboards and name them under some simple naming convention?

*Irfan Ahmad*
CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com
Best of VMworld Finalist
Best Cloud Management Award
NetworkWorld 10 Startups to Watch
EMA Most Notable Vendor

On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan harut.martiros...@gmail.com wrote:

Hey Jeffrey, thanks for the reply. I already have something similar: I use Grafana and Graphite, and for simple metric streaming we have everything set up right. My question is about interactive patterns. For instance, dynamically choosing an event to monitor, dynamically choosing a group-by field or any sort of filter, then viewing results. This is easy when you have one user, but if you have a team of analysts all specifying their own criteria, it becomes hard to manage them all.

On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote:

Hey Harut, I don't think there will be any general practices, as this part heavily depends on your environment, skills, and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch + Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff

2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos for the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results, and gets the appropriate RDDs; the question is now how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
RGRDZ Harut
Re: About the env of Spark1.2
bq. Caused by: java.net.UnknownHostException: dhcp-10-35-14-100: Name or service not known

Can you check your DNS?

Cheers

On Fri, Mar 20, 2015 at 8:54 PM, tangzilu zilu.t...@hotmail.com wrote:
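If DNS is indeed the culprit, the usual fix on a single VM is to make the machine's own hostname resolvable locally. The hostname below is the one from the log; the IP is a placeholder to adjust, and SPARK_LOCAL_IP is an alternative knob:

```shell
# Map the VM's hostname to a local address in /etc/hosts
# (sketch; substitute the VM's actual IP):
echo "127.0.0.1   dhcp-10-35-14-100" >> /etc/hosts

# Or point Spark at an explicit local IP in conf/spark-env.sh:
export SPARK_LOCAL_IP=127.0.0.1
```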
Filesystem closed Exception
Hi, all:

When I exit the spark-sql console, the following exception is thrown. My Spark version is 1.3.0, Hadoop version is 2.2.0.

Exception in thread Thread-3 java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1677)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:196)
at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1388)
at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
About the env of Spark1.2
Hi All:

I recently started to deploy Spark 1.2 in my VirtualBox Linux. But when I run the command ./spark-shell in /opt/spark-1.2.1/bin, I get the following:

[root@dhcp-10-35-14-100 bin]# ./spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class server' on port 47691.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /__/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.

java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
at org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
at org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
at $iwC$$iwC.init(console:9)
at $iwC.init(console:18)
at init(console:20)
at .init(console:24)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by:
Re: Spark 1.2. loses often all executors
Isn't that a feature? Rather than keeping a buggy pipeline running, it just kills all executors. You can always handle exceptions with a proper try/catch in your code, though.

Thanks
Best Regards

On Fri, Mar 20, 2015 at 3:51 PM, mrm ma...@skimlinks.com wrote:

Hi,

I recently changed from Spark 1.1 to Spark 1.2, and I noticed that it loses all executors whenever I have any Python code bug (like looking up a key in a dictionary that does not exist). In earlier versions it would raise an exception, but it would not lose all executors. Anybody with a similar problem?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Job History Server
Uh, does that mean HDP shipped Marcelo's uncommitted patch from SPARK-1537 anyway? Given the discussion there, that seems kinda aggressive.

On Wed, Mar 18, 2015 at 8:49 AM, Marcelo Vanzin van...@cloudera.com wrote:

Those classes are not part of standard Spark. You may want to contact Hortonworks directly if they're suggesting you use those.

On Wed, Mar 18, 2015 at 3:30 AM, patcharee patcharee.thong...@uni.no wrote:

Hi, I am using Spark 1.3. I would like to use the Spark job history server. I added the following lines to conf/spark-defaults.conf:

spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
spark.yarn.historyServer.address sandbox.hortonworks.com:19888

But got: Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider

What class is really needed? How do I fix it?

Br,
Patcharee

--
Marcelo
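For comparison, a stock (non-vendor) Spark 1.3 history server setup only needs event logging plus the built-in filesystem provider; the HDFS path below is an example, not a required location:

```shell
# conf/spark-defaults.conf (sketch; the log directory is a placeholder)
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-history
spark.history.fs.logDirectory    hdfs:///spark-history

# then start the history server:
./sbin/start-history-server.sh
```

The YarnHistoryService/YarnHistoryProvider classes referenced in the thread come from a vendor add-on, which is why they are not found in a plain Spark distribution.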
ShuffleBlockFetcherIterator: Failed to get block(s)
My job crashes with a bunch of these messages in the YARN logs. What are the appropriate steps in troubleshooting?

15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 10 outstanding blocks (after 3 retries)
15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from host:port
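The "(after 3 retries)" in the log reflects a bounded-retry fetch: the fetcher re-attempts a failed block fetch a fixed number of times before giving up (in Spark this is tunable via spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait). The pattern, sketched in plain Python rather than Spark's actual RetryingBlockFetcher:

```python
def fetch_with_retry(fetch, max_retries=3):
    """Call fetch() up to 1 + max_retries times, re-raising the last
    error if every attempt fails.

    A plain-Python sketch of bounded retrying (not Spark's code);
    `fetch` is any zero-argument callable.
    """
    last_err = None
    for _attempt in range(1 + max_retries):
        try:
            return fetch()
        except IOError as e:
            last_err = e  # in Spark, this is where retryWait would sleep
    raise last_err

# A flaky fetch that fails twice, then succeeds:
state = {"calls": 0}
def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise IOError("connection reset")
    return "block-data"

print(fetch_with_retry(flaky))
```

When even the final attempt fails (as in the log above), the error propagates and the task dies, so the troubleshooting focus shifts to why the remote side keeps refusing: executor loss, network issues, or memory pressure on the serving node.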
Re: Spark 1.2. loses often all executors
Maybe this is related to a bug in 1.2 [1]; it's fixed in 1.2.2 (not released yet). Could you check out the 1.2 branch and verify that?

[1] https://issues.apache.org/jira/browse/SPARK-5788

On Fri, Mar 20, 2015 at 3:21 AM, mrm ma...@skimlinks.com wrote:
Re: Spark-submit and multiple files
You MUST put --py-files BEFORE main.py, as mentioned in other threads. On Fri, Mar 20, 2015 at 1:47 AM, Guillaume Charhon guilla...@databerries.com wrote: Hi Davies, I am already using --py-files. The system does use the other file. The error I am getting is not trivial. Please check the error log. On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote: You could submit additional Python source via --py-files, for example: $ bin/spark-submit --py-files work.py main.py On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com wrote: Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets. Each code snippet is composed of a main.py and work.py. The code works if I paste work.py then main.py into a pyspark shell. However, both snippets do not work when using spark-submit, and they generate different errors. Function add_1 definition outside http://www.codeshare.io/4ao8B https://justpaste.it/jzvj Embedded add_1 function definition http://www.codeshare.io/OQJxq https://justpaste.it/jzvn I am trying to find a way to make it work. Thank you for your support. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
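To make the ordering concrete, a sketch (file names from the thread; the --master value is illustrative):

```shell
# Correct: all spark-submit options, including --py-files, come BEFORE
# the primary script. work.py is shipped to executors and importable
# from main.py.
bin/spark-submit --master local[2] --py-files work.py main.py

# Incorrect: anything after main.py is treated as main.py's OWN
# arguments, so "--py-files work.py" is passed to the script instead
# of being interpreted by spark-submit, and work.py is never shipped.
bin/spark-submit --master local[2] main.py --py-files work.py
```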
Re: DataFrame operation on parquet: GC overhead limit exceeded
spark.sql.shuffle.partitions only controls the number of tasks in the second stage (the number of reducers). For your case, I'd say that the number of tasks in the first stage (the number of mappers) will be the number of files you have. Actually, have you changed spark.executor.memory (it controls the memory for an executor of your application)? I did not see it in your original email. The difference between worker memory and executor memory can be found at http://spark.apache.org/docs/1.3.0/spark-standalone.html: SPARK_WORKER_MEMORY is the total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property. On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Actually I realized that the correct way is: sqlContext.sql("SET spark.sql.shuffle.partitions=1000") but I am still experiencing the same behavior/error. On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, the way I set the configuration is: val sqlContext = new org.apache.spark.sql.SQLContext(sc); sqlContext.setConf("spark.sql.shuffle.partitions", "1000") It is the correct way, right? In the mapPartitions task (the first task which is launched), I get again the same number of tasks and again the same error. :( Thanks a lot! On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, thanks a lot for that! Will give it a shot and let you know. On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote: Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use and the default is 200.
Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? Seems your query will be executed with two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. A full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following:

val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date")
  .agg(sum("power"), sum("supply")).take(10)
System.out.println(res)

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
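Pulling the thread's two knobs together, a hedged spark-defaults.conf sketch for this workload (the values are illustrative starting points, not tuned; note spark.executor.memory is the per-application executor heap, distinct from SPARK_WORKER_MEMORY):

```
# Per-executor heap; the original config set only driver memory.
spark.executor.memory        6g
# More reducers -> fewer distinct groups per reduce task.
spark.sql.shuffle.partitions 1000
```

With ~1700 input files the first (scan plus map-side aggregation) stage still gets ~1700 tasks; the partitions setting only affects the reduce stage.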
RE: Why I didn't see the benefits of using KryoSerializer
Hi, Imran: Thanks for your information. I found a benchmark online about serialization which compares Java vs Kryo vs GridGain here: http://gridgain.blogspot.com/2012/12/java-serialization-good-fast-and-faster.html From my test result, in the above benchmark case for the SimpleObject, Kryo is slightly faster than Java serialization, but uses only half the space of Java serialization. So now I understand more about what kind of benefits I should expect from using KryoSerializer. But I have some questions related to Spark SQL. If I use Spark SQL, should I expect less memory usage? I mean, in Spark SQL everything is controlled by Spark. If I pass in -Dspark.serializer=org.apache.spark.serializer.KryoSerializer and cache the table, will it use much less memory? Do I also need to specify StorageLevel.MEMORY_ONLY_SER if I want to use less memory? Where can I set that in Spark SQL? Thanks Yong From: iras...@cloudera.com Date: Fri, 20 Mar 2015 11:54:38 -0500 Subject: Re: Why I didn't see the benefits of using KryoSerializer To: java8...@hotmail.com CC: user@spark.apache.org Hi Yong, yes I think your analysis is correct. I'd imagine almost all serializers out there will just convert a string to its UTF-8 representation. You might be interested in adding compression on top of a serializer, which would probably bring the string size down in almost all cases, but then you also need to account for the time spent compressing. Kryo is generally more efficient than the Java serializer on complicated object types. I guess I'm still a little surprised that Kryo is slower than Java serialization for you. You might try setting spark.kryo.referenceTracking to false if you are just serializing objects with no circular references. I think that will improve the performance a little, though I dunno how much. It might be worth running your experiments again with slightly more complicated objects and seeing what you observe.
Imran On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote: I read the Spark code a little bit, trying to understand my own question. It looks like the difference is really between org.apache.spark.serializer.JavaSerializer and org.apache.spark.serializer.KryoSerializer, both having a method named writeObject. In my test case, each line of my text file is about 140 bytes of String. Whether I call JavaSerializer.writeObject(140 bytes of String) or KryoSerializer.writeObject(140 bytes of String), I didn't see a difference in the underlying OutputStream space usage. Does this mean that KryoSerializer really doesn't give us any benefit for the String type? I understand that for primitive types it shouldn't have any benefits, but how about String? When we talk about lowering memory usage with KryoSerializer in Spark, in what cases can it bring significant benefits? It is my first experience with the KryoSerializer, so maybe I am totally wrong about its usage. Thanks Yong From: java8...@hotmail.com To: user@spark.apache.org Subject: Why I didn't see the benefits of using KryoSerializer Date: Tue, 17 Mar 2015 12:01:35 -0400 Hi, I am new to Spark. I tried to understand the memory benefits of using KryoSerializer. I have a one-box standalone test environment, which is 24 cores with 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0. I put one text file in HDFS, about 1.2G. Here are the settings in spark-env.sh:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=4g

First test case:

val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
log.count()
log.count()

The data is about 3M rows. For the first test case, from the storage tab in the web UI, I can see Size in Memory is 1787M, and Fraction Cached is 70% with 7 cached partitions. This matched what I expected; the first count finished in about 17s, and the 2nd count in about 6s. 2nd test case, after restarting spark-shell:

val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

Now from the web UI, I can see Size in Memory is 1231M, and Fraction Cached is 100% with 10 cached partitions. It looks like caching the default Java-serialized format reduces the memory usage, but it comes with a cost: the first count finished in around 39s and the 2nd count in around 9s. So the job runs slower, with less memory usage. So far I can understand everything that happened and the tradeoff. Now the problem comes when I try to test with KryoSerializer:

SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" /opt/spark/bin/spark-shell
val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

First, I saw that the new serializer setting was passed in, as proven in the Spark
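What the thread converges on is that Kryo pays off on complex object types combined with serialized storage, not on plain Strings. A sketch of that setup (the Event class and paths are hypothetical; this is a standalone-app version of the spark-shell experiment above):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical domain class: Kryo's savings show up on structured
// objects like this, not on raw Strings (which serialize to UTF-8
// bytes under any serializer).
case class Event(id: Long, tags: Array[String], score: Double)

object KryoSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registering classes avoids writing full class names per record.
      .registerKryoClasses(Array(classOf[Event]))
    val sc = new SparkContext(conf)

    // Serialized caching is where the space savings appear in the
    // web UI's storage tab.
    val events = sc.textFile("hdfs://namenode:9000/test_1g/")
      .map(line => Event(line.hashCode.toLong, line.split("\\s+"), 1.0))
    events.persist(StorageLevel.MEMORY_ONLY_SER)
    events.count()
  }
}
```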
RE: Spark will process _temporary folder on S3 is very slow and always cause failure
Thanks! Let me update the status. I have copied the DirectOutputCommitter to my local project and set: conf.set("spark.hadoop.mapred.output.committer.class", "org..DirectOutputCommitter") It works perfectly. Thanks everyone :) Regards, Shuai From: Aaron Davidson [mailto:ilike...@gmail.com] Sent: Tuesday, March 17, 2015 3:06 PM To: Imran Rashid Cc: Shuai Zheng; user@spark.apache.org Subject: Re: Spark will process _temporary folder on S3 is very slow and always cause failure Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595 6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be configured in Spark with: sparkConf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName) On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote: I'm not super familiar w/ S3, but I think the issue is that you want to use a different output committer with object stores, which don't have a simple move operation. There have been a few other threads on S3 output committers. I think the most relevant for you is most probably this open JIRA: https://issues.apache.org/jira/browse/SPARK-6352 On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I tried to run a sort on an r3.2xlarge instance on AWS. I just run it as a single-node cluster for testing. The data I sort is around 4GB and sits on S3; the output will also go to S3. I just connect spark-shell to the local cluster and run the code as a script (because I just want a benchmark now).
My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test ORDER BY time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")

The job takes around 6 mins to finish the sort when I monitor the process. Then I notice the process stops at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s At that point, Spark has actually just written all the data to the _temporary folder; after all sub-tasks finish, it tries to move all the finished results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files:

org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

I tried several times, but never got the full job to finish. I am not sure anything is wrong here; I use something very basic, and I can see the job has finished and all results are on S3 under the _temporary folder, but then it raises the exception and fails. Is there any special setting I should use when dealing with S3? I don't know what the issue is; I never saw MapReduce have a similar issue.
So it could not be S3’s problem. Regards, Shuai
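The fix the thread settles on, as a compilable sketch (it assumes a DirectOutputCommitter class like the one in Aaron's gist has been compiled into the application jar; the package name here is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumes com.example.DirectOutputCommitter is a copy of the committer
// from the gist linked above, bundled in the application jar.
val conf = new SparkConf()
  .setAppName("s3-direct-commit")
  // Write task output straight to the final S3 location, skipping the
  // _temporary rename step that is slow (copy + delete) on S3.
  .set("spark.hadoop.mapred.output.committer.class",
       "com.example.DirectOutputCommitter")
val sc = new SparkContext(conf)
```

Note the trade-off: the default committer's _temporary dance is what makes output atomic, so with a direct committer, failed or speculative tasks can leave partial output behind.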
Re: RDD Blocks skewing to just few executors
Hm, is data locality a factor here? I don't know. Just a side note: this doesn't cause OOM errors per se, since the cache won't exceed the % of heap it's allowed. However, it will hasten OOM problems due to tasks using too much memory, of course. The solution is to get more memory to the tasks or reduce their working set size. On Fri, Mar 20, 2015 at 12:32 PM, Alessandro Lulli lu...@di.unipi.it wrote: Hi All, I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions). Could you please help us on this? Thanks Alessandro On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote: Hi, I'm running a standalone cluster with 8 worker servers. I'm developing a streaming app that is adding new lines of text to several different RDDs each batch interval. Each line has a well-randomized unique identifier that I'm trying to use for partitioning, since the data stream does contain duplicate lines. I'm doing the partitioning with this:

val eventsByKey = streamRDD.map { event => (getUID(event), event) }
val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
  .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

I'm adding to the existing RDD like this:

val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) { (currentIter, batchIter) =>
  val uniqEvents = ListBuffer[String]()
  val uids = Map[String, Boolean]()
  Array(currentIter, batchIter).foreach { iter =>
    iter.foreach { event =>
      val uid = getUID(event)
      if (!uids.contains(uid)) {
        uids(uid) = true
        uniqEvents += event
      }
    }
  }
  uniqEvents.iterator
}
val count = mergedRDD.count

The reason I'm doing it this way is that when I was doing:

val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
val count = mergedRDD.count

it would start taking a long time and a lot of shuffles. The zipPartitions approach does perform better, though after running an hour or so I start seeing this in the web UI.
http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png As you can see most of the data is skewing to just 2 executors, with 1 getting more than half the Blocks. These become a hotspot and eventually I start seeing OOM errors. I've tried this a half a dozen times and the 'hot' executors changes, but not the skewing behavior. Any idea what is going on here? Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
can distinct transform applied on DStream?
val aDstream = ...
val distinctStream = aDstream.transform(_.distinct())

but the elements in distinctStream are not distinct. Did I use it wrong?
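The thread has no reply, but a likely explanation (not from the thread itself): DStream.transform applies the function to each batch's RDD independently, so distinct() only de-duplicates within a single batch interval; an element seen in two different batches appears in both outputs. A sketch:

```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream

// transform runs once per batch interval, on that batch's RDD only.
// So this removes duplicates WITHIN a batch, but an element arriving
// in batch N and again in batch N+1 is emitted both times.
def perBatchDistinct[T: ClassTag](in: DStream[T]): DStream[T] =
  in.transform(_.distinct())
```

De-duplicating across batches requires keeping state, e.g. with updateStateByKey keyed on the element, which also means deciding when old keys can be expired.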
Re: ShuffleBlockFetcherIterator: Failed to get block(s)
I think you should see some other errors before that, from NettyBlockTransferService, with a message like "Exception while beginning fetchBlocks". There might be a bit more information there. There is an assortment of possible causes, but first let's just make sure you have all the details from the original cause. On Fri, Mar 20, 2015 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com wrote: My job crashes with a bunch of these messages in the YARN logs. What are the appropriate steps in troubleshooting?

15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 10 outstanding blocks (after 3 retries)
15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from host:port
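While digging for the root cause, the fetch retry behavior itself is tunable. A hedged spark-defaults.conf fragment (property names from the 1.x configuration docs; the "(after 3 retries)" in the log matches the default spark.shuffle.io.maxRetries of 3, and retryWait is in seconds in the 1.x releases; verify both against your version's docs):

```
# Retry each failed shuffle fetch more times, and wait longer between
# attempts, to ride out transient executor GC pauses or network blips.
spark.shuffle.io.maxRetries  6
spark.shuffle.io.retryWait   10
```

This only buys time; if executors are dying (OOM, preemption), the NodeManager and executor logs on the serving host are where the real cause will be.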
Re: RDD Blocks skewing to just few executors
Hi All, I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions). Could you please help us on this? Thanks Alessandro On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote: Hi, I'm running a standalone cluster with 8 worker servers. I'm developing a streaming app that is adding new lines of text to several different RDDs each batch interval. Each line has a well-randomized unique identifier that I'm trying to use for partitioning, since the data stream does contain duplicate lines. I'm doing the partitioning with this:

val eventsByKey = streamRDD.map { event => (getUID(event), event) }
val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
  .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

I'm adding to the existing RDD like this:

val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) { (currentIter, batchIter) =>
  val uniqEvents = ListBuffer[String]()
  val uids = Map[String, Boolean]()
  Array(currentIter, batchIter).foreach { iter =>
    iter.foreach { event =>
      val uid = getUID(event)
      if (!uids.contains(uid)) {
        uids(uid) = true
        uniqEvents += event
      }
    }
  }
  uniqEvents.iterator
}
val count = mergedRDD.count

The reason I'm doing it this way is that when I was doing:

val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
val count = mergedRDD.count

it would start taking a long time and a lot of shuffles. The zipPartitions approach does perform better, though after running an hour or so I start seeing this in the web UI: http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png As you can see, most of the data is skewing to just 2 executors, with 1 getting more than half the blocks. These become a hotspot and eventually I start seeing OOM errors. I've tried this half a dozen times and the 'hot' executor changes, but not the skewing behavior. Any idea what is going on here?
Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Job History Server
Hi Patcharee, It is an alpha feature in the HDP distribution, integrating the YARN Application Timeline Server (ATS) with the Spark history server. If you are using upstream Spark, you can configure Spark as usual without these settings. But the other related configurations are still mandatory, such as the hdp.version-related ones. Thanks. Zhan Zhang On Mar 18, 2015, at 3:30 AM, patcharee patcharee.thong...@uni.no wrote: Hi, I am using Spark 1.3. I would like to use the Spark Job History Server. I added the following lines to conf/spark-defaults.conf:

spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider
spark.yarn.historyServer.address sandbox.hortonworks.com:19888

But got: Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider. What class is really needed? How do I fix it? Br, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Visualizing Spark Streaming data
But it requires all possible combinations of your filters as separate metrics; moreover, it can only show time-based information, so you cannot group by, say, country. On 20 March 2015 at 19:09, Irfan Ahmad ir...@cloudphysics.com wrote: Grafana allows pretty slick interactive use patterns, especially with graphite as the back-end. In a multi-user environment, why not have each user just build their own independent dashboards and name them under some simple naming convention? *Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan harut.martiros...@gmail.com wrote: Hey Jeffrey. Thanks for the reply. I already have something similar: I use Grafana and Graphite, and for simple metric streaming we've got everything set up right. My question is about interactive patterns. For instance, dynamically choosing an event to monitor, dynamically choosing a group-by field or any sort of filter, then viewing the results. This is easy when you have 1 user, but if you have a team of analysts all specifying their own criteria, it becomes hard to manage them all. On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos for the last x minutes. Then I'd like to aggregate that by country, etc.
I've built the streaming part, which reads from Kafka, calculates the needed results and produces the appropriate RDDs; the question is now how to connect it to a UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storage to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- RGRDZ Harut -- RGRDZ Harut
Re: version conflict common-net
It's not a crazy question, no. I'm having a bit of trouble figuring out what's happening. Commons Net 2.2 is what's used by Spark. The error appears to come from Spark, but it is a failure to find a method that did not exist in 2.2. I am not sure what ZipStream is, for example. This could be a bizarre situation where classloader rules mean that part of 2.2 and part of 3.3 are being used. For example, let's say:

- your receiver uses classes that are only in 3.3, so they are found in your user classloader;
- the 3.3 classes call some class that also existed in 2.2, but that one is found in the Spark classloader;
- the 2.2 class doesn't have the methods that 3.3 expects.

userClassPathFirst is often a remedy. There are several versions of this flag though; for example, you need a different one on YARN to have it take effect. It's worth ruling that out first. If all else fails, you can shade 3.3. On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com wrote: Anyone? Or is this question nonsensical... and I am doing something fundamentally wrong? On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote: Hi Folks, I have a situation where I am getting a version conflict between Java libraries used by my application and ones used by Spark. Following are the details: I use the Spark provided by Cloudera, running on a CDH 5.3.2 cluster (Spark 1.2.0-cdh5.3.2). The library causing the conflict is commons-net. In our Spark application we use commons-net version 3.3. However, I found out that Spark uses commons-net version 2.2.
Hence, when we try to submit our application using spark-submit, I end up getting a NoSuchMethodError:

Error starting receiver 5 - java.lang.NoSuchMethodError: org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
at ZipStream.onStart(ZipStream.java:55)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
.

Now, if I change the commons-net version to 2.2, the job runs fine (except for the fact that some of the features we use from commons-net 3.3 are not there). How does one resolve such an issue, where Spark uses one set of libraries and our application requires the same libraries but a different version of them (in my case commons-net 2.2 vs 3.3)? I see that there is a setting I can supply, spark.files.userClassPathFirst, but the documentation says it is experimental, and for us this did not work at all. Thanks in advance. Regards, -Jacob - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
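Sean's two remedies, sketched as a spark-submit invocation (the class and jar names are hypothetical; the property names are the 1.x-era ones and, as he notes, they vary by version and deploy mode, so check your version's configuration docs):

```shell
# 1) Try user-classpath-first. On standalone executors the 1.2-era flag
#    is spark.files.userClassPathFirst; on YARN a separate flag is
#    needed (spark.yarn.user.classpath.first in that era).
spark-submit --class com.example.MyReceiverApp \
  --conf spark.files.userClassPathFirst=true \
  --conf spark.yarn.user.classpath.first=true \
  --jars commons-net-3.3.jar \
  my-app.jar

# 2) If that fails, shade commons-net 3.3 into the application jar
#    (sbt-assembly or maven-shade relocation), e.g. relocating
#    org.apache.commons.net -> shaded.org.apache.commons.net, so it can
#    no longer collide with Spark's bundled 2.2 copy.
```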
Re: DataFrame operation on parquet: GC overhead limit exceeded
Hi Yin, the way I set the configuration is:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

It is the correct way, right? In the mapPartitions task (the first task which is launched), I get again the same number of tasks and again the same error. :( Thanks a lot! On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, thanks a lot for that! Will give it a shot and let you know. On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote: Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? Seems your query will be executed with two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. A full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following:

val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date")
  .agg(sum("power"), sum("supply")).take(10)
System.out.println(res)

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
Re: Error communicating with MapOutputTracker
Hi Thomas, sorry for such a late reply. I don't have any super-useful advice, but this seems like something that is important to follow up on. To answer your immediate question: no, there should not be any hard limit to the number of tasks that MapOutputTracker can handle. Though of course, as things get bigger, the overheads increase, which is why you might hit timeouts. Two other minor suggestions: (1) increase spark.akka.askTimeout; that's the timeout you are running into, and it defaults to 30 seconds. (2) As you've noted, you've needed to play w/ other timeouts b/c of long GC pauses; it's possible some GC tuning might help, though it's a bit of a black art, so it's hard to say what you can try. You could always try Concurrent Mark Sweep to avoid the long pauses, but of course that will probably hurt overall performance. Can you share any more details of what you are trying to do? Since you're fetching shuffle blocks in a shuffle map task, I guess you've got two shuffles back-to-back, e.g. someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}. Do you expect to be doing a lot of GC in between the two shuffles? E.g., in the little example I have, if there were lots of objects being created in the map/filter steps that will make it out of the eden space. One possible solution to this would be to force the first shuffle to complete before running any of the subsequent transformations, e.g. by forcing materialization to the cache first:

val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
// force the shuffle to complete, without trying to do our complicated
// downstream logic at the same time
intermediateRDD.count()
val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size? Do you expect the shuffle to be skewed, or do you think it will be well-balanced? Not that I'll have any suggestions for you based on the answer, but it may help us reproduce it and try to fix whatever the root cause is.
thanks, Imran On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com wrote: I meant spark.default.parallelism of course. On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote: Follow up: We re-retried, this time after *decreasing* spark.parallelism. It was set to 16000 before, (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores). And it got past the point where it failed before. Does the MapOutputTracker have a limit on the number of tasks it can track? On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error which leads to a failed stage: Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker We tried the whole application again, and it failed on the same stage (but it got more tasks completed on that stage) with the same error. We then looked at executors stderr, and all show similar logs, on both runs (see below). As far as we can tell, executors and master have disk space left. 
*Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?* Thanks Thomas In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1
Appendix: executor logs, where it starts going awry
15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO
Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
I think you are running into a combo of https://issues.apache.org/jira/browse/SPARK-5928 and https://issues.apache.org/jira/browse/SPARK-5945. The standard solution is to just increase the number of partitions you are creating. textFile(), reduceByKey(), and sortByKey() all take an optional second argument where you can specify the number of partitions to use. It looks like it's using spark.default.parallelism right now, which will usually be the number of cores in your cluster (not sure what that is in your case). The exception you gave shows you're about 6x over the limit in at least this one case, so I'd start with at least 10x the number of partitions you have now, and increase until it works (or you run into some other problem from too many partitions ...). I'd also strongly suggest doing the filter before you do the sortByKey -- no reason to force all that data through a shuffle if you're going to throw a lot of it away. It's not completely clear where you are hitting the error now -- that alone might even solve your problem. Hope this helps, Imran On Thu, Mar 19, 2015 at 5:28 PM, roni roni.epi...@gmail.com wrote: I get 2 types of error: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 and FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded. Spark keeps retrying to submit the code and keeps getting this error. My file, on which I am finding the sliding-window strings, is 500 MB and I am doing it with length = 150. It works fine until length is 100. 
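The "6x over the limit" figure comes straight from the numbers in the exception. A back-of-the-envelope sketch (the current partition count below is illustrative, not taken from roni's job):

```scala
// Values from the exception text: the discarded frame vs. the ~2 GB frame cap.
val frameBytes = 12716268407L
val frameLimit = 2147483647L
val overFactor = frameBytes.toDouble / frameLimit // roughly 5.9x over the cap

// Bytes per partition shrink roughly linearly with partition count,
// so scaling partitions by ~10x leaves comfortable headroom over the 6x overshoot.
val currentPartitions = 6400 // illustrative value only
val suggestedPartitions = currentPartitions * 10
```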
This is my code:
val hgfasta = sc.textFile(args(0)) // read the fasta file
val kCount = hgfasta.flatMap(r => r.sliding(args(2).toInt))
val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _).map { case (x, y) => (y, x) }.sortByKey(false).map { case (i, j) => (j, i) }
val filtered = kmerCount.filter(kv => kv._2 > 5)
filtered.map(kv => kv._1 + "," + kv._2.toLong).saveAsTextFile(args(1))
It gets stuck at the flatMap and saveAsTextFile, and throws org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 and org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
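The k-mer counting logic above, minus Spark, can be sketched with plain Scala collections (the sequence and window length are made-up inputs; `sliding` here is the same standard collections method applied to each line in the RDD version, and the filter threshold is lowered from > 5 to > 1 so the toy input produces output):

```scala
val sequence = "ACGTACGT" // stand-in for one line of the FASTA file
val k = 4                 // stand-in for args(2).toInt

// Enumerate all length-k windows, count occurrences of each,
// and sort by count descending -- mirroring flatMap/reduceByKey/sortByKey.
val counts = sequence.sliding(k).toSeq
  .groupBy(identity)
  .mapValues(_.size)
  .toSeq
  .sortBy { case (_, n) => -n }

// Keep only k-mers seen more than once (the RDD version filters > 5).
val filtered = counts.filter { case (_, n) => n > 1 }
```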
Re: Visualizing Spark Streaming data
Hi Harut, Jeff's right that Kibana + Elasticsearch can take you quite far out of the box. Depending on your volume of data, you may only be able to keep recent data around though. Another option that is custom-built for handling many dimensions at query time (not as separate metrics) is Druid (http://druid.io/). It supports the Lambda architecture. It does real-time indexing from Kafka and, after a configurable window, hands off shards to historical nodes. The historical shards can also be recomputed in batch mode to fix up duplicates or late data. I wrote a plugin for Grafana that talks to Druid. It doesn't support all of Druid's rich query API, but it can get you pretty far. https://github.com/Quantiply/grafana-plugins/ Cheers, Roger On Fri, Mar 20, 2015 at 9:11 AM, Harut Martirosyan harut.martiros...@gmail.com wrote: But it requires all possible combinations of your filters as separate metrics; moreover, it can only show time-based information, you cannot group by, say, country. On 20 March 2015 at 19:09, Irfan Ahmad ir...@cloudphysics.com wrote: Grafana allows pretty slick interactive use patterns, especially with graphite as the back-end. In a multi-user environment, why not have each user just build their own independent dashboards and name them under some simple naming convention? *Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan harut.martiros...@gmail.com wrote: Hey Jeffrey. Thanks for the reply. I already have something similar; I use Grafana and Graphite, and for simple metric streaming we've got everything set up right. My question is about interactive patterns. For instance, dynamically choose an event to monitor, dynamically choose a group-by field or any sort of filter, then view results. 
This is easy when you have 1 user, but if you have a team of analysts all specifying their own criteria, it becomes hard to manage them all. On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos for the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results and gets the appropriate RDDs; the question is now how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storage to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- RGRDZ Harut
Re: DataFrame operation on parquet: GC overhead limit exceeded
Actually I realized that the correct way is: sqlContext.sql("SET spark.sql.shuffle.partitions=1000") but I am still experiencing the same behavior/error. On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, the way I set the configuration is: val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.shuffle.partitions", "1000") -- it is the correct way, right? In the mapPartitions task (the first task which is launched), I get again the same number of tasks and again the same error. :( Thanks a lot! On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, thanks a lot for that! Will give it a shot and let you know. On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote: Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use, and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? It seems your query will be executed with two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. 
Can you take a look at the numbers of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a standalone cluster mode. The code is the following:
val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
System.out.println(res)
The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded My configuration is:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort
Any idea how I can work around this? Thanks a lot
Re: Why I didn't see the benefits of using KryoSerializer
Hi Yong, yes I think your analysis is correct. I'd imagine almost all serializers out there will just convert a string to its UTF-8 representation. You might be interested in adding compression on top of a serializer, which would probably bring the string size down in almost all cases, but then you also need to take the time for compression. Kryo is generally more efficient than the Java serializer on complicated object types. I guess I'm still a little surprised that Kryo is slower than Java serialization for you. You might try setting spark.kryo.referenceTracking to false if you are just serializing objects with no circular references. I think that will improve the performance a little, though I dunno how much. It might be worth running your experiments again with slightly more complicated objects and seeing what you observe. Imran On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote: I read the Spark code a little bit, trying to understand my own question. It looks like the difference is really between org.apache.spark.serializer.JavaSerializer and org.apache.spark.serializer.KryoSerializer, both having the method named writeObject. In my test case, each line of my text file is about 140 bytes of String. With either JavaSerializer.writeObject(140 bytes of String) or KryoSerializer.writeObject(140 bytes of String), I didn't see a difference in the underlying OutputStream space usage. Does this mean that KryoSerializer really doesn't give us any benefit for the String type? I understand that for primitive types it shouldn't have any benefits, but how about String? When we talk about lowering memory use with KryoSerializer in Spark, under what circumstances can it bring significant benefits? It is my first experience with the KryoSerializer, so maybe I am totally wrong about its usage. 
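Imran's point about plain strings can be checked locally without Spark. A quick sketch (the 140-byte figure mirrors Yong's per-line estimate; the padding string is made up): Java serialization of a String is essentially its UTF-8 payload plus a small constant header, so there is little for Kryo to win back.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val line = "x" * 140 // stand-in for one ~140-byte log line
val utf8Bytes = line.getBytes("UTF-8").length

// Serialize the same string with plain Java serialization and measure it.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(line)
oos.close()
val javaSerBytes = bos.size() // payload plus a small constant header
```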
Thanks Yong -- From: java8...@hotmail.com To: user@spark.apache.org Subject: Why I didn't see the benefits of using KryoSerializer Date: Tue, 17 Mar 2015 12:01:35 -0400 Hi, I am new to Spark. I tried to understand the memory benefits of using KryoSerializer. I have this one-box standalone test environment, which is 24 cores with 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0. I put one text file in HDFS, about 1.2G. Here are the settings in spark-env.sh:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=4g
First test case:
val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
log.count()
log.count()
The data is about 3M rows. For the first test case, from the Storage tab in the web UI, I can see Size in Memory is 1787M, and Fraction Cached is 70% with 7 cached partitions. This matched what I expected, and the first count finished in about 17s, the 2nd count in about 6s. 2nd test case, after restarting the spark-shell:
val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()
Now from the web UI, I can see Size in Memory is 1231M, and Fraction Cached is 100% with 10 cached partitions. It looks like caching in the default Java serialized format reduces the memory usage, but comes with a cost: the first count finished in around 39s and the 2nd count in around 9s. So the job runs slower, with less memory usage. So far I can understand everything that happened, and the tradeoff. 
Now the problem comes when I try to test with KryoSerializer:
SPARK_JAVA_OPTS="-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" /opt/spark/bin/spark-shell
val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()
First, I saw that the new serializer setting was passed in, as the Spark Properties section of the Environment tab shows spark.driver.extraJavaOptions -Dspark.serializer=org.apache.spark.serializer.KryoSerializer. This is not there for the first 2 test cases. But in the Storage tab of the web UI, the Size in Memory is 1234M, with 100% Fraction Cached and 10 cached partitions. The first count took 46s and the 2nd count took 23s. I don't get much less memory use as I expected, but longer run times for both counts. Did I do anything wrong? Why does the memory footprint of MEMORY_ONLY_SER with KryoSerializer stay the same size as with the default Java serializer, with worse duration? Thanks Yong
RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Below is the output:
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 1967947
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 2024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 1967947
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
I have set the max open files to 2024 by ulimit -n 2024, but I get the same issue. I am not sure whether it is a reasonable setting. Actually I am doing a loop: each time I try to sort only 3GB of data. It runs very quickly in the first loop, and slows down in the second loop. In each loop iteration I start and destroy the context (because I want to clean up the temp files created under the tmp folder, which take a lot of space). Otherwise, just default settings. My logic:
For loop:
  val sc = new SparkContext
  sql = sc.loadParquet
  sortByKey
  sc.stop
End
And I run on EC2 c3.8xlarge, Amazon Linux AMI 2014.09.2 (HVM). From: java8964 [mailto:java8...@hotmail.com] Sent: Friday, March 20, 2015 3:54 PM To: user@spark.apache.org Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1 Do you think the ulimit is too low for the user running Spark on your nodes? Can you run ulimit -a as the user who is running spark on the executor node? Does the result make sense for the data you are trying to process? Yong _ From: szheng.c...@gmail.com To: user@spark.apache.org Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1 Date: Fri, 20 Mar 2015 15:28:26 -0400 Hi All, I try to run a simple sort by on 1.2.1. 
And it always gives me the below two errors:
1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)
And then I switch to:
conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT")
Then I get the error:
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)
I roughly know the first issue is because the Spark shuffle creates too many local temp files (and I don't know the solution, because it looks like my workaround also causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
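To see why a 2024-descriptor limit runs out quickly, a rough file-count sketch for the pre-consolidation hash shuffle. The core and partition counts below are illustrative assumptions, and the exact accounting varies by Spark version:

```scala
// Hash shuffle writes one file per (map task, reduce partition) pair;
// concurrently, an executor holds open roughly cores * reducePartitions files.
val coresPerExecutor = 32   // e.g. a c3.8xlarge, as used in this thread
val reducePartitions = 200  // illustrative reducer count
val openAtOnce = coresPerExecutor * reducePartitions // far past a 2024 ulimit
```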
Re: Spark SQL UDT Kryo serialization, Unable to find class
You probably don't cause a shuffle (which requires serialization) unless there is a join or group by. It's possible that we need to pass the Spark class loader to Kryo when creating a new instance (you can get it from Utils, I believe). We never ran into this problem since this API is not public yet. I'd start by looking in SparkSqlSerializer. On Mar 18, 2015 1:13 AM, Zia Ur Rehman Kayani zia.kay...@platalytics.com wrote: Thanks for your reply. I've tried this as well, by passing the JAR file path to *spark.executor.extraClassPath*, but it doesn't help me out. Actually I've figured out that the custom UDT works fine if I use only one RDD (table); the issue arises when we join two or more RDDs. According to https://datastax-oss.atlassian.net/browse/SPARKC-23, it is a bug when we use a custom ROW and use JOIN. But the solution proposed isn't working in my case. Any clue? On Tue, Mar 17, 2015 at 10:19 PM, Michael Armbrust mich...@databricks.com wrote: I'll caution you that this is not a stable public API. That said, it seems that the issue is that you have not copied the jar file containing your class to all of the executors. You should not need to do any special configuration of serialization (you can't for SQL, as we hard-code it for performance, since we generally know all the types that are going to be shipped). On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani zia.kay...@platalytics.com wrote: Hi, I want to introduce a custom type for SchemaRDD; I'm following this https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala example. 
But I'm having Kryo serialization issues; here is the stack trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 22, localhost): *com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable*
Serialization trace:
value (org.apache.spark.sql.catalyst.expressions.MutableAny)
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
Spark per app logging
Hi, We have Spark set up such that various users run multiple jobs at the same time. Currently all the logs go to one file specified in the log4j.properties. Is it possible to configure log4j in Spark for per-app/per-user logging instead of sending all logs to the one file mentioned in log4j.properties? Thanks Udit
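One common approach, sketched here under assumptions (file names and paths below are hypothetical, and this is not verified against your deployment): give each submission its own log4j configuration, pointing a FileAppender at a per-app log file, via the extra Java options:

```
# per application, e.g. passed with --conf at submit time
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/path/to/myapp-log4j.properties
spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/path/to/myapp-log4j.properties
```

Each myapp-log4j.properties would then name its own output file, so concurrent applications no longer interleave in a single log.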
Matching Spark application metrics data to App Id
Hi, I want to get telemetry metrics on Spark app activity, such as run time and JVM activity. Using Spark Metrics I am able to get the following sample data point on an app: type=GAUGE, name=application.SparkSQL::headnode0.1426626495312.runtime_ms, value=414873 How can I match this data point to the App Id (i.e. app-20150317210815-0001)? The Spark app name is not a unique identifier. 1426626495312 appears to be unique, but I am unable to see how it is related to the App Id. Thanks, Judy
Re: saveAsTable broken in v1.3 DataFrames?
Any other users interested in a feature DataFrame.saveAsExternalTable() for making _useful_ external tables in Hive, or am I the only one? Bueller? If I start a PR for this, will it be taken seriously? On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez christ...@svds.com wrote: Hi Yin, Thanks for the clarification. My first reaction is that if this is the intended behavior, it is a wasted opportunity. Why create a managed table in Hive that cannot be read from inside Hive? I think I understand now that you are essentially piggybacking on Hive's metastore to persist table info between/across sessions, but I imagine others might expect more (as I have). We find ourselves wanting to do work in Spark and persist the results where other users (e.g. analysts using Tableau connected to Hive/Impala) can explore them. I imagine this is very common. I can, of course, save it as Parquet and create an external table in Hive (which I will do now), but saveAsTable seems much less useful to me now. Any other opinions? Cheers, C On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote: I meant that table properties and serde properties are used to store metadata of a Spark SQL data source table. We do not set other fields like the SerDe lib. For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table should not show unrelated stuff like SerDe lib and InputFormat. I have created https://issues.apache.org/jira/browse/SPARK-6413 to track the improvement of the output of the DESCRIBE statement. On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote: Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is not a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). 
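For reference, the workaround mentioned above can be sketched roughly as follows. The path and table name are hypothetical, saveAsParquetFile is the Spark 1.3 DataFrame API, and the DDL is standard Hive syntax; adapt both to your environment:

```
// From Spark: write the data as Parquet to a known location
df.saveAsParquetFile("hdfs:///warehouse/spark_test_foo")

// Then in Hive:
// CREATE EXTERNAL TABLE spark_test_foo (education BIGINT, income BIGINT)
//   STORED AS PARQUET
//   LOCATION 'hdfs:///warehouse/spark_test_foo';
```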
When you look at the table properties, there will be a field called spark.sql.sources.provider and the value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored by Parquet. Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a serialization error because it doesn't know it is Parquet.
val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
df.saveAsTable("spark_test_foo")
Expected:
COLUMNS( education BIGINT, income BIGINT )
SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
Actual:
COLUMNS( col array<string> COMMENT 'from deserializer' )
SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
--- Manually changing the schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receipt. I'm tracing execution through source code... but before I get any deeper, can anyone reproduce this behavior? 
Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Do you think the ulimit is too low for the user running Spark on your nodes? Can you run ulimit -a as the user who is running spark on the executor node? Does the result make sense for the data you are trying to process? Yong From: szheng.c...@gmail.com To: user@spark.apache.org Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1 Date: Fri, 20 Mar 2015 15:28:26 -0400 Hi All, I try to run a simple sort by on 1.2.1. And it always gives me the below two errors:
1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)
And then I switch to:
conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT")
Then I get the error:
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)
I roughly know the first issue is because the Spark shuffle creates too many local temp files (and I don't know the solution, because it looks like my workaround also causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases? Regards, Shuai
EC2 cluster created by spark using old HDFS 1.0
Hi, I created a cluster using the spark-ec2 script, but it installs HDFS version 1.0. I would like to use this cluster to connect to Hive installed on a Cloudera CDH 5.3 cluster, but I am getting the following error:

org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
    at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:40)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:32)
    at $iwC$$iwC$$iwC$$iwC.init(console:34)
    at $iwC$$iwC$$iwC.init(console:36)
    at $iwC$$iwC.init(console:38)
    at $iwC.init(console:40)
    at init(console:42)
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Hi All, I am trying to run a simple sortBy on 1.2.1, and it always gives me one of the two errors below.

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

2. After I switched to conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT"), I get this error instead:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly understand that the first issue happens because the Spark shuffle creates too many local temp files (and my workaround apparently causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases?

Regards, Shuai
Mailing list schizophrenia?
I notice that some people send messages directly to user@spark.apache.org and some via Nabble, either using email or the web client. There are two index sites, one directly at apache.org and one at Nabble, but messages sent directly to user@spark.apache.org only show up in the Apache list. Further, it appears that you can subscribe either directly to user@spark.apache.org, in which case you see all emails, or via Nabble, in which case you see a subset. Is this correct, and is it intentional?

Apache site: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser
Nabble site: http://apache-spark-user-list.1001560.n3.nabble.com/
An example of a message that only shows up in Apache: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E

This message was sent both to Nabble and user@spark.apache.org to see how that behaves. Jim
Re: Mailing list schizophrenia?
Yes, it did get delivered to the Apache list, shown here: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E

But the web site for the Spark community directs people to Nabble for viewing messages, and it doesn't show up there.

Community page: http://spark.apache.org/community.html
Link on that page to the archive: http://apache-spark-user-list.1001560.n3.nabble.com/
The reliable archive: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

On Fri, Mar 20, 2015 at 12:34 PM, Ted Yu yuzhih...@gmail.com wrote: Jim: I can find the example message here: http://search-hadoop.com/m/JW1q5zP54J1

On Fri, Mar 20, 2015 at 12:29 PM, Jim Kleckner j...@cloudphysics.com wrote: I notice that some people send messages directly to user@spark.apache.org and some via Nabble, either using email or the web client. There are two index sites, one directly at apache.org and one at Nabble. But messages sent directly to user@spark.apache.org only show up in the Apache list. Further, it appears that you can subscribe either directly to user@spark.apache.org, in which case you see all emails, or via Nabble and you see a subset. Is this correct and is it intentional? Apache site: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser Nabble site: http://apache-spark-user-list.1001560.n3.nabble.com/ An example of a message that only shows up in Apache: http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E This message was sent both to Nabble and user@spark.apache.org to see how that behaves. Jim
Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Assuming you are on Linux, what is your /etc/security/limits.conf set to for nofile/soft (the number of open file handles)?

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am trying to run a simple sortBy on 1.2.1, and it always gives me one of the two errors below.

1. 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException: /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826 (Too many open files)

2. After I switched to conf.set("spark.shuffle.consolidateFiles", "true").set("spark.shuffle.manager", "SORT"), I get this error instead:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly understand that the first issue happens because the Spark shuffle creates too many local temp files (and my workaround apparently causes other issues), but I am not sure what the second error means. Does anyone know the solution for both cases?

Regards, Shuai
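A quick way to see whether the limit is plausibly the culprit (a minimal sketch, not from the thread — the helper name and the example task counts are my additions): read the soft nofile limit programmatically and compare it against a rough estimate of the shuffle file count, which for the pre-sort hash shuffle is roughly one file per map task per reduce partition on each executor.

```python
import resource

def shuffle_files_fit(map_tasks, reduce_partitions):
    """Return True if a rough estimate of the shuffle file count
    (map_tasks * reduce_partitions, the hash-shuffle worst case)
    stays under the soft open-file limit of this process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    estimate = map_tasks * reduce_partitions
    print("soft nofile limit: %s, hard: %s, estimated shuffle files: %d"
          % (soft, hard, estimate))
    return estimate < soft

# e.g. 200 map tasks x 200 reduce partitions = 40,000 temp files,
# far above the common default soft limit of 1024
shuffle_files_fit(200, 200)
```

Raising nofile in /etc/security/limits.conf, consolidating shuffle files, or reducing the number of partitions all attack the same map-tasks-times-reduce-partitions product.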
Create a Spark cluster with cloudera CDH 5.2 support
Hi, I am trying to create a Spark cluster using the spark-ec2 script that will support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by adding --hadoop-major-version=2.5.0, which solved some of the errors I was getting. But now when I run a select query on Hive I get the following error:

Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status

Has anybody tried doing this? Is there a solution? I used this command to create my cluster:

./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2 --zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2 --spark-version=1.3.0 --slaves=1 launch spark-cluster

Thank You for the help.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLlib Spam example gets stuck in Stage X
Su, which Spark version did you use? -Xiangrui

On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das ak...@sigmoidanalytics.com wrote: To get these metrics out, you need to open the driver UI running on port 4040. In there you will see the Stages information, and for each stage you can see how much time it is spending on GC etc. In your case the parallelism seems to be 4; the higher the parallelism, the more tasks you will see. Thanks Best Regards

On Thu, Mar 19, 2015 at 1:15 PM, Su She suhsheka...@gmail.com wrote: Hi Akhil, 1) How could I see how much time it is spending on stage 1? Or what if, like above, it doesn't get past stage 1? 2) How could I check if it's GC time? And where would I increase the parallelism for the model? I have a Spark master and 2 workers running on CDH 5.3... what would the default spark-shell level of parallelism be... I thought it would be 3? Thank you for the help! -Su

On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you see where exactly it is spending time? Like you said it goes to Stage 2, so you will be able to see how much time it spent on Stage 1. If it is GC time, then try increasing the level of parallelism or repartitioning, e.g. to sc.getDefaultParallelism*3. Thanks Best Regards

On Thu, Mar 19, 2015 at 12:15 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am trying to run this MLlib example from Learning Spark: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48

Things I'm doing differently:
1) Using the spark shell instead of an application
2) Instead of their spam.txt and normal.txt I have text files with 3700 and 2700 words... nothing huge at all, just plain text
3) I've used numFeatures = 100, 1000 and 10,000

Error: I keep getting stuck when I try to run the model: val model = new LogisticRegressionWithSGD().run(trainingData)

It will freeze on something like this: [Stage 1:==(1 + 0) / 4] Sometimes it's Stage 1, 2 or 3. I am not sure what I am doing wrong... any help is much appreciated, thank you! -Su
Re: Create a Spark cluster with cloudera CDH 5.2 support
I think you missed -Phadoop-2.4.

On Fri, Mar 20, 2015 at 5:27 PM, morfious902002 anubha...@gmail.com wrote: Hi, I am trying to create a Spark cluster using the spark-ec2 script that will support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by adding --hadoop-major-version=2.5.0, which solved some of the errors I was getting. But now when I run a select query on Hive I get the following error:

Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status

Has anybody tried doing this? Is there a solution? I used this command to create my cluster:

./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2 --zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2 --spark-version=1.3.0 --slaves=1 launch spark-cluster

Thank You for the help.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark per app logging
Are these jobs the same jobs, just run by different users, or different jobs? If the latter, can each application use its own log4j.properties? Cheers

On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta ume...@groupon.com wrote: Hi, We have Spark set up such that various users run multiple jobs at the same time. Currently all the logs go to one file specified in log4j.properties. Is it possible to configure log4j in Spark for per-app/per-user logging instead of sending all logs to the one file mentioned in log4j.properties? Thanks Udit
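One way to give each application its own log file (a sketch under assumptions — the file name my-log4j.properties, the appender, the log path, MyApp, and my-app.jar are all made-up examples, and this relies on the standard log4j 1.x -Dlog4j.configuration mechanism rather than anything Spark-specific): ship a per-application properties file with the job and point the driver and executor JVMs at it.

```shell
# Hypothetical per-application log4j.properties:
cat > my-log4j.properties <<'EOF'
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/var/log/spark/my-app.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF

# Ship the file with the application and point both JVMs at it,
# so each submitted app carries its own logging config:
spark-submit \
  --files my-log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=my-log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=my-log4j.properties" \
  --class MyApp my-app.jar
```

Each user submitting with their own properties file then gets their own log destination, without touching the cluster-wide log4j.properties.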
Re: Error when using multiple python files spark-submit
I see. I will try it the other way around.

On Thu, Mar 19, 2015 at 8:06 PM, Davies Liu dav...@databricks.com wrote: The options of spark-submit should come before main.py, or they become the options of main.py, so it should be:

../hadoop/spark-install/bin/spark-submit --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 main.py

On Mon, Mar 16, 2015 at 4:11 AM, poiuytrez guilla...@databerries.com wrote: I have a Spark app which is composed of multiple files. When I launch Spark using:

../hadoop/spark-install/bin/spark-submit main.py --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077

I am getting an error:

15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0 (TID 5817) on executor spark-w-3.c.databerries.internal: org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File "/home/hadoop/spark-install/python/pyspark/worker.py", line 90, in main
    command = pickleSer._read_with_length(infile)
  File "/home/hadoop/spark-install/python/pyspark/serializers.py", line 151, in _read_with_length
    return self.loads(obj)
  File "/home/hadoop/spark-install/python/pyspark/serializers.py", line 396, in loads
    return cPickle.loads(obj)
ImportError: No module named naive

It is weird because I do not serialize anything. naive.py is also available on every machine at the same path. Any insight on what could be going on? The issue does not happen on my laptop. PS: I am using Spark 1.2.0.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark-submit and multiple files
I tried your program in yarn-client mode and it worked with no exception. This is the command I used: spark-submit --master yarn-client --py-files work.py main.py (Spark 1.2.1)

On 20.3.2015. 9:47, Guillaume Charhon wrote: Hi Davies, I am already using --py-files. The system does use the other file. The error I am getting is not trivial. Please check the error log.

On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote: You could submit additional Python source via --py-files, for example: $ bin/spark-submit --py-files work.py main.py

On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com wrote: Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets; each is composed of a main.py and a work.py. The code works if I paste work.py and then main.py into a pyspark shell, but neither snippet works when using spark-submit, and they generate different errors.

Function add_1 defined outside: http://www.codeshare.io/4ao8B https://justpaste.it/jzvj
Embedded add_1 function definition: http://www.codeshare.io/OQJxq https://justpaste.it/jzvn

I am looking for a way to make this work. Thank you for your support.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
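The import mechanics behind --py-files are easy to reproduce without a cluster (a local sketch — work.py and add_1 here are stand-ins for the files in the thread): --py-files effectively places the listed files on the worker's Python path, so the application just imports them with a plain import; when a file was never shipped, the import raises the same "No module named ..." ImportError seen in the other thread.

```python
import os
import sys
import tempfile

# Simulate what --py-files does: make a module importable on sys.path.
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "work.py"), "w") as f:
    f.write("def add_1(x):\n    return x + 1\n")
sys.path.insert(0, tmp)  # spark-submit arranges this on every worker for you

# This is the import that fails with ImportError when the file was not shipped:
from work import add_1
print(add_1(41))  # -> 42
```

This is also why option ordering matters: if --py-files comes after main.py, spark-submit never ships the files, and the worker-side import fails exactly as above.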
How to check that a dataset is sorted after it has been written out?
Greetings! I sorted a dataset in Spark and then wrote it out in avro/parquet. Then I wanted to check that it was sorted.

It looks like each partition has been sorted, but when reading the data back in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions in the RDD that was read is the same as on disk). If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that. It seems that when opening the RDD, the partitions of the RDD are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc.).

So, how might one read the data so that the sort order is maintained? And while on the subject: after the terasort, how did they check that the data was actually sorted correctly (or did they :-) )? Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet.)

Thanks! -Mike
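One way to check global order that does not depend on the partition/file ordering at all, sketched in plain Python (the function name and the toy data are mine; with Spark you would compute the per-partition summaries via mapPartitionsWithIndex instead of passing lists): verify that each partition is internally sorted, then sort the per-partition (min, max) summaries by minimum key and check that consecutive ranges do not overlap.

```python
def is_globally_sorted(partitions):
    """partitions: list of lists, one per partition, in any order.
    Returns True iff the concatenation of the partitions, taken in
    order of their smallest element, is globally sorted."""
    summaries = []
    for part in partitions:
        # each partition must be internally sorted
        if any(a > b for a, b in zip(part, part[1:])):
            return False
        if part:
            summaries.append((part[0], part[-1]))  # (min, max) of the partition
    summaries.sort()  # order partitions by their minimum key
    # key ranges must not overlap: max of one partition <= min of the next
    return all(prev_max <= next_min
               for (_, prev_max), (next_min, _) in zip(summaries, summaries[1:]))

print(is_globally_sorted([[4, 5, 6], [1, 2, 3]]))  # -> True: disjoint ranges, order of partitions irrelevant
print(is_globally_sorted([[1, 5], [2, 3]]))        # -> False: the ranges overlap
```

Because only (min, max) per partition is shuffled to the driver, this check is cheap, and it sidesteps the question of whether the RDD's partition indices match the part-file names.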
IPython notebook command for Spark needs to be updated?
Hello: I tried the IPython notebook with the following command in my environment:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

But it says --pylab inline support has been removed from the newest IPython version. The log is:

[E 15:29:43.076 NotebookApp] Support for specifying --pylab on the command line has been removed.
[E 15:29:43.077 NotebookApp] Please use `%pylab inline` or `%matplotlib inline` in the notebook itself.

I am using IPython 3.0.0, and only IPython works in my environment. Does somebody have the same issue as mine? How do you solve it?

Thanks, Cong
Re: IPython notebook command for Spark needs to be updated?
Yep, the command-line option is gone. No big deal, just run '%pylab inline' as part of your notebook. Cheers k/

On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote: Hello: I tried the IPython notebook with the following command in my environment:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

But it says --pylab inline support has been removed from the newest IPython version. The log is:

[E 15:29:43.076 NotebookApp] Support for specifying --pylab on the command line has been removed.
[E 15:29:43.077 NotebookApp] Please use `%pylab inline` or `%matplotlib inline` in the notebook itself.

I am using IPython 3.0.0, and only IPython works in my environment. Does somebody have the same issue as mine? How do you solve it?

Thanks, Cong
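Putting the fix together (a sketch of the corrected invocation for IPython >= 3; the shell quoting is restored by hand, since the archive stripped it from the original commands): drop --pylab from the launcher and enable plotting inside the notebook instead.

```shell
# IPython >= 3: no --pylab on the command line any more
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
```

Then, in the first notebook cell, run %matplotlib inline (or %pylab inline) to get the same inline-plotting behavior the old flag provided.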