Re: Spark serialization issues with third-party libraries
Hi, You can see my code here; it's a POC implementing UIMA on Spark: https://bitbucket.org/SigmoidDev/uimaspark
The class where the major part of the integration happens is UIMAProcessor.scala: https://bitbucket.org/SigmoidDev/uimaspark/src/8476fdf16d84d0f517cce45a8bc1bd3410927464/UIMASpark/src/main/scala/UIMAProcessor.scala?at=master
Thanks, Arush

On Sun, Nov 23, 2014 at 7:52 PM, jatinpreet jatinpr...@gmail.com wrote: Thanks Sean, I was actually using instances created elsewhere inside my RDD transformations, which, as I understand it, is against the Spark programming model. I was referred to a talk from this year's Spark Summit about UIMA and Spark integration, which had a workaround for this problem: I just had to make some class members transient. http://spark-summit.org/2014/talk/leveraging-uima-in-spark Thanks - Novice Big Data Programmer

Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
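For anyone hitting the same serialization error, here is a minimal sketch of the transient-member workaround mentioned above (the engine type and its factory method are hypothetical, not taken from the actual UIMASpark code): marking the non-serializable member @transient lazy val keeps it out of the serialized closure and rebuilds it lazily on each executor.

import org.apache.spark.rdd.RDD

class Annotator extends Serializable {
  // Hypothetical non-serializable third-party engine; excluded from
  // serialization and re-created on first use on each executor.
  @transient lazy val engine: UimaEngine = UimaEngine.create()
  def annotate(doc: String): String = engine.process(doc)
}

def annotateAll(docs: RDD[String]): RDD[String] = {
  val annotator = new Annotator
  docs.map(doc => annotator.annotate(doc))
}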
Issues about running on client in standalone mode
Hi all: I deployed a Spark client on my own machine. I put Spark at `/home/somebody/spark`, while the cluster workers' Spark home is `/home/spark/spark`. When I launched the jar, it showed: `AppClient$ClientActor: Executor updated: app-20141124170955-11088/12 is now FAILED (java.io.IOException: Cannot run program /home/somebody/proc/spark_client/spark/bin/compute-classpath.sh (in directory .): error=2, No such file or directory)`. The worker should run /home/spark/spark/bin/compute-classpath.sh, not the client's compute-classpath.sh. It looks as if I had set some environment variable to the client path, but in fact there is no spark-env.sh or spark-defaults.conf associated with my client Spark path. Any hints? Thanks.
Fwd: 1gb file processing... tasks don't launch on all the nodes... unseen exception
Hi, I tried with try/catch blocks. In fact, inside mapPartitionsWithIndex a method is invoked which does the operation. I put the operations inside that method in a try...catch block, but that was of no use... the error persists. Even when I commented out all the operations, a simple print statement inside the method was not executed. The data size is 542 MB; the HDFS block size is 64 MB, so it has 9 blocks. I used a 2-node cluster with replication factor 2. Looking at the logs, it seems Spark tried to launch tasks on the other node, but the TaskSetManager encountered a NullPointerException and the job was aborted. Is this a problem with mapPartitionsWithIndex? The same operations, when performed with the map transformation, executed with no issues. Please let me know if anyone has seen the same problem. Thanks, Padma Ch

On Fri, Nov 14, 2014 at 7:42 PM, Akhil [via Apache Spark User List] wrote: It shows a NullPointerException; could your data be corrupted? Try putting a try/catch inside the operation that you are doing. Are you running the worker process on the master node also? If not, then only 1 node will be doing the processing. If yes, then try setting the level of parallelism and the number of partitions while creating/transforming the RDD. Thanks Best Regards

On Fri, Nov 14, 2014 at 5:17 PM, Priya Ch wrote: Hi All, We have set up a 2-node cluster (NODE-DSRV05 and NODE-DSRV02), each having 32 GB RAM, 1 TB hard disk capacity and 8 CPU cores. We have set up HDFS, which has 2 TB capacity, and the block size is 256 MB. When we try to process a 1 GB file on Spark, we see the following exception:

14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
14/11/14 17:01:42 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
14/11/14 17:01:43 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@IMPETUS-DSRV02:41124/user/Executor#539551156] with ID 0
14/11/14 17:01:43 INFO storage.BlockManagerMasterActor: Registering block manager NODE-DSRV05.impetus.co.in:60432 with 2.1 GB RAM
14/11/14 17:01:43 INFO storage.BlockManagerMasterActor: Registering block manager NODE-DSRV02:47844 with 2.1 GB RAM
14/11/14 17:01:43 INFO network.ConnectionManager: Accepted connection from [NODE-DSRV05.impetus.co.in/192.168.145.195:51447]
14/11/14 17:01:43 INFO network.SendingConnection: Initiating connection to [NODE-DSRV05.impetus.co.in/192.168.145.195:60432]
14/11/14 17:01:43 INFO network.SendingConnection: Connected to [NODE-DSRV05.impetus.co.in/192.168.145.195:60432], 1 messages pending
14/11/14 17:01:43 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on NODE-DSRV05.impetus.co.in:60432 (size: 17.1 KB, free: 2.1 GB)
14/11/14 17:01:43 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on NODE-DSRV05.impetus.co.in:60432 (size: 14.1 KB, free: 2.1 GB)
14/11/14 17:01:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, NODE-DSRV05.impetus.co.in): java.lang.NullPointerException:
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
14/11/14 17:01:44 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 3, NODE-DSRV05.impetus.co.in, NODE_LOCAL, 1667 bytes)
14/11/14 17:01:44 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on executor NODE-DSRV05.impetus.co.in: java.lang.NullPointerException (null) [duplicate 1]
14/11/14 17:01:44 INFO scheduler.TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) on executor NODE-DSRV05.impetus.co.in: java.lang.NullPointerException (null) [duplicate 2]
14/11/14 17:01:44 INFO scheduler.TaskSetManager: Starting task 2.1 in stage 0.0 (TID 4,
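A note on the try/catch suggestion above: with mapPartitionsWithIndex the function receives a lazy iterator, so a try/catch wrapped around the iterator itself never fires for per-record failures; the handler has to wrap each record. A minimal sketch (process is a hypothetical per-record operation, not from the original code):

import org.apache.spark.rdd.RDD

def safeProcess(data: RDD[String], process: String => String): RDD[String] =
  data.mapPartitionsWithIndex { (idx, iter) =>
    iter.flatMap { line =>
      try Iterator(process(line))
      catch {
        case e: Exception =>
          // log the failing partition and skip the bad record
          System.err.println("partition " + idx + " failed on a record: " + e)
          Iterator.empty
      }
    }
  }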
Re: Issues about running on client in standalone mode
How are you submitting the job? Thanks Best Regards

On Mon, Nov 24, 2014 at 3:02 PM, LinQili lin_q...@outlook.com wrote: Hi all: I deployed a Spark client on my own machine. I put Spark at `/home/somebody/spark`, while the cluster workers' Spark home is `/home/spark/spark`. When I launched the jar, it showed: `AppClient$ClientActor: Executor updated: app-20141124170955-11088/12 is now FAILED (java.io.IOException: Cannot run program /home/somebody/proc/spark_client/spark/bin/compute-classpath.sh (in directory .): error=2, No such file or directory)`. The worker should run /home/spark/spark/bin/compute-classpath.sh, not the client's compute-classpath.sh. It looks as if I had set some environment variable to the client path, but in fact there is no spark-env.sh or spark-defaults.conf associated with my client Spark path. Any hints? Thanks.
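One thing worth checking (a hedged sketch, not a confirmed fix): in standalone mode the executors are launched with the Spark home that the application registers, so explicitly pointing it at the cluster's path rather than the client's may help. The master URL below is a placeholder.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("spark://master-host:7077") // placeholder master URL
  .setSparkHome("/home/spark/spark")     // the workers' Spark home, not the client's
val sc = new SparkContext(conf)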
Re: How to incrementally compile spark examples using mvn
Thank you, Marcelo and Sean; mvn install is a good answer for my needs.

-----Original Message-----
From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: November 21, 2014, 1:47
To: yiming zhang
Cc: Sean Owen; user@spark.apache.org
Subject: Re: How to incrementally compile spark examples using mvn

Hi Yiming, On Wed, Nov 19, 2014 at 5:35 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you for your reply. I was wondering whether there is a method of reusing locally-built components without installing them? That is, if I have successfully built the Spark project as a whole, how should I configure it so that I can incrementally build (only) the spark-examples subproject without needing to download or install anything? As Sean suggested, you shouldn't need to install anything. After mvn install, your local repo is a working Spark installation, and you can use spark-submit and other tools directly within it. You just need to remember to rebuild the assembly/ project when modifying Spark code (or the examples/ project when modifying examples). -- Marcelo
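A sketch of that workflow against the Spark 1.x Maven build (module names as in the source tree):

# once: build everything and install the artifacts into the local repo
mvn -DskipTests install
# afterwards: rebuild only the examples module...
mvn -DskipTests -pl examples package
# ...or the assembly, after changing core Spark code
mvn -DskipTests -pl assembly package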
Re: Spark serialization issues with third-party libraries
Thanks Arush! Your example is nice and easy to understand. I am implementing it through Java though. Jatin - Novice Big Data Programmer
Store kmeans model
Dear all, How can one save a KMeans model after training? Best, Jao
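As of Spark 1.1 there is no built-in save/load for MLlib models, but KMeansModel is serializable, so one common approach (a sketch, with illustrative paths) is plain Java serialization on the driver:

import java.io._
import org.apache.spark.mllib.clustering.KMeansModel

def saveModel(model: KMeansModel, path: String): Unit = {
  // serialize the whole model (essentially its cluster centers) to a local file
  val oos = new ObjectOutputStream(new FileOutputStream(path))
  try oos.writeObject(model) finally oos.close()
}

def loadModel(path: String): KMeansModel = {
  val ois = new ObjectInputStream(new FileInputStream(path))
  try ois.readObject().asInstanceOf[KMeansModel] finally ois.close()
}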
Submit Spark driver on Yarn Cluster in client mode
Hi, I want to submit my Spark program from my machine to a YARN cluster in yarn-client mode. How do I specify all the required details through the Spark submitter? Please provide me some details. -Naveen.
Re: Submit Spark driver on Yarn Cluster in client mode
You can export the Hadoop configuration dir (export HADOOP_CONF_DIR=XXX) in the environment and then submit it like:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

(--master can also be `yarn-client` for client mode.) More details here: https://spark.apache.org/docs/1.1.0/submitting-applications.html#launching-applications-with-spark-submit
Thanks Best Regards

On Mon, Nov 24, 2014 at 4:01 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi, I want to submit my Spark program from my machine to a YARN cluster in yarn-client mode. How do I specify all the required details through the Spark submitter? Please provide me some details. -Naveen.
issue while running the code in standalone mode: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Hi, When I try to execute the program from my laptop by connecting to the HDP environment (on which Spark is also configured), I get the warning "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory" and the job is terminated. My console has the following log statements. Note: I am able to run the same client program using the spark-submit command, and whatever parameters I passed to spark-submit I passed the same to the SparkConf object, but I still get the same error. Any clue on this?

14/11/24 16:07:09 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[4] at map at JavaSchemaRDD.scala:42)
14/11/24 16:07:09 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/11/24 16:07:09 INFO client.AppClient$ClientActor: Executor updated: app-20141124023636-0004/0 is now EXITED (Command exited with code 1)
14/11/24 16:07:09 INFO cluster.SparkDeploySchedulerBackend: Executor app-20141124023636-0004/0 removed: Command exited with code 1
14/11/24 16:07:09 INFO client.AppClient$ClientActor: Executor added: app-20141124023636-0004/2 on worker-20141124021958-STI-SM-DEV-SYS4-51561 (STI-SM-DEV-SYS4:51561) with 4 cores
14/11/24 16:07:09 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141124023636-0004/2 on hostPort STI-SM-DEV-SYS4:51561 with 4 cores, 8.0 GB RAM
14/11/24 16:07:09 INFO client.AppClient$ClientActor: Executor updated: app-20141124023636-0004/1 is now EXITED (Command exited with code 1)
14/11/24 16:07:09 INFO cluster.SparkDeploySchedulerBackend: Executor app-20141124023636-0004/1 removed: Command exited with code 1
14/11/24 16:07:09 INFO client.AppClient$ClientActor: Executor added: app-20141124023636-0004/3 on worker-20141124022001-STI-SM-DEV-SYS5-50404 (STI-SM-DEV-SYS5:50404) with 4 cores
14/11/24 16:07:09 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141124023636-0004/3 on hostPort STI-SM-DEV-SYS5:50404 with 4 cores, 8.0 GB RAM
14/11/24 16:07:09 INFO client.AppClient$ClientActor: Executor updated: app-20141124023636-0004/2 is now RUNNING
14/11/24 16:07:10 INFO client.AppClient$ClientActor: Executor updated: app-20141124023636-0004/3 is now RUNNING
14/11/24 16:07:24 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/11/24 16:07:39 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/11/24 16:07:43 INFO client.AppClient$ClientActor: Executor updated: app-20141124023636-0004/3 is now EXITED (Command exited with code 1)
14/11/24 16:07:43 INFO cluster.SparkDeploySchedulerBackend: Executor app-20141124023636-0004/3 removed: Command exited with code 1

Thanks in advance.
Re: Submit Spark driver on Yarn Cluster in client mode
Hi Akhil, But the driver and YARN are on different networks. How do I specify the (export HADOOP_CONF_DIR=XXX) path? The driver is on my Windows machine and YARN is on some Unix machine on a different network. -Naveen.

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, November 24, 2014 4:08 PM
To: Naveen Kumar Pokala
Cc: user@spark.apache.org
Subject: Re: Submit Spark driver on Yarn Cluster in client mode

You can export the Hadoop configuration dir (export HADOOP_CONF_DIR=XXX) in the environment and then submit it with the spark-submit command shown above. More details here: https://spark.apache.org/docs/1.1.0/submitting-applications.html#launching-applications-with-spark-submit Thanks Best Regards
Re: Submit Spark driver on Yarn Cluster in client mode
Not sure if it will work, but you can try creating a dummy Hadoop conf directory, put those files (*-site.xml) inside it, and hopefully Spark will pick it up and submit to that remote cluster (if there aren't any network/firewall issues). Thanks Best Regards

On Mon, Nov 24, 2014 at 4:16 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi Akhil, But the driver and YARN are on different networks. How do I specify the (export HADOOP_CONF_DIR=XXX) path? The driver is on my Windows machine and YARN is on some Unix machine on a different network. -Naveen.
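A sketch of that suggestion on a Unix-like client (hostnames and paths are illustrative; on Windows the equivalent would be set HADOOP_CONF_DIR=C:\remote-hadoop-conf):

# copy the cluster's client-side configs into a local dummy conf dir
mkdir -p ~/remote-hadoop-conf
scp user@yarn-gateway:/etc/hadoop/conf/core-site.xml \
    user@yarn-gateway:/etc/hadoop/conf/hdfs-site.xml \
    user@yarn-gateway:/etc/hadoop/conf/yarn-site.xml ~/remote-hadoop-conf/
export HADOOP_CONF_DIR=~/remote-hadoop-conf
./bin/spark-submit --master yarn-client --class my.Main /path/to/app.jar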
Re: issue while running the code in standalone mode: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
This can happen mainly because of the following:
- Wrong master URL (make sure you give the master URL listed in the top left corner of the web UI, running on port 8080)
- Allocated more memory/cores while creating the SparkContext.
Thanks Best Regards

On Mon, Nov 24, 2014 at 4:13 PM, vdiwakar.malladi vdiwakar.mall...@gmail.com wrote: Hi, When I try to execute the program from my laptop by connecting to the HDP environment (on which Spark is also configured), I get the warning "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory" and the job is terminated. My console has the log statements quoted above. Note: I am able to run the same client program using the spark-submit command, and whatever parameters I passed to spark-submit I passed the same to the SparkConf object, but I still get the same error. Any clue on this? Thanks in advance.
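For reference, a minimal sketch of a SparkConf for a remote standalone master (all values illustrative; the master URL must match exactly what the web UI on port 8080 shows in its header):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyJob")
  .setMaster("spark://STI-SM-DEV-SYS4:7077") // exact URL from the master web UI
  .set("spark.executor.memory", "2g")        // must fit within what the workers offer
  .setJars(Seq("/path/to/my-app.jar"))       // ship the application jar to the executors
val sc = new SparkContext(conf)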
Use case question
hi, We are building an analytics dashboard. Data will be updated every 5 minutes for now, and eventually every 1 minute, maybe more frequently. The amount of data coming in is not huge: per customer maybe 30 records per minute, although we could have 500 customers. Is streaming the right fit for this, instead of reading the incremental data from multiple partitions?
Re: Use case question
Streaming would be easy to implement: all you have to do is create the stream, do some transformation (depending on your use case), and finally write it to your dashboard's backend. What kind of dashboards are you building? For d3.js-based ones, you can have a websocket and write the stream output to the socket; for QlikView/Tableau-based ones you can push the stream to a database. Thanks Best Regards

On Mon, Nov 24, 2014 at 4:34 PM, Gordon Benjamin gordon.benjami...@gmail.com wrote: hi, We are building an analytics dashboard. Data will be updated every 5 minutes for now, and eventually every 1 minute, maybe more frequently. The amount of data coming in is not huge: per customer maybe 30 records per minute, although we could have 500 customers. Is streaming the right fit for this, instead of reading the incremental data from multiple partitions?
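A minimal sketch of that pipeline (the socket source, one-minute batch interval, and comma-separated record format are all illustrative assumptions; the sink would be replaced with a write to the dashboard backend):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("DashboardFeed")
val ssc = new StreamingContext(conf, Seconds(60))        // one batch per minute
val records = ssc.socketTextStream("ingest-host", 9999)  // placeholder source
// e.g. count records per customer in each batch
val counts = records.map(line => (line.split(",")(0), 1)).reduceByKey(_ + _)
counts.foreachRDD { rdd =>
  rdd.collect().foreach(println) // replace with a push to the dashboard backend
}
ssc.start()
ssc.awaitTermination()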
Re: Use case question
Thanks. Yes, d3 ones. Just to clarify: we could take our current system, which is incrementally adding partitions, and overlay a Spark Streaming layer to ingest these partitions? Then nightly, we could coalesce these partitions, for example? I presume that while we are carrying out a coalesce, the end user would not lose access to the underlying data? Let me know if I'm off the mark here.

On Monday, November 24, 2014, Akhil Das ak...@sigmoidanalytics.com wrote: Streaming would be easy to implement: all you have to do is create the stream, do some transformation (depending on your use case), and finally write it to your dashboard's backend.
Re: issue while running the code in standalone mode: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Wouldn't it likely be the opposite? Too much memory / too many cores being requested relative to the resources that YARN makes available? On Nov 24, 2014 11:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote: This can happen mainly because of the following: wrong master URL (make sure you give the master URL listed in the top left corner of the web UI, running on port 8080), or allocated more memory/cores while creating the SparkContext. Thanks Best Regards
Re: issue while running the code in standalone mode: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Thanks for your response. I gave the correct master URL. Moreover, as I mentioned in my post, I was able to run the sample program using spark-submit, but it is not working when I run it from my machine. Any clue on this? Thanks in advance.
Re: Spark SQL Programming Guide - registerTempTable Error
OK, thank you very much for that!

On 23 Nov 2014 21:49, Denny Lee [via Apache Spark User List] wrote: It sort of depends on your environment. If you are running in your local environment, I would just download the latest Spark 1.1 binaries and you'll be good to go. If it's a production environment, it sort of depends on how you are set up (e.g. AWS, Cloudera, etc.)

On Sun Nov 23 2014 at 11:27:49 AM riginos wrote: That was the problem! Thank you Denny for your fast response! Another quick question: is there any way to update Spark to 1.1.0 fast?
Writing collection to file error
import java.io.{File, PrintWriter}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match {
  case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
val predictions = model.predict(usersProducts).map {
  case Rating(user, product, rate) => ((user, product), rate)
}
val ratesAndPreds = ratings.map {
  case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.foreach(pw.println)

Hi, Consider the last two lines above: I am trying to write the output of the ratesAndPreds RDD to disk, but I get the error "Task not serializable":

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:41)
at $iwC$$iwC$$iwC$$iwC.init(console:43)
at $iwC$$iwC$$iwC.init(console:45)
at $iwC$$iwC.init(console:47)
at $iwC.init(console:49)
at init(console:51)
at .init(console:55)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
Re: Use case question
I'm not quite sure I understood you correctly, but here's the thing: if you use Spark Streaming, your dashboard is refreshed on each batch, so for every batch your dashboard will be updated with the new data. And yes, the end user won't notice anything while you do the coalesce/repartition and the like; after that your dashboards will be refreshed with the new data. Thanks Best Regards

On Mon, Nov 24, 2014 at 4:54 PM, Gordon Benjamin gordon.benjami...@gmail.com wrote: Thanks. Yes, d3 ones. Just to clarify: we could take our current system, which is incrementally adding partitions, and overlay a Spark Streaming layer to ingest these partitions? Then nightly, we could coalesce these partitions, for example? I presume that while we are carrying out a coalesce, the end user would not lose access to the underlying data? Let me know if I'm off the mark here.
Re: Writing collection to file error
Hi Saurabh, Here your ratesAndPreds is an RDD of type RDD[((Int, Int), (Double, Double))], not an Array. Now, if you want to save it to disk, you can simply call saveAsTextFile and provide the location. So change your last line from this:

ratesAndPreds.foreach(pw.println)

to this:

ratesAndPreds.saveAsTextFile("/path/CFOutput")

Thanks Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote: [code and "Task not serializable" stack trace as quoted in full above]
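If a single local text file is really required, two hedged alternatives (paths illustrative); the key point is that the PrintWriter must stay on the driver rather than being captured in a closure shipped to the executors:

// still written by the cluster, but as a single part file
ratesAndPreds.coalesce(1).saveAsTextFile("/path/CFOutput")

// or, only if the result is small enough to fit in driver memory:
import java.io.{File, PrintWriter}
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
try ratesAndPreds.collect().foreach(pw.println) finally pw.close()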
Re: EC2 cluster with SSD ebs
Hi, I found that the ec2 script has been improved a lot, and an option, --ebs-vol-type, has been added to specify the EBS type. However, the option does not seem to work. The command I used is the following:

$SPARK_HOME/ec2/spark-ec2 -k sparkcv -i spark.pem -m r3.4xlarge -s 3 -t r3.2xlarge --ebs-vol-type=gp2 --ebs-vol-size=200 --copy-aws-credentials launch spark-cluster

When checking the AWS EC2 console, I find 'standard' as the volume type. Any idea? Thank you. =) Hao
spark broadcast error
I want to run my Spark program on a YARN cluster, but when I tested the broadcast function in my program, I got an error:

Exception in thread main org.apache.spark.SparkException: Error sending message as driverActor is null [message = UpdateBlockInfo(BlockManagerId(driver, in160-011.byted.org, 19704, 0),broadcast_0_piece0,StorageLevel(false, true, false, false, 1),61,0,0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:167)
at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:218)
at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:58)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:310)
at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:286)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:769)
at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:84)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)

My test code is just:

val broadcastVar = sc.broadcast(Array(1, 2, 3))

I am new to Spark; can anyone help me?
Re: Use case question
Great, thanks!

On Monday, November 24, 2014, Akhil Das ak...@sigmoidanalytics.com wrote: I'm not quite sure I understood you correctly, but here's the thing: if you use Spark Streaming, your dashboard is refreshed on each batch, so for every batch your dashboard will be updated with the new data. And yes, the end user won't notice anything while you do the coalesce/repartition and the like; after that your dashboards will be refreshed with the new data.
ExternalAppendOnlyMap: Thread spilling in-memory map to disk many times, slowly
Hello, I have a large data calculation in Spark, distributed across several nodes. In the end, I want to write to a single output file. For this I do: output.coalesce(1, false).saveAsTextFile(filename). What happens is that all the data from the workers flows to a single worker, and that one writes the data. If the data is small enough, all goes well. However, above a certain RDD size, I get a lot of the following messages (see below). From what I understand, ExternalAppendOnlyMap spills the data to disk when it can't hold it in memory. Is there a way to tell it to stream the data straight to disk, instead of slowly spilling each block?

14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 22 ms
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 70 non-empty blocks out of 90 blocks
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (1 time so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (2 times so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 2 ms
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 15 MB to disk (1 time so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 16 MB to disk (2 times so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 14 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (33 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (34 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (35 times so far)
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms
14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (1 time so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (2 times so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 9 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (36 times so far)
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 11 MB to disk (37 times so far)
14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task 'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output

Romi Kuntsman, Big Data Engineer
http://www.totango.com
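A hedged alternative, assuming downstream tooling can merge part files: skip the coalesce(1) (which funnels the entire dataset through one task and its ExternalAppendOnlyMap) and write in parallel, merging afterwards:

// write one part file per partition, in parallel across the workers
output.saveAsTextFile("s3n://mybucket/mydir/output")
// then merge outside Spark, e.g. with an HDFS-compatible client:
//   hadoop fs -getmerge s3n://mybucket/mydir/output /local/path/output.txt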
Spark Cassandra Guava version issues
I've got a Cassandra 2.1.1 + Spark 1.1.0 cluster running. I'm using sbt-assembly to create an uber jar to submit to the standalone master. I'm using the Hadoop 1 prebuilt binaries for Spark. As soon as I try to do sc.cassandraTable(...) I get an error that's likely to be a Guava versioning issue. I'm using the Spark Cassandra connector v1.1.0-rc2, which just came out, though the issue was in rc1 as well. I can't see the Cassandra connector using Guava directly, so I guess it's a dependency of something else the connector is using. Does anybody have a workaround for this? The sbt file and the exception are given below. Regards, Ashic.

sbt file:

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "foo"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-rc2" withSources() withJavadoc(),
  "org.specs2" %% "specs2" % "2.4" % "test" withSources()
)

// allow "provided" dependencies on the run classpath
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
    case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
    case _ => MergeStrategy.discard
  }
  case _ => MergeStrategy.first
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

test in assembly := {}

Exception:

14/11/24 14:20:11 INFO client.AppClient$ClientActor: Executor updated: app-20141124142008-0001/0 is now RUNNING
Exception in thread main java.lang.NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.datastax.driver.core.Cluster$ConnectionReaper.init(Cluster.java:2065)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1163)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1110)
at com.datastax.driver.core.Cluster.init(Cluster.java:118)
at com.datastax.driver.core.Cluster.init(Cluster.java:105)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:174)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1075)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:165)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:134)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:227)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:226)
at
com.datastax.spark.connector.rdd.CassandraRDD.verify$lzycompute(CassandraRDD.scala:266)
at com.datastax.spark.connector.rdd.CassandraRDD.verify(CassandraRDD.scala:263)
at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:292)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at
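A commonly suggested workaround, hedged since it depends on where the older Guava wins (here the NoSuchMethodError is thrown in the driver, so the driver classpath matters most; the Guava version and class names below are illustrative):

# put a newer Guava ahead of Spark's on the driver classpath, and ask
# executors to prefer the application's classes (experimental flag in 1.1)
./bin/spark-submit \
  --driver-class-path /path/to/guava-16.0.1.jar \
  --conf spark.files.userClassPathFirst=true \
  --class my.Main my-assembly.jar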
Re: Spark SQL with Apache Phoenix lower and upper Bound
Hi Alaa Ali, That's right: when using the PhoenixInputFormat, you can do simple WHERE clauses and then perform any aggregate functions you'd like from within Spark. Any aggregations you run won't be quite as fast as Phoenix's native queries, but once the data is available as an RDD you can also do a lot more with it than the Phoenix functions alone provide. I don't know if this works with PySpark or not, but assuming the newHadoopRDD functionality works for other input formats, it should work for Phoenix as well. Josh

On Fri, Nov 21, 2014 at 5:12 PM, Alaa Ali contact.a...@gmail.com wrote: Awesome, thanks Josh, I missed that previous post of yours! But your code snippet shows a select statement, so what I can do is run a simple select with a where clause if I want to, and then run my data processing on the RDD to mimic the aggregation I want to do with SQL, right? Also, another question: I still haven't tried this out, but I'll actually be using this with PySpark, so I'm guessing the PhoenixPigConfiguration and newHadoopRDD can be defined in PySpark as well? Regards, Alaa Ali

On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin jmaho...@interset.com wrote: Hi Alaa Ali, In order for Spark to split the JDBC query in parallel, it expects an upper and lower bound for your input data, as well as a number of partitions, so that it can split the query across multiple tasks. For example, depending on your data distribution, you could set an upper and lower bound on your timestamp range, and Spark should be able to create new sub-queries to split up the data. Another option is to load up the whole table using the PhoenixInputFormat as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate functions, but it does let you load up whole tables as RDDs. I've previously posted example code here: http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E There's also an example library implementation here, although I haven't had a chance to test it yet: https://github.com/simplymeasured/phoenix-spark Josh

On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali contact.a...@gmail.com wrote: I want to run queries on Apache Phoenix, which has a JDBC driver. The query that I want to run is:

select ts,ename from random_data_date limit 10

But I'm having issues with the JdbcRDD upperBound and lowerBound parameters (which I don't actually understand). Here's what I have so far:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

val url = "jdbc:phoenix:zookeeper"
val sql = "select ts,ename from random_data_date limit ?"
val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url),
  sql, 5, 10, 2,
  r => r.getString("ts") + ", " + r.getString("ename"))

But this doesn't work because the sql expression that the JdbcRDD expects has to have two ?s to represent the lower and upper bound. How can I run my query through the JdbcRDD? Regards, Alaa Ali
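A sketch of the bounded JdbcRDD variant described above (the numeric partitioning column and its range are illustrative assumptions; the two ?s in the SQL receive the lower and upper bound for each partition's sub-query):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val sql = "select ts, ename from random_data_date where id >= ? and id <= ?"
val myRDD = new JdbcRDD(sc,
  () => DriverManager.getConnection(url),
  sql,
  1L,       // lowerBound (hypothetical id column)
  1000000L, // upperBound (illustrative)
  2,        // numPartitions
  r => r.getString("ts") + "," + r.getString("ename"))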
Re: Spark SQL Programming Guide - registerTempTable Error
We keep conf as a symbolic link so that an upgrade is as simple as a drop-in replacement.

On Monday, November 24, 2014, riginos samarasrigi...@gmail.com wrote: OK thank you very much for that!

On 23 Nov 2014 21:49, Denny Lee [via Apache Spark User List] wrote: It sort of depends on your environment. If you are running in your local environment, I would just download the latest Spark 1.1 binaries and you'll be good to go. If it's a production environment, it sort of depends on how you are set up (e.g. AWS, Cloudera, etc.)

On Sun Nov 23 2014 at 11:27:49 AM riginos wrote: That was the problem! Thank you Denny for your fast response! Another quick question: Is there any way to update Spark to 1.1.0 fast?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Programming-Guide-registerTempTable-Error-tp19591p19638.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- - Rishi
RE: ClassNotFoundException in standalone mode
I finally managed to get the example working; here are the details, which may help other users. I have 2 Windows nodes for the test system, PN01 and PN02. Both have the same shared drive S: (it is mapped to C:\source on PN02). If I run the worker and master from S:\spark-1.1.0-bin-hadoop2.4, then running the simple test fails with the ClassNotFoundException (even if there is only one node, which hosts both the master and the worker). If I run the workers and masters from the local drive (c:\source\spark-1.1.0-bin-hadoop2.4), then the simple test runs ok (with one or two nodes). I haven't found out why the class fails to load from the shared drive (I checked the permissions and they look ok), but at least the cluster is working now. If anyone has experience running Spark from a Windows shared drive, any advice is welcome! Thanks, Benoit.

PS: Yes thanks Angel, I did check that:

s:\spark\simple> %JAVA_HOME%\bin\jar tvf s:\spark\simple\target\scala-2.10\simple-project_2.10-1.0.jar
299 Thu Nov 20 17:29:40 GMT 2014 META-INF/MANIFEST.MF
1070 Thu Nov 20 17:29:40 GMT 2014 SimpleApp$$anonfun$2.class
1350 Thu Nov 20 17:29:40 GMT 2014 SimpleApp$$anonfun$main$1.class
2581 Thu Nov 20 17:29:40 GMT 2014 SimpleApp$.class
1070 Thu Nov 20 17:29:40 GMT 2014 SimpleApp$$anonfun$1.class
710 Thu Nov 20 17:29:40 GMT 2014 SimpleApp.class

From: angel2014 [mailto:angel.alvarez.pas...@gmail.com] Sent: Friday, November 21, 2014 3:16 AM To: u...@spark.incubator.apache.org Subject: Re: ClassNotFoundException in standalone mode

Can you make sure the class SimpleApp$$anonfun$1 is included in your app jar? 2014-11-20 18:19 GMT+01:00 Benoit Pasquereau [via Apache Spark User List]: Hi Guys, I'm having an issue in standalone mode (Spark 1.1, Hadoop 2.4, Windows Server 2008). A very simple program runs fine in local mode but fails in standalone mode.
Here is the error:

14/11/20 17:01:53 INFO DAGScheduler: Failed to run count at SimpleApp.scala:22 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, UK-RND-PN02.actixhost.eu): java.lang.ClassNotFoundException: SimpleApp$$anonfun$1 java.net.URLClassLoader$1.run(URLClassLoader.java:202)

I have added the jar to the SparkConf() to be on the safe side, and it appears in standard output (copied after the code):

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.net.URLClassLoader

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "S:\\spark-1.1.0-bin-hadoop2.4\\README.md"
    val conf = new SparkConf()//.setJars(Seq("s:\\spark\\simple\\target\\scala-2.10\\simple-project_2.10-1.0.jar"))
      .setMaster("spark://UK-RND-PN02.actixhost.eu:7077")
      //.setMaster("local[4]")
      .setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val cl = ClassLoader.getSystemClassLoader
    val urls = cl.asInstanceOf[URLClassLoader].getURLs
    urls.foreach(url => println("Executor classpath is: " + url.getFile))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop()
  }
}

Simple-project is in the executor classpath list:

14/11/20 17:01:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
Executor classpath is:/S:/spark/simple/
Executor classpath is:/S:/spark/simple/target/scala-2.10/simple-project_2.10-1.0.jar
Executor classpath is:/S:/spark-1.1.0-bin-hadoop2.4/conf/
Executor classpath is:/S:/spark-1.1.0-bin-hadoop2.4/lib/spark-assembly-1.1.0-hadoop2.4.0.jar
Executor classpath is:/S:/spark/simple/
Executor classpath is:/S:/spark-1.1.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.1.jar
Executor classpath is:/S:/spark-1.1.0-bin-hadoop2.4/lib/datanucleus-core-3.2.2.jar
Executor classpath is:/S:/spark-1.1.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.1.jar
Executor classpath is:/S:/spark/simple/

Would you have any idea how I could investigate further? Thanks! Benoit. PS: I could attach a debugger to the Worker where the ClassNotFoundException happens, but it is a bit painful.
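As a side note, a hedged sketch (not the thread's resolution, which turned out to be the shared drive): explicitly shipping the application jar rules out jar-distribution problems, since executors then fetch SimpleApp$$anonfun$1 from the driver rather than loading it from a drive path. The local-drive jar path below is an assumption:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://UK-RND-PN02.actixhost.eu:7077")
  .setAppName("Simple Application")
  // ship the app jar to executors instead of relying on their classpath
  .setJars(Seq("C:\\source\\spark\\simple\\target\\scala-2.10\\simple-project_2.10-1.0.jar"))
val sc = new SparkContext(conf)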
RE: Writing collection to file error
Thanks for your help Akhil; however, this is creating an output folder and storing the result sets in multiple files. Also, the record count in the result set seems to have multiplied!! Is there any other way to achieve this? Thanks!! Regards, Saurabh Agrawal Vice President Markit Green Boulevard B-9A, Tower C 3rd Floor, Sector - 62, Noida 201301, India +91 120 611 8274 Office

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, November 24, 2014 5:55 PM To: Saurabh Agrawal Cc: user@spark.apache.org Subject: Re: Writing collection to file error

Hi Saurabh, Here your ratesAndPreds is an RDD of type ((Int, Int), (Double, Double)), not an Array. Now, if you want to save it on disk, you can simply call saveAsTextFile and provide the location. So change your last line from this: ratesAndPreds.foreach(pw.println) to this: ratesAndPreds.saveAsTextFile("/path/CFOutput") Thanks Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote:

import java.io.{File, PrintWriter}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.foreach(pw.println)

Hi, Consider the PrintWriter code in the last two lines: I am trying to write the output of ratesAndPreds to disk.
But I get error Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.foreach(RDD.scala:758) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:41) at $iwC$$iwC$$iwC$$iwC.init(console:43) at $iwC$$iwC$$iwC.init(console:45) at $iwC$$iwC.init(console:47) at $iwC.init(console:49) at init(console:51) at .init(console:55) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) at
Spark SQL (1.0)
Hi, I build 2 tables from files. Table F1 joins with table F2 on c5 = d4. F1 has 46730613 rows. F2 has 3386740 rows. All keys d4 exist in F1.c5, so I expect to retrieve 46730613 rows, but it returns only 3437 rows.

// --- begin code ---
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

val rddFile = sc.textFile("hdfs://referential/F1/part-*")
case class F1(c1: String, c2: String, c3: Double, c4: String, c5: String)
val stkrdd = rddFile.map(x => x.split("|")).map(f => F1(f(44), f(3), f(10).toDouble, "", f(2)))
stkrdd.registerAsTable("F1")
sqlContext.cacheTable("F1")

val prdfile = sc.textFile("hdfs://referential/F2/part-*")
case class F2(d1: String, d2: String, d3: String, d4: String)
val productrdd = prdfile.map(x => x.split("|")).map(f => F2(f(0), f(2), f(101), f(3)))
productrdd.registerAsTable("F2")
sqlContext.cacheTable("F2")

val resrdd = sqlContext.sql("Select count(*) from F1, F2 where F1.c5 = F2.d4").collect()
// --- end of code ---

Does anybody know what I missed? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-tp19651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
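No resolution appears in the thread, but one thing worth checking in the code above (an editor's observation, not from the thread): String.split takes a regular expression, and "|" is the regex alternation metacharacter, so x.split("|") matches the empty string between every character and returns one element per character instead of splitting on the pipe delimiter. A minimal sketch:

// "|" as a regex matches the empty string at every position, so split("|")
// explodes the line into single characters (pipes included):
"a|b|c".split("|")    // Array(a, |, b, |, c)
// Escape it, or use the Char overload, to split on a literal pipe:
"a|b|c".split("\\|")  // Array(a, b, c)
"a|b|c".split('|')    // Array(a, b, c)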
Re: Converting a column to a map
jsonFiles in your code is a SchemaRDD rather than an RDD[Array]. If it is a column in the SchemaRDD, you can first use a Spark SQL query to get that column. Alternatively, SchemaRDD supports some SQL-like operations, such as select / where, which can also extract a specific column.

On Nov 24, 2014, at 4:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I have a column in my SchemaRDD that is a map, but I'm unable to convert it to a Map. I've tried converting it to a Tuple2[String, String]:

val converted = jsonFiles.map(line => { line(10).asInstanceOf[Tuple2[String, String]] })

but I get a ClassCastException: 14/11/23 11:51:30 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.Tuple2 And if I convert it to Iterable[String], I can only get the values without the keys. What is the correct data type I should convert it to? Thanks, Daniel - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
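A sketch of the suggestion above, with hypothetical table/column names. One reasoning step worth adding: if the JSON column was inferred as a MapType its values arrive as Scala Maps, but if Spark inferred a struct each value is itself a Row, which is exactly what the GenericRow ClassCastException indicates:

// Column inferred as MapType: values can be cast to a Scala Map.
val maps = sqlContext.sql("SELECT myMapColumn FROM jsonTable")
  .map(row => row(0).asInstanceOf[Map[String, String]])

// Column inferred as a struct: each value is a nested Row, and its fields
// are accessed positionally rather than cast to a Tuple2.
val firsts = sqlContext.sql("SELECT myStructColumn FROM jsonTable")
  .map(row => row(0).asInstanceOf[org.apache.spark.sql.Row].getString(0))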
Re: Spark Cassandra Guava version issues
I faced the same problem, and a workaround is described here: https://github.com/datastax/spark-cassandra-connector/issues/292 best, /Shahab

On Mon, Nov 24, 2014 at 3:21 PM, Ashic Mahtab as...@live.com wrote: I've got a Cassandra 2.1.1 + Spark 1.1.0 cluster running. I'm using sbt-assembly to create an uber jar to submit to the stand-alone master. I'm using the hadoop 1 prebuilt binaries for Spark. As soon as I try to do sc.cassandraTable(...) I get an error that's likely to be a Guava versioning issue. I'm using the Spark Cassandra connector v 1.1.0-rc2, which just came out, though the issue was in rc1 as well. I can't see the Cassandra connector using Guava directly, so I guess it's a dependency of some other thing that the Cassandra Spark connector is using. Does anybody have a workaround for this? The sbt file and the exception are given below. Regards, Ashic.

sbt file:

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "foo"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-rc2" withSources() withJavadoc(),
  "org.specs2" %% "specs2" % "2.4" % "test" withSources()
)

//allow provided for run
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
    case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
    case _ => MergeStrategy.discard
  }
  case _ => MergeStrategy.first
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

test in assembly := {}

Exception: 14/11/24 14:20:11 INFO client.AppClient$ClientActor: Executor updated: app-20141124142008-0001/0 is now RUNNING Exception in thread main java.lang.NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set; at com.datastax.driver.core.Cluster$ConnectionReaper.init(Cluster.java:2065) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1163) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1110) at com.datastax.driver.core.Cluster.init(Cluster.java:118) at com.datastax.driver.core.Cluster.init(Cluster.java:105) at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:174) at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1075) at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:81) at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:165) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97) at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108) at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:134) at
com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:227) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:226) at com.datastax.spark.connector.rdd.CassandraRDD.verify$lzycompute(CassandraRDD.scala:266) at com.datastax.spark.connector.rdd.CassandraRDD.verify(CassandraRDD.scala:263) at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:292) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
Spark and Stanford CoreNLP
Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors, since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through Kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Marking the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this to work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Writing collection to file error
To get the results in a single file, you could do a repartition(1) and then save it: ratesAndPreds.repartition(1).saveAsTextFile("/path/CFOutput") Thanks Best Regards

On Mon, Nov 24, 2014 at 8:32 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote: Thanks for your help Akhil; however, this is creating an output folder and storing the result sets in multiple files. Also, the record count in the result set seems to have multiplied!! Is there any other way to achieve this? Thanks!! Regards, Saurabh Agrawal Vice President Markit Green Boulevard B-9A, Tower C 3rd Floor, Sector - 62, Noida 201301, India +91 120 611 8274 Office

*From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Monday, November 24, 2014 5:55 PM *To:* Saurabh Agrawal *Cc:* user@spark.apache.org *Subject:* Re: Writing collection to file error

Hi Saurabh, Here your ratesAndPreds is an RDD of type ((Int, Int), (Double, Double)), not an Array. Now, if you want to save it on disk, you can simply call saveAsTextFile and provide the location. So change your last line from this: ratesAndPreds.foreach(pw.println) to this: ratesAndPreds.saveAsTextFile("/path/CFOutput") Thanks Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote:

import java.io.{File, PrintWriter}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.foreach(pw.println)

Hi, Consider the PrintWriter code in the last two lines: I am trying to write the output of ratesAndPreds to disk.
But I get error Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.foreach(RDD.scala:758) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:41) at $iwC$$iwC$$iwC$$iwC.init(console:43) at $iwC$$iwC$$iwC.init(console:45) at $iwC$$iwC.init(console:47) at $iwC.init(console:49) at init(console:51) at .init(console:55) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) at
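A further option beyond repartition(1), sketched here under the assumption that the joined results are small enough to fit in driver memory: collect them to the driver and write a single local file there, which avoids both the multi-file output directory and the original error (the driver-side PrintWriter is never shipped inside a closure):

import java.io.{File, PrintWriter}

val pw = new PrintWriter(new File("/path/CFOutput.txt"))
try {
  // collect() moves the data to the driver, so println runs locally
  ratesAndPreds.collect().foreach(pw.println)
} finally {
  pw.close()
}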
Re: MLLib: LinearRegressionWithSGD performance
From the metrics page, it appears that only two executors work in parallel in each iteration. You need to increase the level of parallelism. Some tips that may be helpful: increase spark.default.parallelism; use repartition() or coalesce() to increase the partition count.

On Nov 22, 2014, at 3:18 AM, Sameer Tilak ssti...@live.com wrote: Hi All, I have been using MLlib's linear regression and I have some questions regarding the performance. We have a cluster of 10 nodes -- each node has 24 cores and 148GB memory. I am running my app as follows: time spark-submit --class medslogistic.MedsLogistic --master yarn-client --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar I am also going to play with the number of executors (reduce it); maybe that will give us different results. The input is an 800MB sparse file in LibSVM format. The total number of features is 150K. It takes approximately 70 minutes for the regression to finish. The job imposes very little load on CPU, memory, network, and disk. The total number of tasks is 104. Total time gets divided fairly uniformly across these tasks. I was wondering, is it possible to reduce the execution time further? (two screenshots attached) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
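A short sketch of the tips above; the parallelism value and input path are placeholders to tune, not values from the thread:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils

val conf = new SparkConf()
  .setAppName("MedsLogistic")
  .set("spark.default.parallelism", "240")  // roughly total cores; an assumption
val sc = new SparkContext(conf)

// More partitions -> more tasks can run concurrently across executors.
val examples = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/input.libsvm")
  .repartition(240)
  .cache()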
Re: Spark and Stanford CoreNLP
We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time, it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provides a nicer, Scala-friendly interface to CoreNLP.

On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors, since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through Kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Marking the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this to work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
object MyCoreNLP {
  @transient lazy val coreNLP = new StanfordCoreNLP()
}

and then refer to it from your map/reduce/mapPartitions and it should be fine (presuming it's thread-safe); it will only be initialized once per classloader per JVM.

On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time, it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provides a nicer, Scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors, since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through Kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Marking the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this to work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
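For completeness, a hedged sketch of the mapPartitions approach Evan describes, assuming the CoreNLP 3.x pipeline API; the input RDD `texts` (an RDD[String]) and the annotator list are illustrative:

import java.util.Properties
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

// Build the non-serializable pipeline once per partition, on the worker,
// instead of shipping it from the driver.
val annotated = texts.mapPartitions { iter =>
  val props = new Properties()
  props.setProperty("annotators", "tokenize, ssplit, pos")
  val pipeline = new StanfordCoreNLP(props)  // heavy init, once per partition
  iter.map { text =>
    val doc = new Annotation(text)
    pipeline.annotate(doc)
    doc.toString                             // extract whatever you need here
  }
}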
Re: How to keep a local variable in each cluster?
Sent from my iPad

On Nov 24, 2014, at 9:41 AM, zh8788 78343...@qq.com wrote: Hi, I am new to Spark. This is the first time I am posting here. Currently, I am trying to implement ADMM optimization algorithms for Lasso/SVM, and I have come across a problem: since the training data (label, feature) is large, I created an RDD and cached the training data (label, feature) in memory. For ADMM, I also need to keep local parameters (u, v), which are different for each partition. In each iteration, I need to use the training data (only the data on that partition) together with u and v to calculate new values for u and v. RDD has a transformation named mapPartitions(); it runs separately on each partition of an RDD.

Question 1: One way is to zip (training data, u, v) into one RDD and update it in each iteration, but as we can see, the training data is large and doesn't change for the whole run; only u and v (which are small) change in each iteration. If I zip these three, I could not cache that RDD (since it changes every iteration). But if I don't cache it, I need to reload the training data every iteration -- how can I avoid that?

Question 2: Related to Question 1, the online documentation says that if we don't cache an RDD, it will not be kept in memory, and RDDs use delayed (lazy) evaluation. I am then confused about when a previous RDD is in memory:

Case 1: B = A.map(function1); B.collect() # this forces B to be calculated? After that, does the node just release B, since it is not cached? D = B.map(function3); D.collect()

Case 2: B = A.map(function1); D = B.map(function3); D.collect()

Case 3: B = A.map(function1); C = A.map(function2); D = B.map(function3); D.collect()

In which cases can I assume B is in memory on each node when I calculate D? If you want a certain RDD stored in memory, use RDD.persist(StorageLevel.MEMORY_ONLY). Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion.

Question 3: Can I use a function to do operations on two RDDs? Yes, but it can only be executed in the driver. E.g. Function newfun(rdd1, rdd2) # rdd1 is large and does not change for the whole run (training data), so I can use cache; # rdd2 is small and changes in each iteration (u, v).

Question 4: Or are there other ways to solve this kind of problem? I think this is a common problem, but I could not find any good solutions. Thanks a lot, Han -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-a-local-variable-in-each-cluster-tp19604.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
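One pattern that fits Han's setup (a sketch, not from the thread; data stands for the cached training RDD of (label, features) pairs, and updateUV is a hypothetical stub for the real ADMM local update): cache the big training RDD once, hold the small per-partition (u, v) state in a second RDD with the same number of partitions, and combine the two each iteration with zipPartitions:

import org.apache.spark.rdd.RDD

case class UV(u: Array[Double], v: Array[Double])

// Hypothetical stub: the real per-partition ADMM update goes here.
def updateUV(points: Iterator[(Double, Array[Double])], uv: UV): UV = uv

val training: RDD[(Double, Array[Double])] = data.cache()  // large, static
var state: RDD[UV] =                                        // one small UV per partition
  training.mapPartitions(_ => Iterator(UV(Array.empty, Array.empty)))

for (i <- 1 to 20) {
  state = training.zipPartitions(state) { (points, uvIter) =>
    Iterator(updateUV(points, uvIter.next()))  // same partition's data and state
  }.cache()
}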
Connected Components running for a long time and failing eventually
I am trying to run connected components on a graph generated by reading an edge file. It runs for a long time (3-4 hrs) and then eventually fails. I can't find any error in the log file. The file I am testing it on has 27M rows (edges). Is there something obviously wrong with the code? I tested the same code with around 1000 rows of input and it works just fine.

object ConnectedComponentsTest {
  def main(args: Array[String]) {
    // Should be some file on your system
    val inputFile = "/user/hive/warehouse/spark_poc.db/window_compare_output_subset/00_0.snappy,/user/hive/warehouse/spark_poc.db/window_compare_output_subset/01_0.snappy"
    val conf = new SparkConf().setAppName("ConnectedComponentsTest")
    val sc = new SparkContext(conf)
    val graph = GraphLoader.edgeListFile(sc, inputFile, true)
    val cc = graph.connectedComponents().vertices
    cc.saveAsTextFile("/user/kakn/output")
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connected-Components-running-for-a-long-time-and-failing-eventually-tp19659.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
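The thread contains no resolution, but one knob worth trying (a sketch reusing the thread's variables; the partition count of 64 is a placeholder to tune): GraphLoader.edgeListFile accepts a minEdgePartitions argument, so the 27M edges can be spread over more tasks instead of defaulting to one partition per input file:

import org.apache.spark.graphx.GraphLoader

val graph = GraphLoader.edgeListFile(sc, inputFile,
  canonicalOrientation = true,   // the 'true' in the original call
  minEdgePartitions = 64)
val cc = graph.connectedComponents().vertices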
advantages of SparkSQL?
Hi, Is there any advantage to storing data in Parquet format and loading it using the sparkSQL context, but never registering it as a table/using SQL on it? Something like:

data = sqc.parquetFile(path)
results = data.map(lambda x: applyfunc(x.field))

Is this faster/more optimised than having the data stored as a text file and using Spark (non-SQL) to process it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/advantages-of-SparkSQL-tp19661.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Mllib native netlib-java/OpenBLAS
Hi, I'm trying to improve performance for Spark's MLlib, and I am having trouble getting the native netlib-java libraries installed/recognized by Spark. I am running on a single machine, Ubuntu 14.04, and here is what I've tried:

sudo apt-get install libgfortran3
sudo apt-get install libatlas3-base libopenblas-base

(this is how netlib-java's website says to install it). I also double-checked, and it looks like the libraries are linked correctly in /usr/lib (see below):

/usr/lib/libblas.so.3 -> /etc/alternatives/libblas.so.3
/usr/lib/liblapack.so.3 -> /etc/alternatives/liblapack.so.3

The Dependencies section on Spark's MLlib website also says to include com.github.fommil.netlib:all:1.1.2 as a dependency. I therefore tried adding this to my sbt file like so:

libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2"

After all this, I'm still seeing the following error message. Does anyone have more detailed installation instructions?

14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
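No answer appears in the thread, but one detail worth checking (an assumption based on netlib-java's packaging, not confirmed here): the "all" artifact is published as a Maven pom rather than a jar, so in sbt it may need pomOnly() to resolve its transitive native modules:

// netlib-java's "all" module is pom-packaged; tell sbt to treat it as such.
libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()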
Re: advantages of SparkSQL?
Parquet is a column-oriented format, which means that you need to read in less data from the file system if you're only interested in a subset of your columns. Also, Parquet pushes down selection predicates, which can eliminate needless deserialization of rows that don't match a selection criterion. Other than that, you would also get compression, and likely save processor cycles when parsing lines from text files. On Mon, Nov 24, 2014 at 8:20 AM, mrm ma...@skimlinks.com wrote: Hi, Is there any advantage to storing data as a parquet format, loading it using the sparkSQL context, but never registering as a table/using sql on it? Something like: Something like: data = sqc.parquetFile(path) results = data.map(lambda x: applyfunc(x.field)) Is this faster/more optimised than having the data stored as a text file and using Spark (non-SQL) to process it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/advantages-of-SparkSQL-tp19661.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
Hello, I'm new to Stanford CoreNLP. Could anyone share good training material and examples (Java or Scala) on NLP? Regards, Rajesh

On Mon, Nov 24, 2014 at 9:38 PM, Ian O'Connell i...@ianoconnell.com wrote: object MyCoreNLP { @transient lazy val coreNLP = new StanfordCoreNLP() } and then refer to it from your map/reduce/mapPartitions and it should be fine (presuming it's thread-safe); it will only be initialized once per classloader per JVM. On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time, it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provides a nicer, Scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors, since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through Kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Marking the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this to work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Stanford CoreNLP
This is probably not the right venue for general questions on CoreNLP - the project website (http://nlp.stanford.edu/software/corenlp.shtml) provides documentation and links to mailing lists/stack overflow topics. On Mon, Nov 24, 2014 at 9:08 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hello, I'm new to Stanford CoreNLP. Could any one share good training material and examples(java or scala) on NLP. Regards, Rajesh On Mon, Nov 24, 2014 at 9:38 PM, Ian O'Connell i...@ianoconnell.com wrote: object MyCoreNLP { @transient lazy val coreNLP = new coreNLP() } and then refer to it from your map/reduce/map partitions or that it should be fine (presuming its thread safe), it will only be initialized once per classloader per jvm On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provided a nicer, scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Passing the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this work? Regards, Theodore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How does Spark SQL traverse the physical tree?
Hi All, I'm learning the Spark SQL code, and I'm confused about how a SchemaRDD executes each operator. I have been tracing the code and found the toRDD() function in QueryExecution, which is the starting point for running a query. The toRDD function runs the SparkPlan, which is a tree structure. However, I didn't find any iteration over the tree in the execute functions of the concrete operators. It seems as if Spark SQL only runs the top node of the tree. I know that conclusion is wrong, but which code have I missed? Thanks, Tim
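No reply appears in the thread, but the pattern to look for can be sketched (a simplified illustration, not Spark's actual classes): each physical operator's execute() builds its RDD by calling execute() on its children, so the tree is traversed by recursion through the operators rather than by an explicit loop at the top:

import org.apache.spark.rdd.RDD

type Row = Seq[Any]  // stand-in for Spark SQL's Row

abstract class SimplePlan {
  def children: Seq[SimplePlan]
  def execute(): RDD[Row]
}

case class SimpleFilter(condition: Row => Boolean, child: SimplePlan) extends SimplePlan {
  def children = Seq(child)
  // The traversal is here: asking this node for its RDD asks the child first.
  def execute(): RDD[Row] = child.execute().filter(condition)
}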
Using Spark Context as an attribute of a class cannot be used
Hello guys, I'm using Spark 1.0.0 and Kryo serialization. In the Spark shell, when I create a class that contains the SparkContext as an attribute, in this way:

class AAA(val s: SparkContext) { }
val aaa = new AAA(sc)

and I execute any action using that attribute, like:

val myNumber = 5
aaa.s.textFile("FILE").filter(_ == myNumber.toString).count

or

aaa.s.parallelize(1 to 10).filter(_ == myNumber).count

it returns a NotSerializableException: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$AAA at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any thoughts about how to solve this issue, or a workaround for it? I'm actually developing an API that will need to use this SparkContext several times in different places, so it will need to be accessible. Thanks a lot for the cooperation -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-Context-as-an-attribute-of-a-class-cannot-be-used-tp19668.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
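No resolution appears in the thread; a sketch of one common workaround (the method and names are illustrative, not from the thread): keep the SparkContext out of serialized closures by marking the field @transient, and copy any captured values into locals so the closure doesn't drag the enclosing instance along:

import org.apache.spark.SparkContext

class AAA(@transient val s: SparkContext) extends Serializable {
  def countEqualTo(path: String, n: Int): Long = {
    val target = n.toString                    // local copy, serializable
    // the closure below captures only 'target', not 'this' (and thus not s)
    s.textFile(path).filter(_ == target).count()
  }
}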
Spark error in execution
I created an application in Spark. When I run it with spark-submit, everything works fine. But when I export my application together with its libraries (via sbt) and try to run it as an executable jar, I get the following error: 14/11/24 20:06:11 ERROR OneForOneStrategy: exception during creation akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at akka.util.Reflect$.instantiate(Reflect.scala:66) at akka.actor.ArgsReflectConstructor.produce(Props.scala:349) at akka.actor.Props.newActor(Props.scala:249) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more Caused by: java.lang.AbstractMethodError: akka.remote.RemoteActorRefProvider$RemotingTerminator.akka$actor$FSM$_setter_$Event_$eq(Lakka/actor/FSM$Event$;)V at akka.actor.FSM$class.$init$(FSM.scala:272) at akka.remote.RemoteActorRefProvider$RemotingTerminator.init(RemoteActorRefProvider.scala:36) ... 
18 more 14/11/24 20:06:11 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem [sparkDriver] java.lang.AbstractMethodError at akka.actor.ActorCell.create(ActorCell.scala:580) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [ERROR] [11/24/2014 20:06:11.478] [sparkDriver-akka.actor.default-dispatcher-4] [ActorSystem(sparkDriver)] Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver] java.lang.AbstractMethodError at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) at akka.actor.ActorCell.terminate(ActorCell.scala:369) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [ERROR] [11/24/2014 20:06:11.481] [sparkDriver-akka.actor.default-dispatcher-3] [ActorSystem(sparkDriver)] Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver] java.lang.AbstractMethodError at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) at
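The thread ends without a resolution. One frequent cause of an Akka AbstractMethodError / ActorInitializationException in assembled jars (a guess to verify, not confirmed here) is that the fat jar bundles a different Akka version than the one Spark was built against, or mangles Akka's reference.conf during the merge. A hedged sbt-assembly sketch:

// Keep Spark (and its Akka) out of the fat jar...
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"

// ...and concatenate reference.conf instead of discarding it, so any
// remaining Akka/Typesafe-config defaults survive the merge.
mergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case _                => MergeStrategy.first
}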
Re: Spark streaming job failing after some time.
I have figured out the problem here. It turned out that there was a problem with my SparkConf when I was running my application with YARN in cluster mode. I was setting my master to local[4] inside my application, whereas I was setting it to yarn-cluster with spark-submit. Now I have changed the SparkConf in my application not to hardcode the master, and it works. The application was running for some time because the YARN application master retries up to maxNumTries and waits between retries before it completely fails. I was getting appropriate results from my streaming job during this time. Now, I can't figure out why it should run successfully during this time even though it could not find the SparkContext. I am sure there should be a good reason behind this behavior. Anyone have any idea on this? Thanks, Pankaj Channe

On Saturday, November 22, 2014, pankaj channe pankajc...@gmail.com wrote: Thanks Akhil for your input. I have already tried with 3 executors and it still results in the same problem. So as Sean mentioned, the problem does not seem to be related to that.

On Sat, Nov 22, 2014 at 11:00 AM, Sean Owen so...@cloudera.com wrote: That doesn't seem to be the problem though. It processes but then stops. Presumably there are many executors. On Nov 22, 2014 9:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For Spark streaming, you must always set *--executor-cores* to a value which is >= 2. Or else it will not do any processing. Thanks Best Regards

On Sat, Nov 22, 2014 at 8:39 AM, pankaj channe pankajc...@gmail.com wrote: I have seen similar posts on this issue but could not find a solution. Apologies if this has been discussed here before. I am running a spark streaming job with yarn on a 5 node cluster. I am using the following command to submit my streaming job:

spark-submit --class class_name --master yarn-cluster --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar

After running for some time, the job stops. The application log shows the following two errors: 14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10 Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) and later... 
Failed to list files for dir: /data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20 at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685) at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181) at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169) at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169) Note: I am building my jar on my local with spark dependency added in pom.xml and running it on cluster running spark. -Pankaj
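A minimal sketch of the fix Pankaj describes (app name and batch interval are placeholders): leave the master out of the code entirely and let spark-submit's --master flag decide, so the same jar runs in local, standalone, or YARN mode:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MyStreamingApp")  // no setMaster(...)
val ssc = new StreamingContext(conf, Seconds(10))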
Python Scientific Libraries in Spark
Hello Folks: Since Spark exposes Python bindings and allows you to express your logic in Python, is there a way to leverage sophisticated libraries like NumPy, SciPy, and scikit-learn in a Spark job and run at scale? What's been your experience? Any insights you can share in terms of what's possible today and some of the active development in the community that's on the horizon? Thanks, Rohit Pujari Solutions Architect, Hortonworks
RE: Spark Cassandra Guava version issues
Did the workaround work for you? It doesn't seem to work for me.

Date: Mon, 24 Nov 2014 16:44:17 +0100 Subject: Re: Spark Cassandra Guava version issues From: shahab.mok...@gmail.com To: as...@live.com CC: user@spark.apache.org

I faced the same problem, and a workaround is described here: https://github.com/datastax/spark-cassandra-connector/issues/292 best, /Shahab

On Mon, Nov 24, 2014 at 3:21 PM, Ashic Mahtab as...@live.com wrote: I've got a Cassandra 2.1.1 + Spark 1.1.0 cluster running. I'm using sbt-assembly to create an uber jar to submit to the stand-alone master. I'm using the hadoop 1 prebuilt binaries for Spark. As soon as I try to do sc.cassandraTable(...) I get an error that's likely to be a Guava versioning issue. I'm using the Spark Cassandra connector v 1.1.0-rc2, which just came out, though the issue was in rc1 as well. I can't see the Cassandra connector using Guava directly, so I guess it's a dependency of some other thing that the Cassandra Spark connector is using. Does anybody have a workaround for this? The sbt file and the exception are given below. Regards, Ashic.

sbt file:

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "foo"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-rc2" withSources() withJavadoc(),
  "org.specs2" %% "specs2" % "2.4" % "test" withSources()
)

//allow provided for run
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
    case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
    case _ => MergeStrategy.discard
  }
  case _ => MergeStrategy.first
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

test in assembly := {}

Exception: 14/11/24 14:20:11 INFO client.AppClient$ClientActor: Executor updated: app-20141124142008-0001/0 is now RUNNING Exception in thread main java.lang.NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set; at com.datastax.driver.core.Cluster$ConnectionReaper.init(Cluster.java:2065) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1163) at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1110) at com.datastax.driver.core.Cluster.init(Cluster.java:118) at com.datastax.driver.core.Cluster.init(Cluster.java:105) at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:174) at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1075) at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:81) at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:165) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160) at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36) at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61) at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71) at
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97) at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108) at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:134) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:227) at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:226) at com.datastax.spark.connector.rdd.CassandraRDD.verify$lzycompute(CassandraRDD.scala:266) at com.datastax.spark.connector.rdd.CassandraRDD.verify(CassandraRDD.scala:263) at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:292) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120)
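For reference, one commonly cited fix for this class of conflict (my suggestion, not a summary of the linked issue) is to pin the Guava version the Cassandra driver needs so it wins over the older copy pulled in transitively; the exact version below is an assumption, adjust to whatever the driver declares:

libraryDependencies += "com.google.guava" % "guava" % "16.0.1" // assumed driver requirement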
Re: Python Logistic Regression error
The data is in LIBSVM format, so this line won't work:

values = [float(s) for s in line.split(' ')]

Please use the util function in MLUtils to load it as an RDD of LabeledPoint. http://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point

from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

-Xiangrui On Sun, Nov 23, 2014 at 11:38 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: Can you please suggest sample data for running logistic_regression.py? I am trying to use a sample data file at https://github.com/apache/spark/blob/master/data/mllib/sample_linear_regression_data.txt I am running this on the CDH5.2 Quickstart VM.

[cloudera@quickstart mllib]$ spark-submit logistic_regression.py lr.txt 3

But I am getting the error below:

14/11/23 11:23:55 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/11/23 11:23:55 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/11/23 11:23:55 INFO TaskSchedulerImpl: Cancelling stage 0
14/11/23 11:23:55 INFO DAGScheduler: Failed to run runJob at PythonRDD.scala:296
Traceback (most recent call last):
  File "/usr/lib/spark/examples/lib/mllib/logistic_regression.py", line 50, in <module>
    model = LogisticRegressionWithSGD.train(points, iterations)
  File "/usr/lib/spark/python/pyspark/mllib/classification.py", line 110, in train
    initialWeights)
  File "/usr/lib/spark/python/pyspark/mllib/_common.py", line 430, in _regression_train_wrapper
    initial_weights = _get_initial_weights(initial_weights, data)
  File "/usr/lib/spark/python/pyspark/mllib/_common.py", line 415, in _get_initial_weights
    initial_weights = _convert_vector(data.first().features)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1127, in first
    rs = self.take(1)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1109, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/usr/lib/spark/python/pyspark/context.py", line 770, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.139.145): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1105, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/lib/spark/examples/lib/mllib/logistic_regression.py", line 37, in parsePoint
    values = [float(s) for s in line.split(' ')]
ValueError: invalid literal for float(): 1:0.4551273600657362

Regards, Venkat
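Putting Xiangrui's suggestion together, a minimal end-to-end sketch (the file path and iteration count echo the thread; assumes a Spark 1.1-era MLlib):

from pyspark import SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import LogisticRegressionWithSGD

sc = SparkContext(appName="LogisticRegressionExample")
# loadLibSVMFile parses "label index:value ..." lines into LabeledPoints
points = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
model = LogisticRegressionWithSGD.train(points, iterations=10)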
Re: Mllib native netlib-java/OpenBLAS
Try building Spark with -Pnetlib-lgpl, which includes the JNI library in the Spark assembly jar. This is the simplest approach. If you want to include it as part of your project, make sure the library is inside the assembly jar, or specify it via `--jars` with spark-submit. -Xiangrui On Mon, Nov 24, 2014 at 8:51 AM, agg212 alexander_galaka...@brown.edu wrote: Hi, I'm trying to improve performance for Spark's MLlib, and I am having trouble getting the native netlib-java libraries installed/recognized by Spark. I am running on a single machine, Ubuntu 14.04, and here is what I've tried:

sudo apt-get install libgfortran3
sudo apt-get install libatlas3-base libopenblas-base

(this is how netlib-java's website says to install it) I also double-checked, and it looks like the libraries are linked correctly in /usr/lib (see below):

/usr/lib/libblas.so.3 -> /etc/alternatives/libblas.so.3
/usr/lib/liblapack.so.3 -> /etc/alternatives/liblapack.so.3

The Dependencies section on Spark's MLlib website also says to include com.github.fommil.netlib:all:1.1.2 as a dependency. I therefore tried adding this to my sbt file like so:

libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2"

After all this, I'm still seeing the following error message. Does anyone have more detailed installation instructions?

14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
14/11/24 16:49:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

Thanks!
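One packaging detail worth adding (based on netlib-java's published artifacts, not on this thread): the "all" artifact is a POM-only module, so the plain coordinates above may not resolve its pieces; the commonly used sbt form is:

libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()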
Re: Mllib native netlib-java/OpenBLAS
Additionally - I strongly recommend using OpenBLAS over the ATLAS build from the default Ubuntu repositories. Alternatively, you can build ATLAS on the hardware you're actually going to be running the matrix ops on (the master/workers), but we've seen only modest performance gains doing this vs. OpenBLAS, at least on the bigger EC2 machines (e.g. cc2.8xlarge, c3.8xlarge).
Re: Using Spark Context as an attribute of a class cannot be used
Marcelo Vanzin wrote: "Do you expect to be able to use the spark context on the remote task?" Not at all. What I want to create is a wrapper of the SparkContext, to be used only on the driver node. I would like to have in this AAA wrapper several attributes, such as the SparkContext and other configurations for my project. I tested using -Dsun.io.serialization.extendedDebugInfo=true. This is the stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$AAA
    - field (class $iwC$$iwC$$iwC$$iwC, name: aaa, type: class $iwC$$iwC$$iwC$$iwC$AAA)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@24e57dcb)
    - field (class $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@178cc62b)
    - field (class $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC, $iwC$$iwC@1e9f5eeb)
    - field (class $iwC, name: $iw, type: class $iwC$$iwC)
    - object (class $iwC, $iwC@37d8e87e)
    - field (class $line18.$read, name: $iw, type: class $iwC)
    - object (class $line18.$read, $line18.$read@124551f)
    - field (class $iwC$$iwC$$iwC, name: $VAL15, type: class $line18.$read)
    - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@2e846e6b)
    - field (class $iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@4b31ba1b)
    - field (class $iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$anonfun$1, function1)
    - field (class org.apache.spark.rdd.FilteredRDD, name: f, type: interface scala.Function1)
    - root object (class org.apache.spark.rdd.FilteredRDD, FilteredRDD[3] at filter at console:20)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)

I actually don't understand much of this stack trace. If you can help me, I would appreciate it. Transient didn't work either. Thanks a lot
Unable to use Kryo
Hi, I want to test Kryo serialization, but when starting spark-shell I'm hitting the following error: java.lang.ClassNotFoundException: org.apache.spark.KryoSerializer. The kryo-2.21.jar is on the classpath, so I'm not sure why it's not picking it up. Thanks for your help, Daniel
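The missing class in that error suggests the configured name may simply be pointing at the wrong package; Spark's Kryo serializer lives under org.apache.spark.serializer. A minimal sketch of the usual setting (assuming Spark 1.x; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("KryoTest") // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)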
Re: How does Spark SQL traverse the physical tree?
You are pretty close. The QueryExecution is what drives the phases from parsing to execution. Once we have a final SparkPlan (the physical plan), toRdd just calls execute(), which recursively calls execute() on children until we hit a leaf operator. This gives us an RDD[Row] that will compute the answer, and from there the actual execution is driven by Spark Core. On Mon, Nov 24, 2014 at 9:52 AM, Tim Chou timchou@gmail.com wrote: Hi All, I'm learning the code of Spark SQL. I'm confused about how SchemaRDD executes each operator. I'm tracing the code. I found the toRdd() function in QueryExecution is the start for running a query. The toRdd function will run a SparkPlan, which is a tree structure. However, I didn't find any iteration or recursion in the execute functions of the individual operators. It seems Spark SQL would only run the top node of this tree. I know that conclusion is wrong, but which code have I missed? Thanks, Tim
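To make the recursion concrete, here is a conceptual sketch; the class names are hypothetical, not the actual Spark SQL source:

import org.apache.spark.rdd.RDD

type Row = Seq[Any] // stand-in for Spark SQL's internal row type

abstract class PhysicalPlan {
  def children: Seq[PhysicalPlan]
  // leaf operators (e.g. a table scan) produce an RDD directly;
  // inner nodes transform the RDDs produced by their children
  def execute(): RDD[Row]
}

case class FilterNode(condition: Row => Boolean, child: PhysicalPlan) extends PhysicalPlan {
  def children = Seq(child)
  def execute(): RDD[Row] = child.execute().filter(condition)
}

Calling execute() on the root therefore builds a single RDD[Row] bottom-up, and Spark Core runs that RDD.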
Re: advantages of SparkSQL?
Akshat is correct about the benefits of Parquet as a columnar format, but I'll add that some of this is lost if you just use a lambda function to process the data. Since your lambda function is a black box, Spark SQL does not know which columns it is going to use and thus will do a full table scan. I'd suggest writing a very simple SQL query that pulls out just the columns you need and does any filtering before dropping back into standard Spark operations. The result of SQL queries is an RDD of rows, so you can do any normal Spark processing you want on them. Either way, though, it will often be faster than a text file due to better encoding/compression. On Mon, Nov 24, 2014 at 8:54 AM, Akshat Aranya aara...@gmail.com wrote: Parquet is a column-oriented format, which means that you need to read in less data from the file system if you're only interested in a subset of your columns. Also, Parquet pushes down selection predicates, which can eliminate needless deserialization of rows that don't match a selection criterion. Other than that, you would also get compression, and likely save processor cycles when parsing lines from text files. On Mon, Nov 24, 2014 at 8:20 AM, mrm ma...@skimlinks.com wrote: Hi, Is there any advantage to storing data in a Parquet format, loading it using the sparkSQL context, but never registering it as a table/using SQL on it? Something like:

data = sqc.parquetFile(path)
results = data.map(lambda x: applyfunc(x.field))

Is this faster/more optimised than having the data stored as a text file and using Spark (non-SQL) to process it?
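A sketch of that pattern in PySpark (the table and column names here are hypothetical):

data = sqc.parquetFile(path)
data.registerTempTable("events")
# the narrow projection lets Spark SQL prune Parquet columns instead of scanning all of them
needed = sqc.sql("SELECT field FROM events WHERE field IS NOT NULL")
results = needed.map(lambda row: applyfunc(row.field))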
Re: How can I read this avro file using spark scala?
Thanks for the feedback, I filed a couple of issues: https://github.com/databricks/spark-avro/issues On Fri, Nov 21, 2014 at 5:04 AM, thomas j beanb...@googlemail.com wrote: I've been able to load a different avro file based on GenericRecord with:

val person = sqlContext.avroFile("/tmp/person.avro")

When I try to call `first()` on it, I get NotSerializableException exceptions again:

person.first()
...
14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 20)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
...

Apart from this, I want to transform the records into pairs of (user_id, record). I can do this by specifying the offset of the user_id column with something like this:

person.map(r => (r.getInt(2), r)).take(4)

Is there any way to specify the column name (user_id) instead of needing to know/calculate the offset somehow? Thanks again On Fri, Nov 21, 2014 at 11:48 AM, thomas j beanb...@googlemail.com wrote: Thanks for the pointer Michael. I've downloaded spark 1.2.0 from https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and cloned and built the spark-avro repo you linked to. When I run it against the example avro file linked to in the documentation, it works. However, when I try to load my avro file (linked to in my original question), I receive the following error:

java.lang.RuntimeException: Unsupported type LONG
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.avro.AvroRelation.com$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
    at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
    at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
...

If this is useful, I'm happy to try loading the various different avro files I have to battle-test spark-avro. Thanks On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust mich...@databricks.com wrote: One option (starting with Spark 1.2, which is currently in preview) is to use the Avro library for Spark SQL. This is very new, but we would love to get feedback: https://github.com/databricks/spark-avro On Thu, Nov 20, 2014 at 10:19 AM, al b beanb...@googlemail.com wrote: I've read several posts of people struggling to read avro in spark. The examples I've tried don't work. When I try this solution ( https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files) I get errors: spark java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper How can I read the following sample file in spark using scala? http://www.4shared.com/file/SxnYcdgJce/sample.html Thomas
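On the column-name question, one possible workaround (a sketch; it assumes the SchemaRDD's schema carries the Avro field names) is to derive the offset from the schema instead of hard-coding it:

// look up the position of user_id once, on the driver
val idx = person.schema.fields.map(_.name).indexOf("user_id")
val pairs = person.map(r => (r.getInt(idx), r))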
Re: Using Spark Context as an attribute of a class cannot be used
Hello, On Mon, Nov 24, 2014 at 12:07 PM, aecc alessandroa...@gmail.com wrote: This is the stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$AAA - field (class $iwC$$iwC$$iwC$$iwC, name: aaa, type: class $iwC$$iwC$$iwC$$iwC$AAA) Ah. Looks to me that you're trying to run this in spark-shell, right? I'm not 100% sure of how it works internally, but I think the Scala repl works a little differently than regular Scala code in this regard. When you declare a val in the shell it will behave differently than a val inside a method in a compiled Scala class - the former will behave like an instance variable, the latter like a local variable. So, this is probably why you're running into this. Try compiling your code and running it outside the shell to see how it goes. I'm not sure whether there's a workaround for this when trying things out in the shell - maybe declare an `object` to hold your constants? Never really tried, so YMMV. -- Marcelo
Re: How to insert complex types like map<string,map<string,int>> in spark sql
Can you give the full stack trace? You might be hitting: https://issues.apache.org/jira/browse/SPARK-4293 On Sun, Nov 23, 2014 at 3:00 PM, critikaled isasmani@gmail.com wrote: Hi, I am trying to insert a particular set of data from an RDD into a Hive table. I have a Map[String,Map[String,Int]] in Scala which I want to insert into a table of type map<string,map<string,int>>. I was able to create the table, but while inserting it says: scala.MatchError: MapType(StringType,MapType(StringType,IntegerType,true),true) (of class org.apache.spark.sql.catalyst.types.MapType) Can anyone help me with this? Thanks in advance.
Re: Merging Parquet Files
Parquet does a lot of serial metadata operations on the driver, which makes it really slow when you have a very large number of files (especially if you are reading from something like S3). This is something we are aware of and that I'd really like to improve in 1.3. You might try the (brand new and very experimental) new Parquet support that I added into 1.2 at the last minute in an attempt to make our metadata handling more efficient. Basically, you load the Parquet files using the new data source API instead of using parquetFile:

CREATE TEMPORARY TABLE data
USING org.apache.spark.sql.parquet
OPTIONS (
  path 'path/to/parquet'
)

This will at least parallelize the retrieval of file status objects, but there is a lot more optimization that I hope to do. On Sat, Nov 22, 2014 at 1:53 PM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm ingesting a lot of small JSON files and converting them to unified Parquet files, but even the unified files are fairly small (~10MB). I want to run a merge operation every hour on the existing files, but it takes a lot of time for such a small amount of data: about 3 GB spread over 3000 Parquet files. Basically what I'm doing is loading the files in the existing directory, coalescing them and saving to the new dir:

val parquetFiles = sqlContext.parquetFile("/requests_merged/inproc")
parquetFiles.coalesce(2).saveAsParquetFile(s"/requests_merged/$currday")

Doing this takes over an hour on my 3 node cluster... Is there a better way to achieve this? Any ideas what can cause such a simple operation to take so long? Thanks, Daniel
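Putting that together with the merge job from this thread, a sketch of the hourly pass through the new API (an assumption on my part that the data source DDL is issued via sqlContext.sql in 1.2; names reuse the thread's paths):

sqlContext.sql("CREATE TEMPORARY TABLE inproc USING org.apache.spark.sql.parquet OPTIONS (path '/requests_merged/inproc')")
sqlContext.sql("SELECT * FROM inproc").coalesce(2).saveAsParquetFile(s"/requests_merged/$currday")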
Re: Mllib native netlib-java/OpenBLAS
Can you clarify what Spark master URL you are using? Is it 'local' or is it a cluster? If it is 'local', then rebuilding Spark wouldn't help, as Spark is getting pulled in from Maven and that'll just pick up the released artifacts. Shivaram On Mon, Nov 24, 2014 at 1:08 PM, agg212 alexander_galaka...@brown.edu wrote: I tried building Spark from the source, by downloading it and running:

mvn -Pnetlib-lgpl -DskipTests clean package

I then installed OpenBLAS by doing the following: - Download and unpack the .tar from http://www.openblas.net/ - Run `make` I then linked /usr/lib/libblas.so.3 to /usr/lib/libopenblas.so (which links to /usr/lib/libopenblas_sandybridgep-r0.2.12.so). I am still getting the following error when running a job after installing Spark from the source with the -Pnetlib-lgpl flag:

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

Any thoughts on what else I need to do to get the native libraries recognized by Spark?
Re: Using Spark Context as an attribute of a class cannot be used
Yes, I'm running this in the shell. In my compiled jar it works perfectly; the issue is that I need to do this in the shell. Any available workarounds? I checked sqlContext; they use it in the same way I would like to use my class: they make the class Serializable with transient fields. Does this affect the whole pipeline of data movement somehow? I mean, will I get performance issues when doing this, because now the class will be serialized for some reason that I still don't understand? Thanks a lot -- Alessandro Chacón Aecc_ORG
Re: Spark S3 Performance
Andrei, Ashish, To be clear, I don't think it's *counting* the entire file. It just seems from the logging and the timing that it is doing a get of the entire file, then figures out it only needs some certain blocks, and does another get of only the specific block. Regarding # partitions - I think I see now it has to do with Hadoop's block size being set at 64MB. This is not a big deal to me; the main issue is the first one: why is every worker doing a call to get the entire file, followed by the *real* call to get only the specific partitions it needs? Best, - Nitay Founder CTO On Sat, Nov 22, 2014 at 8:28 PM, Andrei faithlessfri...@gmail.com wrote: Concerning your second question, I believe you try to set the number of partitions with something like this:

rdd = sc.textFile(..., 8)

but things like `textFile()` don't actually take a fixed number of partitions. Instead, they expect a *minimal* number of partitions. Since in your file you have 21 blocks of data, it creates exactly 21 workers (which is greater than 8, as expected). To set an exact number of partitions, use `repartition()` or its full version - `coalesce()` (see example [1]) [1]: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#coalesce On Sat, Nov 22, 2014 at 10:04 PM, Ashish Rangole arang...@gmail.com wrote: What makes you think that each executor is reading the whole file? If that is the case, then the count value returned to the driver will be actual × NumOfExecutors. Is that the case when compared with the actual lines in the input file? If the count returned is the same as actual, then you probably don't have an extra read problem. I also see this in your logs, which indicates a read that starts from an offset and reads one split size (64MB) worth of data: 14/11/20 15:39:45 [Executor task launch worker-1 ] INFO HadoopRDD: Input split: s3n://mybucket/myfile:335544320+67108864 On Nov 22, 2014 7:23 AM, Nitay Joffe ni...@actioniq.co wrote: Err I meant #1 :) - Nitay Founder CTO On Sat, Nov 22, 2014 at 10:20 AM, Nitay Joffe ni...@actioniq.co wrote: Anyone have any thoughts on this? Trying to understand especially #1, whether it's a legit bug or something I'm doing wrong. - Nitay Founder CTO On Thu, Nov 20, 2014 at 11:54 AM, Nitay Joffe ni...@actioniq.co wrote: I have a simple S3 job to read a text file and do a line count. Specifically I'm doing sc.textFile("s3n://mybucket/myfile").count. The file is about 1.2GB. My setup is a standalone Spark cluster with 4 workers, each with 2 cores / 16GB RAM. I'm using branch-1.2 code built against Hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark). The whole count is taking on the order of a couple of minutes, which seems extremely slow. I've been looking into it and so far have noticed two things, hoping the community has seen this before and knows what to do... 1) Every executor seems to make an S3 call to read the *entire file* before making another call to read just its split. Here's a paste I've cleaned up to show just one task: http://goo.gl/XCfyZA. I've verified this happens in every task. It is taking a long time (40-50 seconds); I don't see why it is doing this? 2) I've tried a few numPartitions parameters. When I make the parameter anything below 21 it seems to get ignored. Under the hood, FileInputFormat is doing something that always ends up with at least 21 partitions of ~64MB or so. I've also tried 40, 60, and 100 partitions and have seen that the performance only gets worse as I increase it beyond 21. I would like to try 8 just to see, but again I don't see how to force it to go below 21. Thanks for the help, - Nitay Founder CTO
Re: Using Spark Context as an attribute of a class cannot be used
On Mon, Nov 24, 2014 at 1:56 PM, aecc alessandroa...@gmail.com wrote: I checked sqlContext, they use it in the same way I would like to use my class, they make the class Serializable with transient. Does this affects somehow the whole pipeline of data moving? I mean, will I get performance issues when doing this because now the class will be Serialized for some reason that I still don't understand? If you want to do the same thing, your AAA needs to be serializable and you need to mark all non-serializable fields as @transient. The only performance penalty you'll be paying is the serialization / deserialization of the AAA instance, which most probably will be really small compared to the actual work the task will be doing. Unless your class is holding a whole lot of data, in which case you should start thinking about using a broadcast instead. -- Marcelo
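A minimal sketch of what that looks like (the class name AAA is from this thread; the rest is illustrative):

import org.apache.spark.SparkContext

class AAA(@transient val sc: SparkContext) extends Serializable {
  val myNumber = 5 // plain serializable state can travel with tasks
  // sc is @transient, so it stays on the driver and is never shipped with the closure
  def run(): Long = sc.parallelize(1 to 10).filter(_ == myNumber).count()
}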
Ideas on how to use Spark for anomaly detection on a stream of data
Hi all, I am getting started with Spark. I would like to use it for a spike on anomaly detection in a massive stream of metrics. Can Spark easily handle this use case? Thanks, Natu
Re: Using Spark Context as an attribute of a class cannot be used
Ok, great, I'm gonna do it that way, thanks :). However, I still don't understand why this object should be serialized and shipped. aaa.s and sc are both the same object, org.apache.spark.SparkContext@1f222881. However this: aaa.s.parallelize(1 to 10).filter(_ == myNumber).count needs to be serialized, and this: sc.parallelize(1 to 10).filter(_ == myNumber).count does not. -- Alessandro Chacón Aecc_ORG
Re: Spark S3 Performance
Can you verify that it's reading the entire file on each worker using network monitoring stats? If it does, that would be a bug in my opinion.
Spark SQL - Any time line to move beyond Alpha version ?
Is there any timeline for when Spark SQL moves beyond the alpha version? Thanks,
Re: Using Spark Context as an attribute of a class cannot be used
That's an interesting question for which I do not know the answer. Probably a question for someone with more knowledge of the internals of the shell interpreter... -- Marcelo
Re: Python Scientific Libraries in Spark
These libraries can be used in PySpark easily. For example, MLlib uses NumPy heavily; it can accept np.array or a SciPy sparse matrix as vectors.
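For instance, a small sketch (the k-means call is just one example of MLlib accepting NumPy arrays; the data is made up):

import numpy as np
from pyspark.mllib.clustering import KMeans

# NumPy arrays are accepted directly as MLlib vectors
data = sc.parallelize([np.array([1.0, 1.0]), np.array([9.0, 8.0]),
                       np.array([1.2, 0.9]), np.array([8.8, 8.1])])
model = KMeans.train(data, k=2, maxIterations=10)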
Re: Spark and Stanford CoreNLP
Neat hack! This is cute and actually seems to work. The fact that it works is a little surprising and somewhat unintuitive. On Mon, Nov 24, 2014 at 8:08 AM, Ian O'Connell i...@ianoconnell.com wrote:

object MyCoreNLP {
  @transient lazy val coreNLP = new coreNLP()
}

and then refer to it from your map/reduce/mapPartitions; it should be fine (presuming it's thread safe), as it will only be initialized once per classloader per JVM. On Mon, Nov 24, 2014 at 7:58 AM, Evan Sparks evan.spa...@gmail.com wrote: We have gotten this to work, but it requires instantiating the CoreNLP object on the worker side. Because of the initialization time, it makes a lot of sense to do this inside of a .mapPartitions instead of a .map, for example. As an aside, if you're using it from Scala, have a look at sistanlp, which provides a nicer, Scala-friendly interface to CoreNLP. On Nov 24, 2014, at 7:46 AM, tvas theodoros.vasilou...@gmail.com wrote: Hello, I was wondering if anyone has gotten the Stanford CoreNLP Java library to work with Spark. My attempts to use the parser/annotator fail because of task serialization errors, since the class StanfordCoreNLP cannot be serialized. I've tried the remedies of registering StanfordCoreNLP through Kryo, as well as using chill.MeatLocker, but these still produce serialization errors. Marking the StanfordCoreNLP object as transient leads to a NullPointerException instead. Has anybody managed to get this to work? Regards, Theodore
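Spelled out a bit more, a sketch of the same per-JVM singleton pattern (assumes CoreNLP's standard Properties-based API; docs is a placeholder RDD[String], and the annotator list is arbitrary):

import java.util.Properties
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

object NLP {
  // created lazily, once per executor JVM, and never serialized
  @transient lazy val pipeline: StanfordCoreNLP = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos")
    new StanfordCoreNLP(props)
  }
}

val annotated = docs.mapPartitions(_.map { text =>
  val doc = new Annotation(text)
  NLP.pipeline.annotate(doc)
  doc // extract serializable results (e.g. strings) before collecting or shuffling
})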
Is spark streaming +MlLib for online learning?
Hi Gurus, Sorry for my naive question, I am new. I seem to remember reading somewhere that Spark itself is still batch learning, but Spark Streaming could allow online learning. I could not find this on the website now. http://spark.apache.org/docs/latest/streaming-programming-guide.html I know MLlib uses incremental or iterative algorithms; I wonder if this is also true between batches of Spark Streaming. So the question is: say, when I call MLlib linear regression, does the training use one batch of data as training data? If yes, is the model update between batches already taken care of? That is, will the model eventually use all data that arrived from the beginning until the current time of scoring as training data, or does the model only use data from the past limited number of batches? Many thanks! J
Re: Is spark streaming +MlLib for online learning?
Hi, On Tue, Nov 25, 2014 at 9:40 AM, Joanne Contact joannenetw...@gmail.com wrote: I seemed to read somewhere that spark is still batch learning, but spark streaming could allow online learning. Spark doesn't do Machine Learning itself, but MLlib does. MLlib currently can do online learning only for linear regression https://spark.apache.org/docs/1.1.0/mllib-linear-methods.html#streaming-linear-regression, as far as I know. Tobias
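The linked example, roughly (a sketch following the Spark 1.1 docs; the stream names and feature count are placeholders, and the exact input type of predictOn varies across versions):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val numFeatures = 3 // placeholder
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingStream)       // trainingStream: DStream[LabeledPoint]
model.predictOn(testStream).print() // the model is updated as each batch arrives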
Re: Setup Remote HDFS for Spark
Hi, On Sat, Nov 22, 2014 at 12:13 AM, EH eas...@gmail.com wrote: Unfortunately whether it is possible to have both Spark and HDFS running on the same machine is not under our control. :( Right now we have Spark and HDFS running in different machines. In this case, is it still possible to hook up a remote HDFS with Spark so that we can use Spark Streaming checkpoints? Thank you for your help. In my case, after I copied my cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to my CLASSPATH, I could access YARN and HDFS remotely without any problems (read: modulo network/firewall issues), which I was pretty surprised by myself. Tobias
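One way to get those files onto the classpath (a sketch; the path is a placeholder):

# point Spark at a directory containing the cluster's core-site.xml,
# hdfs-site.xml and yarn-site.xml before launching your job
export HADOOP_CONF_DIR=/path/to/remote-cluster-conf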
Re: Is spark streaming +MlLib for online learning?
Thank you Tobias!
Negative Accumulators
Hello! Does anyone know why I may be receiving negative final accumulator values? Thanks!
Re: Mllib native netlib-java/OpenBLAS
I am running it in local mode. How can I use the built version (in local mode) so that I can use the native libraries?
Spark performance optimization examples
Hi, Is there any document that provides some guidelines, with examples, that illustrate when different performance optimizations would be useful? I am interested in the guidelines for using optimizations like cache(), persist(), repartition(), coalesce(), and broadcast variables. I studied the online programming guide, but I would like some more details (something along the lines of Aaron Davidson's talk, which illustrates the use of repartition() with an example, during the Spark Summit). In particular, I have a dataset that is about 1.2TB (about 30 files) that I am trying to load using sc.textFile on a cluster with a total memory of 3TB (170 GB per node). But I am not able to successfully complete the loading. The program is continuously active in the mapPartitions task but does not get past that even after a long time. I have tried some of the above optimizations, but that has not helped, and I am not sure if I am using these optimizations in the right way or which of the above optimizations would be most appropriate for this problem. So I would appreciate any guidelines. Thanks
Spark saveAsText file size
Hi Folks! I'm running a Spark job on a cluster with 9 slaves and 1 master (250GB RAM, 32 cores, and 1TB of storage each). This job generates 1.2 TB of data in an RDD with 1200 partitions. When I call saveAsTextFile("hdfs://..."), Spark creates 1200 files named part-000* in the HDFS folder. However, just a few files have content (~450 files have 2.3GB each) and all the others are empty (0 bytes). Is there any explanation for this file size (2.3GB)? Shouldn't Spark save 1200 files of 1GB each? Thanks in advance. --- Regards, Alan Vidotti Prando.
Re: Negative Accumulators
int overflow? If so, you can use BigInt like this:

scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}

// Exiting paste mode, now interpreting.

defined module BigIntAccumulatorParam

scala> val accu = sc.accumulator(BigInt(0))
accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

scala> accu += 100

scala> accu.value
res1: scala.math.BigInt = 100

Best Regards, Shixiong Zhu
Is Spark? or GraphX runs fast? a performance comparison on Page Rank
Hi All, I started exploring Spark over the past 2 months. I'm looking for some concrete features from both Spark and GraphX so that I can decide what to use, based on which gets the highest performance. According to the documentation, GraphX runs 10x faster than normal Spark. So I ran the PageRank algorithm with both: For Spark I used: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala For GraphX I used: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala

Input data: http://snap.stanford.edu/data/soc-LiveJournal1.html (1 GB in size)
No. of iterations: 2

Time taken:
Local mode (machine: 8 cores; 16 GB memory; 2.80 GHz Intel i7; executor memory: 4 GB; no. of partitions: 50; no. of iterations: 2):
Spark PageRank took 21.29 mins
GraphX PageRank took 42.01 mins

Cluster mode (Ubuntu 12.04; Spark 1.1/Hadoop 2.4 cluster; 3 workers, 1 driver; 8 cores, 30 GB memory each; executor memory 4 GB; no. of edge partitions: 50; random vertex cut; no. of iterations: 2):
Spark PageRank took 10.54 mins
GraphX PageRank took 7.54 mins

Could you please help me determine when to use Spark and when to use GraphX? If GraphX takes the same amount of time as Spark, then it's better to use Spark, because Spark has a variety of operators to deal with any type of RDD. Any suggestions, feedback, or pointers will be highly appreciated. Thanks, --Harihar
Re: Negative Accumulators
Great! Worked like a charm :)
Re: Mllib native netlib-java/OpenBLAS
You can try recompiling Spark with that option and doing an sbt/sbt publish-local, then change your Spark version from 1.1.0 to 1.2.0-SNAPSHOT (assuming you're building from the 1.2 branch) - sbt or maven (whichever you're compiling your app with) will pick up the version of Spark that you just built.
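For example, in the application's build file (a sketch; the exact snapshot version depends on the branch you built):

// after `sbt/sbt publish-local` in the Spark checkout
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.2.0-SNAPSHOT"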
Re: Spark saveAsText file size
The in-memory cache may blow up the size of the RDD. It's generally the case that an RDD takes more space in memory than on disk. There are options to configure and optimize storage space efficiency in Spark; take a look at https://spark.apache.org/docs/latest/tuning.html
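On the uneven part files themselves: output files mirror partitions, so skewed partitions produce skewed files. A repartition before saving (a sketch; note it adds a shuffle, and the partition count is a placeholder) evens them out:

rdd.repartition(450).saveAsTextFile("hdfs://...")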
Is there a way to turn on spark eventLog on the worker node?
Hi, I'm going to debug some Spark applications on our testing platform, and it would be helpful if we could see the event log on the worker node. I've tried turning on spark.eventLog.enabled and setting the spark.eventLog.dir parameter on the worker node, but it doesn't work. I do have event logs on my driver node, and I know how to turn those on; the same settings just don't work on the worker node. Can anyone help me clarify whether the event log is only available on the driver node?
Re: Is there a way to turn on spark eventLog on the worker node?
Hello, What exactly are you trying to see? Workers don't generate any events that would be logged by enabling that config option. Workers generate logs, and those are captured and saved to disk by the cluster manager, generally, without you having to do anything. -- Marcelo
Re: Is there a way to turn on spark eventLog on the worker node?
You can set the same parameters when launching an application: if you use spark-submit, try --conf to pass those variables, or set them via SparkConf; that configures the logs for both the driver and the workers. --Harihar
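For example (a sketch; the application class, jar, and log directory are placeholders):

spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs:///spark-events \
  --class org.example.MyApp myapp.jar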
Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Hm, I tried exactly the same commit and the build command locally, but couldn't reproduce this. Usually this kind of error is caused by classpath misconfiguration. Could you please try this to ensure the corresponding Guava classes are included in the assembly jar you built?

jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions

On my machine I got these lines (the first line is the one reported as missing in your case):

org/spark-project/guava/common/base/Preconditions.class
org/spark-project/guava/common/math/MathPreconditions.class
com/clearspring/analytics/util/Preconditions.class
parquet/Preconditions.class
com/google/inject/internal/util/$Preconditions.class

On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in:

commit 6f70e0295572e3037660004797040e026e440dbd
Author: zsxwing zsxw...@gmail.com
Date: Fri Nov 21 00:42:43 2014 -0800
[SPARK-4472][Shell] Print "Spark context available as sc." only when SparkContext is created successfully. It's weird that printing "Spark context available as sc" when creating SparkContext unsuccessfully.

Let me know if you need anything else. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, November 21, 2014 8:02 PM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks! On 11/22/14 11:05 AM, Judy Nash wrote: Hi, The Thrift server is failing to start for me on the latest Spark 1.2 branch. I got the error below when I started the Thrift server:

Exception in thread main java.lang.NoClassDefFoundError: com/google/common/base/Preconditions at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)….

Here is my setup:
1) Latest Spark 1.2 branch build
2) Build command used: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
3) Added hive-site.xml to \conf
4) Versions on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong? --- Full stack trace:

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:327)
    at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:409)
    at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:82)
    at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:202)
    at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:230)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:38)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at
Control number of parquet generated from JavaSchemaRDD
Hello, I am reading around 1000 input files from disk into an RDD and generating Parquet. It always produces the same number of Parquet files as input files. I tried to merge them using rdd.coalesce(n) and/or rdd.repartition(n). I also tried:

    int MB_128 = 128 * 1024 * 1024;
    sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128);
    sc.hadoopConfiguration().setInt("parquet.block.size", MB_128);

No luck. Is there a way to control the size/number of parquet files generated?

Thanks Tridib

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717.html
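One pattern that may help here (a hedged sketch, not a confirmed fix for this thread): the number of Parquet part-files written generally equals the number of partitions at save time, so the coalesce needs to happen on the SchemaRDD itself immediately before saving, not on an upstream RDD. Paths and the partition count below are hypothetical; API per the Spark 1.1/1.2 era:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val data = sqlContext.parquetFile("hdfs:///input/path") // hypothetical path
    // coalesce right before the save; 16 partitions -> roughly 16 part-files
    data.coalesce(16).saveAsParquetFile("hdfs:///output/path")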
How to access application name in the spark framework code.
Hi, When I submit a spark application like this: ./bin/spark-submit --class org.apache.spark.examples.SparkKMeans --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar /k-means 4 0.001 Which part of the spark framework code deals with the name of the application? Basically, I want to access the name of the application in the spark scheduler code. Can someone please tell me where I should look for the code that deals with the name of the currently executing application (say, SparkKMeans)? Thank you.
Re: advantages of SparkSQL?
For the “never register a table” part, you actually *can* use Spark SQL without registering a table via its DSL. Say you’re going to extract an Int field named key from the table and double it:

    import org.apache.spark.sql.catalyst.dsl._
    val data = sqc.parquetFile(path)
    val double = (i: Int) => i * 2
    data.select(double.call('key) as 'result).collect()

SchemaRDD.select constructs a proper SQL logical plan, which makes Spark SQL aware of the schema and enables the Parquet column pruning optimization. The double.call('key) part is the expression DSL, which turns a plain Scala function into a Spark SQL UDF and applies this UDF to the key field. Notice that the .call method is only available in the most recent master and branch-1.2.

On 11/25/14 5:19 AM, Michael Armbrust wrote: Akshat is correct about the benefits of parquet as a columnar format, but I'll add that some of this is lost if you just use a lambda function to process the data. Since your lambda function is a black box, Spark SQL does not know which columns it is going to use and thus will do a full table scan. I'd suggest writing a very simple SQL query that pulls out just the columns you need and does any filtering before dropping back into standard spark operations. The result of SQL queries is an RDD of rows, so you can do any normal spark processing you want on them. Either way though, it will often be faster than a text file due to better encoding/compression.

On Mon, Nov 24, 2014 at 8:54 AM, Akshat Aranya aara...@gmail.com wrote: Parquet is a column-oriented format, which means that you need to read in less data from the file system if you're only interested in a subset of your columns. Also, Parquet pushes down selection predicates, which can eliminate needless deserialization of rows that don't match a selection criterion. Other than that, you would also get compression, and likely save processor cycles when parsing lines from text files.

On Mon, Nov 24, 2014 at 8:20 AM, mrm ma...@skimlinks.com wrote: Hi, Is there any advantage to storing data as a parquet format, loading it using the sparkSQL context, but never registering it as a table/using sql on it? Something like:

    data = sqc.parquetFile(path)
    results = data.map(lambda x: applyfunc(x.field))

Is this faster/more optimised than having the data stored as a text file and using Spark (non-SQL) to process it?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/advantages-of-SparkSQL-tp19661.html
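A minimal sketch of Michael's suggestion, assuming the same 1.1/1.2-era SQLContext (sqc) and hypothetical table/column names: project only the needed columns through SQL so Parquet column pruning applies, then continue with ordinary RDD operations on the resulting rows:

    val data = sqc.parquetFile("hdfs:///some/path") // hypothetical path
    data.registerTempTable("events")                // hypothetical table name
    val projected = sqc.sql("SELECT field FROM events WHERE field > 0")
    // projected is a SchemaRDD (an RDD of Rows); normal transformations work
    val results = projected.map(row => row.getInt(0) * 2)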
Re: How to access application name in the spark framework code.
Hi, I think it should be accessible via the SparkConf in the SparkContext. Something like sc.getConf.get("spark.app.name")?

Thanks, Deng

On Tue, Nov 25, 2014 at 12:40 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, When I submit a spark application like this: ./bin/spark-submit --class org.apache.spark.examples.SparkKMeans --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar /k-means 4 0.001 Which part of the spark framework code deals with the name of the application? Basically, I want to access the name of the application in the spark scheduler code. Can someone please tell me where I should look for the code that deals with the name of the currently executing application (say, SparkKMeans)? Thank you.

-- Maria Odea Deng Ching-Mallete | och...@apache.org | http://www.linkedin.com/in/oching
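A quick sketch of that idea; spark.app.name is the standard property key, and the application name below is just the example class from the question:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("SparkKMeans")
    val sc = new SparkContext(conf)
    // spark.app.name holds whatever setAppName (or spark-submit --name) set
    println(sc.getConf.get("spark.app.name")) // prints: SparkKMeans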
Edge List File in GraphX
Hi, Is it necessary for every vertex to have an attribute when we load a graph into GraphX? In other words, suppose I have an edge list file containing pairs of vertices, i.e., "1 2" means that there is an edge between node 1 and node 2. Now, when I run PageRank on this data it returns NaN. Can I use this type of data for any algorithm on GraphX? Thank You
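For reference, a minimal sketch of loading exactly this kind of whitespace-separated edge list (the path is hypothetical). GraphLoader.edgeListFile assigns every vertex a default attribute of 1, so a bare edge list is valid input by itself:

    import org.apache.spark.graphx.GraphLoader

    // each line of edges.txt looks like "1 2" (srcId dstId)
    val graph = GraphLoader.edgeListFile(sc, "hdfs:///edges.txt")
    val ranks = graph.pageRank(0.0001).vertices // (vertexId, rank) pairs
    ranks.take(5).foreach(println)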
Re: New Codes in GraphX
Could it be because my edge list file is in the form (1 2), where there is an edge between node 1 and node 2?

On Tue, Nov 18, 2014 at 4:13 PM, Ankur Dave ankurd...@gmail.com wrote: At 2014-11-18 15:51:52 +0530, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes the above command works, but there is this problem. Most of the time, the total rank is NaN (Not a Number). Why is it so? I've also seen this, but I'm not sure why it happens. If you could find out which vertices are getting the NaN rank, it might be helpful in tracking down the problem. Ankur
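Following Ankur's suggestion, a small hedged sketch for isolating the offending vertices (it assumes graph is the already-loaded GraphX graph, as in the sketch above):

    // collect a sample of vertices whose PageRank came back as NaN
    val ranks = graph.pageRank(0.0001).vertices
    val nanVertices = ranks.filter { case (_, rank) => rank.isNaN }
    nanVertices.take(10).foreach(println)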
Re: Spark performance optimization examples
Here are the tuning guidelines if you haven't seen them already: http://spark.apache.org/docs/latest/tuning.html You could try the following to get it loaded (see the sketch after this reply):
- Use Kryo serialization http://spark.apache.org/docs/latest/tuning.html#data-serialization
- Enable RDD compression
- Set the storage level to MEMORY_AND_DISK_SER

Thanks Best Regards

On Tue, Nov 25, 2014 at 8:02 AM, SK skrishna...@gmail.com wrote: Hi, Is there any document that provides guidelines, with examples, that illustrate when different performance optimizations would be useful? I am interested in the guidelines for using optimizations like cache(), persist(), repartition(), coalesce(), and broadcast variables. I studied the online programming guide, but I would like some more details (something along the lines of Aaron Davidson's talk, which illustrates the use of repartition() with an example, during the Spark Summit). In particular, I have a dataset that is about 1.2 TB (about 30 files) that I am trying to load using sc.textFile on a cluster with a total memory of 3 TB (170 GB per node). But I am not able to successfully complete the loading. The program is continuously active in the mapPartitions task but does not get past that even after a long time. I have tried some of the above optimizations, but that has not helped, and I am not sure if I am using these optimizations in the right way or which of them would be most appropriate for this problem. So I would appreciate any guidelines. thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-performance-optimization-examples-tp19707.html
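A minimal sketch of the three suggestions above, using standard Spark 1.x properties; the input path is hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo
      .set("spark.rdd.compress", "true")                                     // RDD compression
    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs:///big/input") // hypothetical path
    // serialized, disk-spillable storage keeps memory pressure down
    lines.persist(StorageLevel.MEMORY_AND_DISK_SER)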
Re: How to insert complex types like map<string,map<string,int>> in spark sql
Thanks for the reply Michael, here is the stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
[info] org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
[info] org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
[info] org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
[info] org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
[info] org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
[info] org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
[info] scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[info] scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[info] org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
[info] org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
[info] org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
[info] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
[info] org.apache.spark.scheduler.Task.run(Task.scala:54)
[info] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
[info] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] java.lang.Thread.run(Thread.java:745)
[info] Driver stacktrace:
[info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
[info] at scala.Option.foreach(Option.scala:236)
[info] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-complex-types-like-map-string-map-string-int-in-spark-sql-tp19603p19728.html
RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
This is what I got from jar tf:

    org/spark-project/guava/common/base/Preconditions.class
    org/spark-project/guava/common/math/MathPreconditions.class
    com/clearspring/analytics/util/Preconditions.class
    parquet/Preconditions.class

I seem to have the class that was reported missing, but I am missing this file: com/google/inject/internal/util/$Preconditions.class

Any suggestion on how to fix this? Very much appreciate the help, as I am very new to Spark and open source technologies.

From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Monday, November 24, 2014 8:24 PM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

Hm, I tried exactly the same commit and the build command locally, but couldn’t reproduce this. Usually this kind of error is caused by classpath misconfiguration. Could you please try this to ensure the corresponding Guava classes are included in the assembly jar you built?

    jar tf assembly/target/scala-2.10/spark-assembly-1.2.1-SNAPSHOT-hadoop2.4.0.jar | grep Preconditions

On my machine I got these lines (the first line is the one reported as missing in your case):

    org/spark-project/guava/common/base/Preconditions.class
    org/spark-project/guava/common/math/MathPreconditions.class
    com/clearspring/analytics/util/Preconditions.class
    parquet/Preconditions.class
    com/google/inject/internal/util/$Preconditions.class

On 11/25/14 6:25 AM, Judy Nash wrote: Thank you Cheng for responding. Here is the commit SHA1 on the 1.2 branch I saw this failure in: commit 6f70e0295572e3037660004797040e026e440dbd Author: zsxwing zsxw...@gmail.com Date: Fri Nov 21 00:42:43 2014 -0800 [SPARK-4472][Shell] Print "Spark context available as sc." only when SparkContext is created successfully. It's weird that printing "Spark context available as sc" when creating SparkContext unsuccessfully. Let me know if you need anything else.

From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Friday, November 21, 2014 8:02 PM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

Hi Judy, could you please provide the commit SHA1 of the version you're using? Thanks!

On 11/22/14 11:05 AM, Judy Nash wrote: Hi, Thrift server is failing to start for me on the latest spark 1.2 branch. I got the error below when I start the thrift server:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
        at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314) ....

Here is my setup:
1) Latest spark 1.2 branch build
2) Used build command: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
3) Added hive-site.xml to \conf
4) Version on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong?

--- Full Stacktrace:
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:314)
    at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:327)
    at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:409)
    at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:82)
    at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:202)
    at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:230)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:38)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)