Command exited with code 137
I submit tasks in standalone mode, but I often get this error. The log is:

2014-06-12 11:37:36,578 [INFO] [org.apache.spark.Logging$class] [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-18] - Executor updated: app-20140612092238-0007/0 is now FAILED (Command exited with code 137)
2014-06-12 11:37:36,670 [INFO] [org.apache.spark.Logging$class] [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-18] - Executor app-20140612092238-0007/0 removed: Command exited with code 137
2014-06-12 11:37:36,673 [INFO] [org.apache.spark.Logging$class] [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-15] - Executor 0 disconnected, so removing it
2014-06-12 11:37:36,682 [ERROR] [org.apache.spark.Logging$class] [Method:logError] [Line:65] [Thread:spark-akka.actor.default-dispatcher-15] - Lost executor 0 on tj-hadoop-1.certus.com: Unknown executor exit code (137) (died from signal 9?)

The Spark config is:

spark_worker_timeout=300
spark_akka_timeout=500
spark_akka_frameSize=1000
spark_akka_num_retries=30
spark_akka_askTimeout=300

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Command-exited-with-code-137-tp7557.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
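For context on what the log above means (general Unix convention, not Spark-specific): an exit code above 128 encodes death by a signal, so 137 is signal 9 (SIGKILL), which on Linux is most often the kernel OOM killer reclaiming memory from the executor JVM:

```shell
# Exit codes above 128 mean "killed by signal (code - 128)".
code=137
echo "killed by signal $((code - 128))"   # prints: killed by signal 9
```

If that is the cause here, checking the worker node's kernel log (e.g. `dmesg | grep -i 'killed process'` -- exact wording varies by kernel) would confirm it; the akka timeout settings listed above would not help in that case, since the process is killed from outside Spark.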
openstack swift integration with Spark
If you are interested in OpenStack/Swift integration with Spark, please drop me a line. We are looking into improving the integration. Thanks.
Spark 1.0.0 on yarn cluster problem
In yarn-client mode, I submit a job from the client to YARN. My spark-env.sh:

export HADOOP_HOME=/usr/lib/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=4
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=1G
SPARK_DRIVER_MEMORY=2G
SPARK_YARN_APP_NAME=Spark 1.0.0

The command line and the result:

$ export JAVA_HOME=/usr/java/jdk1.7.0_45/
$ export PATH=$JAVA_HOME/bin:$PATH
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client
./bin/spark-submit: line 44: /usr/lib/spark/bin/spark-class: Success

What can I do about this? YARN accepts the job but never allocates memory to it. Why?
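Two things in the setup above may be worth checking (assumptions from reading the snippet, not a confirmed diagnosis): spark-env.sh is an ordinary shell script, so a value containing a space needs quoting, and the spark-submit line passes no application jar. A sketch of both fixes (the examples-jar path is a placeholder -- adjust to your install):

```shell
# spark-env.sh: quote values with spaces, or the line fails when sourced
export SPARK_YARN_APP_NAME="Spark 1.0.0"

# spark-submit expects the application jar (and any app args) after the options
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn-client \
  lib/spark-examples-1.0.0-hadoop2.2.0.jar 10
```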
Convert text into tfidf vectors for Classification
Hi all,

I want to perform text classification using Spark 1.0's Naive Bayes, and I am looking for a way to convert text into sparse vectors with TF-IDF weighting. I found that the MLI library supports this, but it is only compatible with Spark 0.8. What options are available for text vectorization? Is there a pre-built API that can be used, or some other way to achieve this?

Please suggest.

Thanks
Stuti Awasthi

::DISCLAIMER:: The contents of this e-mail and any attachment(s) are confidential and intended for the named recipient(s) only. E-mail transmission is not guaranteed to be secure or error-free, as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or may contain viruses in transmission. The e-mail and its contents (with or without referred errors) shall therefore not attach any liability on the originator or HCL or its affiliates. Views or opinions, if any, presented in this email are solely those of the author and may not necessarily reflect the views or opinions of HCL or its affiliates. Any form of reproduction, dissemination, copying, disclosure, modification, distribution and/or publication of this message without the prior written consent of an authorized representative of HCL is strictly prohibited. If you have received this email in error, please delete it and notify the sender immediately. Before opening any email and/or attachments, please check them for viruses and other defects.
Re: Convert text into tfidf vectors for Classification
You can create TF vectors and then use RowMatrix.computeColumnSummaryStatistics to get document frequencies (numNonzeros). For a tokenizer and stemmer, you can use scalanlp/chalk. Yes, it is worth having a simple interface for this.

-Xiangrui

On Fri, Jun 13, 2014 at 1:21 AM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi all, I wanted to perform Text Classification using Spark 1.0 Naive Bayes. I was looking for the way to convert text into sparse vector with TF-IDF weighting scheme. [...]
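To make the weighting scheme itself concrete, here is a minimal sketch in plain Python (a hypothetical helper -- this is not an MLlib or MLI API, which is exactly what Spark 1.0 lacks):

```python
import math
from collections import Counter

def tfidf(docs):
    """TF-IDF weights for a list of tokenized documents:
    weight(t, d) = tf(t, d) * log(N / df(t))."""
    n = len(docs)
    df = Counter()
    for doc in docs:
        df.update(set(doc))            # document frequency: one count per doc
    out = []
    for doc in docs:
        tf = Counter(doc)              # raw term frequency within the doc
        out.append({t: tf[t] * math.log(n / df[t]) for t in tf})
    return out

weights = tfidf([["spark", "mllib"], ["spark", "hadoop"]])
print(weights[0]["spark"])  # "spark" appears in every doc, so idf = log(2/2) = 0.0
```

Distributing this over an RDD of token lists then amounts to one pass to collect document frequencies and a second map to weight each document.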
Re: wholeTextFiles() : java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
Hi guys,

I ran into the same exception (while trying the same example), and after overriding the hadoop-client artifact in my pom.xml, I got another error (below).

System config: Ubuntu 12.04, IntelliJ 13, Scala 2.10.3.

Maven dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.4.0</version>
</dependency>

Any idea why Spark 1.0 is incompatible with Hadoop 2? Thanks for your support in advance!

Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:791)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:186)
    at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
    at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:27)
    at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:61)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:171)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:717)
list of persisted rdds
Hi,

How do I check which RDDs I have persisted? I have some code that looks like:

rd1.cache()
rd2.cache()
...
rdN.cache()

How can I unpersist all RDDs at once? And is it possible to get the names of the RDDs that are currently persisted (list = rd1, rd2, ..., rdN)?

Thank you!
Re: list of persisted rdds
Check out SparkContext.getPersistentRDDs!

On Fri, Jun 13, 2014 at 1:06 PM, mrm ma...@skimlinks.com wrote: Hi, How do I check the rdds that I have persisted? I have some code that looks like: rd1.cache() rd2.cache() ... rdN.cache() How can I unpersist all rdd's at once? [...]
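getPersistentRDDs returns a Map[Int, RDD[_]] keyed by RDD id, which also answers the "unpersist all at once" part of the question. A sketch against the Scala API (untested here):

```scala
// Drop every currently persisted RDD in one go
sc.getPersistentRDDs.values.foreach(_.unpersist())
```

The original variable names (rd1 ... rdN) are not recoverable from this map unless you assign names yourself via RDD.setName.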
Master not seeing recovered nodes (Got heartbeat from unregistered worker ....)
Hi,

I see this has been asked before but has not gotten a satisfactory answer, so I'll try again. (Here is the original thread I found: http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E )

I have a set of workers dying and coming back again. The master prints the following warning: "Got heartbeat from unregistered worker". What is the solution to this? Rolling the master is very undesirable to me, as I have a Shark context sitting on top of it (it's meant to be highly available).

Insights appreciated. I don't think an executor going down is very unexpected, but it does seem odd that it won't be able to rejoin the working set. I'm running Spark 0.9.1 on CDH.
Re: list of persisted rdds
Hi Daniel,

Thank you for your help! This is the sort of thing I was looking for. However, when I type sc.getPersistentRDDs, I get the error AttributeError: 'SparkContext' object has no attribute 'getPersistentRDDs'. I don't get any error when I type sc.defaultParallelism, for example.

I would appreciate it if you could help me with this; I have tried different approaches and Googling it! I suspect it might be a silly error, but I can't figure it out.

Maria
Re: wholeTextFiles not working with HDFS
My exception stack looks about the same.

java.io.FileNotFoundException: File /user/me/target/capacity-scheduler.xml does not exist.
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:397)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:173)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:717)

I'm using Hadoop 1.2.1, and everything else I've tried in Spark with that version has worked, so I doubt it's a version error.
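One detail worth noting in the trace (an observation, not a confirmed diagnosis): the path is resolved by RawLocalFileSystem, i.e. Hadoop is looking for /user/me/... on the local disk, which typically means the client's configuration has no default filesystem pointing at HDFS. For Hadoop 1.x that is fs.default.name in core-site.xml (hostname and port below are placeholders):

```xml
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode-host:9000</value>
  </property>
</configuration>
```

Passing a full hdfs:// URI to wholeTextFiles instead of a bare path would sidestep the default-filesystem lookup as well.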
BUG? Why does MASTER have to be set to spark://hostname:port?
Hi all,

When I try to run Spark PageRank using:

./bin/spark-submit \
  --master spark://192.168.1.12:7077 \
  --class org.apache.spark.examples.bagel.WikipediaPageRank \
  ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar \
  hdfs://192.168.1.12:9000/freebase-13G 0.05 100 True

*I don't specify the Spark master via SparkContext.setMaster() in the PageRank code.*

Unfortunately, it hangs here:

14/06/13 22:09:43 INFO DAGScheduler: Submitting 104 missing tasks from Stage 0 (MappedRDD[1] at textFile at WikipediaPageRank.scala:59)
14/06/13 22:09:43 INFO TaskSchedulerImpl: Adding task set 0.0 with 104 tasks
14/06/13 22:09:58 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

But after I change --master to spark://hostname:7077, it works normally.

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
Re: how to set spark.executor.memory and heap size
Hi Laurent,

You can set spark.executor.memory and the heap size in the following ways:

1. In your conf/spark-env.sh:

export SPARK_WORKER_MEMORY=38g
export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

2. You can also set executor memory and Java options via spark-submit parameters.

Check the Spark configuration and tuning docs; you will find full answers there.

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Thu, Jun 12, 2014 at 6:29 PM, Laurent T laurent.thou...@ldmobile.net wrote: Hi, Can you give us a little more insight on how you used that file to solve your problem? We're having the same OOM as you were and haven't been able to solve it yet. Thanks
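Point 2 can look like the following (flag names as they exist in spark-submit 1.0; the class name, master URL, and jar are placeholders):

```shell
./bin/spark-submit \
  --class com.example.MyApp \
  --master spark://master:7077 \
  --driver-memory 2g \
  --executor-memory 2g \
  --driver-java-options "-XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m" \
  myapp.jar
```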
Re: Transform K,V pair to a new K,V pair
Hi,

You can use map functions like flatMapValues and mapValues, which apply the map function to each pair contained in your input pair DStream[K,V] and return a pair DStream[K,V].

On Fri, Jun 13, 2014 at 8:48 AM, ryan_seq [via Apache Spark User List] wrote: Hi, Is there any function available in Spark Streaming to transform a K,V pair to a new K,V pair? Looking for a function that takes both the key and value as input to return a new tuple.

--
Thanks and Regards,
Lalit Yadav
la...@sigmoidanalytics.com
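Note that mapValues can only rewrite the value side; when the new key depends on both members of the pair, a plain map over the pair DStream receives the whole tuple. A sketch, assuming a DStream[(String, Int)] named pairDstream (untested here):

```scala
// `map` on a pair DStream gets the full (key, value) tuple,
// so both sides can be replaced at once
val renamed = pairDstream.map { case (k, v) => (k.toUpperCase, v + 1) }
```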
Re: list of persisted rdds
val myRdds = sc.getPersistentRDDs
assert(myRdds.size === 1)

It'll return a map. It's pretty old -- available from 0.8.0 onwards.

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Jun 13, 2014 at 9:42 AM, mrm ma...@skimlinks.com wrote: Hi Daniel, Thank you for your help! This is the sort of thing I was looking for. However, when I type sc.getPersistentRDDs, i get the error AttributeError: 'SparkContext' object has no attribute 'getPersistentRDDs'. [...]
Re: Master not seeing recovered nodes (Got heartbeat from unregistered worker ....)
I have also had trouble with workers rejoining the working set, and have typically moved to a Mesos-based setup. Frankly, for high availability you are better off using a cluster manager.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Jun 13, 2014 at 8:57 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, I see this has been asked before but has not gotten any satisfactory answer so I'll try again: [...] I have a set of workers dying and coming back again. The master prints the following warning: Got heartbeat from unregistered worker [...]
Re: multiple passes in mapPartitions
Sorry if this is a dumb question, but why not several calls to mapPartitions sequentially? Are you looking to avoid function serialization, or is your function damaging partitions?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Jun 13, 2014 at 1:30 AM, zhen z...@latrobe.edu.au wrote:

I want to take multiple passes through my data in mapPartitions. However, the iterator only allows you to take one pass through the data. If I transform the iterator into an array using iter.toArray, it is too slow, since it copies all the data into a new Scala array; it also takes twice the memory, which is bad in terms of more GC. Is there a faster/better way of taking multiple passes without copying all the data?

Thank you,
Zhen
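The underlying constraint, sketched outside Spark (plain Python, just to show why a second pass needs materialization -- iterators in Scala behave the same way):

```python
it = iter([1, 2, 3])
first_pass = list(it)    # consumes the iterator
second_pass = list(it)   # already exhausted

print(first_pass)   # [1, 2, 3]
print(second_pass)  # []
```

So any multi-pass algorithm over the iterator handed to mapPartitions has to buffer the partition somewhere (array, buffer, or spill to disk), or be restructured into a single pass.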
Re: list of persisted rdds
This appears to be missing from PySpark. Reported in SPARK-2141: https://issues.apache.org/jira/browse/SPARK-2141

On Fri, Jun 13, 2014 at 10:43 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: val myRdds = sc.getPersistentRDDs assert(myRdds.size === 1) It'll return a map. Its pretty old 0.8.0 onwards. [...]
Re: specifying fields for join()
You can map the columns you want to join on into keys, then join. Is that what you did?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Jun 12, 2014 at 9:24 PM, SK skrishna...@gmail.com wrote: This issue is resolved.
Re: Java Custom Receiver onStart method never called
I just forgot to call start() on the context. It works now.
Re: list of persisted rdds
Hi Nick,

Thank you for the reply. I forgot to mention in my first message that I was using PySpark.

Maria
process local vs node local subtlety question/issue
There is probably a subtlety between the ability to run tasks with data process-local and node-local that I think I'm missing. I'm doing a basic test, which is the following:

1) Copy a large text file from the local file system into HDFS using hadoop fs -copyFromLocal
2) Run Spark's wordcount example against the text file in HDFS

Sometimes when I run, tasks are executed with the data presumably being process-local, such as the below when it starts up running tasks 0 through 3:

14/06/12 08:02:31 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: apex90.llnl.gov (PROCESS_LOCAL)
14/06/12 08:02:31 INFO TaskSetManager: Serialized task 1.0:0 as 2458 bytes in 2 ms
14/06/12 08:02:31 INFO TaskSetManager: Starting task 1.0:1 as TID 1 on executor 0: apex90.llnl.gov (PROCESS_LOCAL)
14/06/12 08:02:31 INFO TaskSetManager: Serialized task 1.0:1 as 2458 bytes in 0 ms
14/06/12 08:02:31 INFO TaskSetManager: Starting task 1.0:2 as TID 2 on executor 0: apex90.llnl.gov (PROCESS_LOCAL)
14/06/12 08:02:31 INFO TaskSetManager: Serialized task 1.0:2 as 2458 bytes in 0 ms
14/06/12 08:02:31 INFO TaskSetManager: Starting task 1.0:3 as TID 3 on executor 0: apex90.llnl.gov (PROCESS_LOCAL)
14/06/12 08:02:31 INFO TaskSetManager: Serialized task 1.0:3 as 2458 bytes in 0 ms

Sometimes almost all the tasks in the job run process-local; sometimes it switches to node-local / node-any somewhere in the middle. Other times (more commonly when I run this test with higher node counts), the tasks always run with data presumably node-local, such as the below when it starts up running tasks 0 through 3:

14/06/11 22:58:38 INFO TaskSetManager: Starting task 1.0:21 as TID 0 on executor 5: apex80.llnl.gov (NODE_LOCAL)
14/06/11 22:58:38 INFO TaskSetManager: Serialized task 1.0:21 as 2458 bytes in 2 ms
14/06/11 22:58:38 INFO TaskSetManager: Starting task 1.0:1 as TID 1 on executor 27: apex78.llnl.gov (NODE_LOCAL)
14/06/11 22:58:38 INFO TaskSetManager: Serialized task 1.0:1 as 2458 bytes in 1 ms
14/06/11 22:58:38 INFO TaskSetManager: Starting task 1.0:3 as TID 2 on executor 14: apex82.llnl.gov (NODE_LOCAL)
14/06/11 22:58:38 INFO TaskSetManager: Serialized task 1.0:3 as 2458 bytes in 0 ms
14/06/11 22:58:38 INFO TaskSetManager: Starting task 1.0:11 as TID 3 on executor 15: apex105.llnl.gov (NODE_LOCAL)
14/06/11 22:58:38 INFO TaskSetManager: Serialized task 1.0:11 as 2458 bytes in 0 ms

As expected, tasks run slower node-local than process-local, and consequently those jobs run slower. So my question: how is this data process-local? I *just* copied it into HDFS; no Spark worker or executor should have loaded it. Between runs I delete the data from HDFS, delete the Spark local dir where data is cached, and restart the Spark daemons.

I've seen this behavior with Spark 0.9.1 and 1.0.0, although with different node counts. My environment is a bit unique in that I run HDFS over a parallel networked file system, but I think what I'm seeing should be independent of that. I'm sure there's something subtle I'm missing or not understanding; thanks in advance.

Al

--
Albert Chu
ch...@llnl.gov
Computer Scientist
High Performance Systems Division
Lawrence Livermore National Laboratory
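Not an answer to why freshly loaded HDFS blocks report PROCESS_LOCAL, but the switch between locality levels mid-job is governed by Spark's delay scheduling, which is tunable: a task waits a bounded time for a free slot at its preferred level before falling back to the next one. Property names and the 3-second default below are from the Spark 0.9/1.0 configuration docs (the per-level settings default to the global value):

```
spark.locality.wait          3000
spark.locality.wait.process  3000
spark.locality.wait.node     3000
spark.locality.wait.rack     3000
```

Raising or lowering these changes how long the scheduler holds out for the better level, which may explain runs that degrade to NODE_LOCAL / ANY partway through.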
Re: list of persisted rdds
Yeah, unfortunately PySpark still lags behind the Scala API a bit, but it's being patched up at a good pace.

On Fri, Jun 13, 2014 at 1:43 PM, mrm ma...@skimlinks.com wrote: Hi Nick, Thank you for the reply, I forgot to mention I was using pyspark in my first message. Maria
Re: Spilled shuffle files not being cleared
Thanks Saisai, I think I will just try lowering my spark.cleaner.ttl value -- I've set it to an hour.

On Thu, Jun 12, 2014 at 7:32 PM, Shao, Saisai saisai.s...@intel.com wrote:

Hi Michael,

I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata cleaner, which will clean old, unused shuffle data when it times out. For Spark 1.0, another way is to clean shuffle data using weak references (reference-tracking based; the configuration is spark.cleaner.referenceTracking), and it is enabled by default.

Thanks
Saisai

From: Michael Chang [mailto:m...@tellapart.com]
Sent: Friday, June 13, 2014 10:15 AM
To: user@spark.apache.org
Subject: Re: Spilled shuffle files not being cleared

Bump

On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote:

Hi all,

I'm seeing exceptions that look like the below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12-machine cluster). I took a quick look and I'm seeing some shuffle spill files that are still around even 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (is it as soon as they are used)?

Thanks,
Michael

java.io.FileNotFoundException: /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0 (No space left on device)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
14/06/09 22:07:36 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
Re: How to achieve reasonable performance on Spark Streaming?
I'm interested in this issue as well. I have Spark Streaming jobs that seem to run well for a while, but then slowly degrade and never recover.

On Wed, Jun 11, 2014 at 11:08 PM, Boduo Li onpo...@gmail.com wrote:

It seems that the slow reduce tasks are caused by slow shuffling. Here are the logs for one slow reduce task:

14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_88_18 after 5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_89_18 after 5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_90_18 after 5029 ms
[... identical lines for blocks shuffle_69_91_18 through shuffle_69_97_18 and shuffle_69_188_18 through shuffle_69_196_18, all "after 5029 ms" ...]
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_197_18 after 5029 ms
14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is 1143
14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to driver
14/06/11 23:42:45 INFO Executor: Finished task ID 23643
MLlib-a problem of example code for L-BFGS
Hi All, I'm new to Spark. Just tried out the example code on the Spark website for L-BFGS. But the code val model = new LogisticRegressionModel(... gave me an error: console:19: error: constructor LogisticRegressionModel in class LogisticRegressionModel cannot be accessed in class $iwC val model = new LogisticRegressionModel( ^ Then I checked the source code on GitHub for the definition of the class LogisticRegressionModel. It appears the reason is that it has private[mllib] in the definition, so access is restricted outside that package, and it does not have a public constructor either. So the example code contradicts the current API. Thanks, BR, Congrui -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-a-problem-of-example-code-for-L-BFGS-tp7589.html
Re: How to specify executor memory in EC2 ?
Aaron, spark.executor.memory is set to 2454m in my spark-defaults.conf, which is a reasonable value for the EC2 instances I use (they are m3.medium machines). However, it doesn't help, and each executor uses only 512 MB of memory. To figure out why, I examined the spark-submit and spark-class scripts and found that the Java options and Java memory size are computed in the spark-class script (see the OUR_JAVA_OPTS and OUR_JAVA_MEM variables in that script). Then these values are used to compose the following string: JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM". Note that OUR_JAVA_MEM is appended to the end of the string. For some reason which I haven't found yet, OUR_JAVA_MEM is set to its default value - 512 MB. I was able to fix it only by setting the SPARK_MEM variable in the spark-env.sh file: export SPARK_MEM=2g However, this variable is deprecated, so my solution doesn't seem to be good :) On Thu, Jun 12, 2014 at 10:16 PM, Aaron Davidson ilike...@gmail.com wrote: The scripts for Spark 1.0 actually specify this property in /root/spark/conf/spark-defaults.conf. I didn't know that this would override the --executor-memory flag, though; that's pretty odd. On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Yes, I am launching a cluster with the spark_ec2 script.
I checked /root/spark/conf/spark-env.sh on the master node and on slaves and it looks like this:
#!/usr/bin/env bash
export SPARK_LOCAL_DIRS=/mnt/spark
# Standalone cluster options
export SPARK_MASTER_OPTS=
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=1
export HADOOP_HOME=/root/ephemeral-hdfs
export SPARK_MASTER_IP=ec2-54-89-95-238.compute-1.amazonaws.com
export MASTER=`cat /root/spark-ec2/cluster-url`
export SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
export SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
# Bind Spark's web UIs to this machine's public EC2 hostname:
export SPARK_PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
# Set a high ulimit for large shuffles
ulimit -n 100
None of these variables seem to be related to memory size. Let me know if I am missing something. On Thu, Jun 12, 2014 at 7:17 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Are you launching this using our EC2 scripts? Or have you set up a cluster by hand? Matei On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: spark-env.sh doesn't seem to contain any settings related to memory size :( I will continue searching for a solution and will post it if I find it :) Thank you, anyway On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com wrote: It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and is overriding the application's settings. Take a look in there and delete that line if possible. Matei On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am testing my application in EC2 cluster of m3.medium machines. By default, only 512 MB of memory on each machine is used.
I want to increase this amount and I'm trying to do it by passing --executor-memory 2G option to the spark-submit script, but it doesn't seem to work - each machine uses only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the amount of memory?
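For anyone hitting the same thing: the non-deprecated knobs in Spark 1.0 are the spark.executor.memory property and the --executor-memory flag. A sketch of both (paths, sizes, and class/jar names here are examples only, and note Aaron's point above that the entry the EC2 scripts write into spark-defaults.conf may take precedence):

```
# Example only -- adjust paths and sizes for your cluster.
# Persist the setting on the machine you submit from:
echo "spark.executor.memory 2g" >> /root/spark/conf/spark-defaults.conf

# ...or pass it per application:
./bin/spark-submit --executor-memory 2g --class MyApp my-app.jar
```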
Re: specifying fields for join()
I used groupBy to create the keys for both RDDs, and then did the join. I think, though, it would be useful if in the future Spark allowed us to specify the fields on which to join, even when the keys are different. Scalding supports this feature. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7591.html
spark-submit fails to get jar from http source
I'm running a 1.0.0 standalone cluster based on amplab/dockerscripts with 3 workers. I'm testing out spark-submit, and I'm getting errors using *--deploy-mode cluster* and an http:// URL to my JAR. I'm getting the following error back:
Sending launch command to spark://master:7077
Driver successfully submitted as driver-20140613191831-0009
... waiting before polling master for driver state
... polling master for driver state
State of driver-20140613191831-0009 is ERROR
Exception from cluster was: java.io.IOException: No FileSystem for scheme: http
I verified that my jar URL is accessible from within the Spark nodes (workers and master). I also ran with the same URL and *--deploy-mode client*, and things worked. Documentation at http://spark.apache.org/docs/latest/submitting-applications.html in the section /Advanced Dependency Management/ suggests that this should work. Is this a known issue, or are my expectations wrong? PS - I find it very limiting that spark-submit does not take care of uploading my jar to the cluster. This is a fundamental requirement that most frameworks support (e.g. Storm, Hadoop, etc.). I do not consider this to be a requirement specific to the JobServer work, rather part of the master's API. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-fails-to-get-jar-from-http-source-tp7592.html
Re: MLlib-a problem of example code for L-BFGS
Hi Congrui, Since the constructor is private to the mllib package, one workaround is to write your code in a Scala file declared in the mllib package, so that it can use the constructor of LogisticRegressionModel. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Jun 13, 2014 at 11:50 AM, Congrui Yi fixed-term.congrui...@us.bosch.com wrote: [snip]
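To make the suggestion concrete, one possible shape of that workaround is sketched below: a small factory object compiled as part of your own application, but declared inside Spark's mllib package so the private[mllib] constructor is visible to it. The package path and the (weights, intercept) constructor signature are taken from the Spark 1.0 source tree -- verify them against your version.

```scala
// Sketch only: this file lives in YOUR project, but the package
// declaration places it inside Spark's mllib package, making the
// private[mllib] constructor reachable.
package org.apache.spark.mllib.classification

import org.apache.spark.mllib.linalg.Vector

object LogisticRegressionModelFactory {
  // Wraps the package-private constructor so code in any package can build a model.
  def create(weights: Vector, intercept: Double): LogisticRegressionModel =
    new LogisticRegressionModel(weights, intercept)
}
```

Your application code can then call LogisticRegressionModelFactory.create(weights, intercept) from any package.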
Re: Command exited with code 137
I've seen these caused by the OOM killer. I recommend checking /var/log/syslog to see if it was activated due to lack of system memory. On Thu, Jun 12, 2014 at 11:45 PM, libl 271592...@qq.com wrote: [snip]
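As a quick way to check the OOM-killer theory: exit code 137 is 128 + 9, i.e. the executor JVM died from SIGKILL, which is exactly what the kernel OOM killer sends. A sketch of what to look for (log paths vary by distro; RHEL-family systems use /var/log/messages instead of /var/log/syslog):

```shell
# Exit code 137 = 128 + 9, i.e. killed by signal 9 (SIGKILL):
echo $((128 + 9))   # -> 137

# Look for OOM-killer activity around the failure time. The greps are
# best-effort: paths and message formats differ between distros.
grep -i -e 'oom' -e 'killed process' /var/log/syslog 2>/dev/null || true
dmesg 2>/dev/null | grep -i 'oom-killer' || true
```

If a "Killed process ... (java)" line shows up near the failure timestamp, the fix is usually to lower the executor heap or add memory to the node.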
Re: MLlib-a problem of example code for L-BFGS
Hi DB, Thank you for the help! I'm new to this, so could you give a bit more detail on how this could be done? Sincerely, Congrui Yi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-a-problem-of-example-code-for-L-BFGS-tp7589p7596.html
Re: Spark Streaming not processing file with particular number of entries
This is very odd. If it runs fine on Mesos, I don't see an obvious reason why it won't work on a Spark standalone cluster. Is the 0.4 million record file already present in the monitored directory when the context is started? In that case, the file will not be picked up (unless the stream is created with the option to process existing files set). By default, only new files in a directory are picked up. TD On Tue, Jun 10, 2014 at 11:35 PM, praveshjain1991 praveshjain1...@gmail.com wrote: Well, I was able to get it to work by running Spark over Mesos. But it looks like a bug while running Spark alone. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7382.html
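For reference, the option TD mentions lives on the lower-level fileStream call (textFileStream itself takes no such flag in 1.0). A sketch, with a hypothetical monitored directory:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// newFilesOnly = false also picks up files already present in the
// directory when the context starts; textFileStream's behavior is
// equivalent to newFilesOnly = true.
val lines = ssc
  .fileStream[LongWritable, Text, TextInputFormat](
    "hdfs:///data/input",            // hypothetical monitored directory
    (path: Path) => true,            // accept every file
    newFilesOnly = false)
  .map(_._2.toString)
```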
Re: Master not seeing recovered nodes(Got heartbeat from unregistered worker ....)
I get the same problem, but I'm running in a dev environment based on docker scripts. An additional issue is that the worker processes do not die, so the docker container does not exit. I end up with worker containers that are not participating in the cluster. On Fri, Jun 13, 2014 at 9:44 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: I have also had trouble with workers joining the working set. I have typically moved to a Mesos-based setup. Frankly, for high availability you are better off using a cluster manager. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 13, 2014 at 8:57 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, I see this has been asked before but has not gotten any satisfactory answer, so I'll try again (here is the original thread I found: http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E ). I have a set of workers dying and coming back again. The master prints the following warning: Got heartbeat from unregistered worker What is the solution to this -- rolling the master is very undesirable to me, as I have a Shark context sitting on top of it (it's meant to be highly available). Insights appreciated -- I don't think an executor going down is very unexpected, but it does seem odd that it won't be able to rejoin the working set. I'm running Spark 0.9.1 on CDH.
Re: Spark Streaming not processing file with particular number of entries
There doesn't seem to be any obvious reason - that's why it looks like a bug. The 0.4 million record file is present in the directory when the context is started, the same as for all the other files (which are processed just fine by the application). In the logs we can see that the file is being picked up by the app, but it never starts the tasks that are supposed to follow. -- Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7599.html
How Spark Choose Worker Nodes for respective HDFS block
Hi All, I am new to Spark, working on a 3-node test cluster. I am trying to explore Spark's scope in analytics; my Spark code mostly interacts with HDFS. I am confused about how Spark chooses the node on which it will distribute its work, since we assume it can be an alternative to Hadoop MapReduce. In MapReduce we know that the framework will internally distribute code (or logic) to the nearest TaskTracker, co-located with a DataNode, or in the same rack, or otherwise nearest to the data blocks. My question is: when I give an HDFS path inside a Spark program, how does it choose which node is nearest (if it does)? If it does not, how will it work when I have TBs of data and high network latency is involved? My apologies for asking a basic question; please suggest. TIA -- Anish Sneh Experience is the best teacher. http://www.anishsneh.com
Re: Spark Streaming not processing file with particular number of entries
If you look at the file 400k.output, you'll see the string file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt. This file contains 0.4 million records. So the file is being picked up, but the app goes on to hang later on. Also, you mentioned the term standalone cluster in your previous reply, which I would like to clarify - I am running Spark in clustered mode (over a 3-node cluster). -- Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7602.html
Re: process local vs node local subtlety question/issue
On Fri, Jun 13, 2014 at 1:55 PM, Albert Chu ch...@llnl.gov wrote: 1) How is this data process-local? I *just* copied it into HDFS. No spark worker or executor should have loaded it. Yeah, I thought that PROCESS_LOCAL meant the data was already in the JVM on the worker node, but I do see the same thing for data loaded from S3... Nick
convert List to RDD
Hi, I have a List[(String, Int, Int)] that I would like to convert to an RDD. I tried to use sc.parallelize and sc.makeRDD, but in each case the original order of items in the List gets modified. Is there a simple way to convert a List to an RDD without using SparkContext? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/convert-List-to-RDD-tp7606.html
Re: convert List to RDD
I may be wrong, but I think RDDs must be created inside a SparkContext. To somehow preserve the order of the list, perhaps you could try something like: sc.parallelize((1 to xs.size).zip(xs)) On Fri, Jun 13, 2014 at 6:08 PM, SK skrishna...@gmail.com wrote: [snip]
spark master UI does not keep detailed application history
I have been trying to get a detailed history of previous spark-shell executions (after exiting the spark shell). In standalone mode with Spark 1.0, I think the Spark master UI is supposed to provide detailed execution statistics of all previously run jobs, viewable by clicking on the "Spark shell" link for each job. But when I click the "Spark shell" link, nothing happens. Am I missing something? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-UI-does-not-keep-detailed-application-history-tp7608.html
Re: convert List to RDD
Thanks. But that did not work. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/convert-List-to-RDD-tp7606p7609.html
Re: convert List to RDD
Sorry, I wasn't being clear. The idea off the top of my head was that you could append an original-position index to each element (using the line above), and modify whatever processing functions you have in mind to make them aware of these indices. And I think you are right that RDD collections are unordered by default. On Fri, Jun 13, 2014 at 6:33 PM, SK skrishna...@gmail.com wrote: Thanks. But that did not work.
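Spelled out, the index-tagging idea might look like this (a sketch using only the core RDD API; the sample list is illustrative):

```scala
import org.apache.spark.SparkContext._  // implicit conversion for sortByKey

val xs = List(("a", 1, 1), ("b", 2, 2), ("c", 3, 3))

// Tag each element with its original position before parallelizing...
val indexed = sc.parallelize(xs.zipWithIndex)  // RDD[((String, Int, Int), Int)]

// ...so that after any order-disturbing transformation the original
// sequence can be recovered by sorting on the index.
val restored = indexed.map(_.swap).sortByKey().map(_._2)
```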
printing in unit test
Hi, My unit test is failing (the output does not match the expected output). I would like to print out the value of the output, but rdd.foreach(r => println(r)) does not work from the unit test. How can I print or write the output to a file/screen? thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/printing-in-unit-test-tp7611.html
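A side note that may explain the symptom: closures passed to rdd.foreach run on the executors, so their println output lands in the executor logs rather than the test's console. Collecting to the driver first (fine for small test data) is the usual fix -- a sketch:

```scala
// Bring the data back to the driver, then print; foreach directly on the
// RDD would print inside the executor JVMs instead of the test console.
rdd.collect().foreach(println)

// Or write it out for later inspection (path is just an example):
rdd.saveAsTextFile("/tmp/test-output")
```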
Re: guidance on simple unit testing with Spark
You need to factor your program so that it’s not just a main(). This is not a Spark-specific issue, it’s about how you’d unit test any program in general. In this case, your main() creates a SparkContext, so you can’t pass one from outside, and your code has to read data from a file and write it to a file. It would be better to move your code for transforming data into a new function: def processData(lines: RDD[String]): RDD[String] = { // build and return your “res” variable } Then you can unit-test this directly on data you create in your program: val myLines = sc.parallelize(Seq(“line 1”, “line 2”)) val result = GetInfo.processData(myLines).collect() assert(result.toSet === Set(“res 1”, “res 2”)) Matei On Jun 13, 2014, at 2:42 PM, SK skrishna...@gmail.com wrote: Hi, I have looked through some of the test examples and also the brief documentation on unit testing at http://spark.apache.org/docs/latest/programming-guide.html#unit-testing, but still dont have a good understanding of writing unit tests using the Spark framework. Previously, I have written unit tests using specs2 framework and have got them to work in Scalding. I tried to use the specs2 framework with Spark, but could not find any simple examples I could follow. I am open to specs2 or Funsuite, whichever works best with Spark. I would like some additional guidance, or some simple sample code using specs2 or Funsuite. My code is provided below. I have the following code in src/main/scala/GetInfo.scala. It reads a Json file and extracts some data. It takes the input file (args(0)) and output file (args(1)) as arguments. 
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods.parse

object GetInfo {
  def main(args: Array[String]) {
    val inp_file = args(0)
    val conf = new SparkConf().setAppName("GetInfo")
    val sc = new SparkContext(conf)
    val res = sc.textFile(inp_file)
      .map(line => parse(line))
      .map(json => {
        implicit lazy val formats = org.json4s.DefaultFormats
        val aid = (json \ "d" \ "TypeID").extract[Int]
        val ts = (json \ "d" \ "TimeStamp").extract[Long]
        val gid = (json \ "d" \ "ID").extract[String]
        (aid, ts, gid)
      })
      .groupBy(tup => tup._3)
      .sortByKey(true)
      .map(g => (g._1, g._2.map(_._2).max))
    res.map(tuple => "%s, %d".format(tuple._1, tuple._2)).saveAsTextFile(args(1))
  }
}
I would like to test the above code. My unit test is in src/test/scala. The code I have so far for the unit test appears below:
import org.apache.spark._
import org.specs2.mutable._

class GetInfoTest extends Specification with java.io.Serializable {
  val data = List(
    """{"d": {"TypeID": 10, "Timestamp": 1234, "ID": "ID1"}}""",
    """{"d": {"TypeID": 11, "Timestamp": 5678, "ID": "ID1"}}""",
    """{"d": {"TypeID": 10, "Timestamp": 1357, "ID": "ID2"}}""",
    """{"d": {"TypeID": 11, "Timestamp": 2468, "ID": "ID2"}}"""
  )
  val expected_out = List(
    ("ID1", 5678),
    ("ID2", 2468)
  )
  "A GetInfo job" should {
    // * How do I pass the data defined above as the input and output which GetInfo expects as arguments? *
    val sc = new SparkContext("local", "GetInfo")
    // *** how do I get the output? ***
    // assuming out_buffer has the output, I want to match it to the expected output
    "match expected output" in {
      (out_buffer == expected_out) must beTrue
    }
  }
}
I would like some help with the tasks marked in the unit test code above. If specs2 is not the right way to go, I am also open to FunSuite. I would like to know how to pass the input while calling my program from the unit test and get the output. Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/guidance-on-simple-unit-testing-with-Spark-tp7604.html
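Putting Matei's advice together, a complete minimal sketch of such a test (FunSuite shown; the class name and the processData signature are illustrative, assuming GetInfo has been refactored to expose a testable function as he describes):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

// Assumes GetInfo has been refactored to expose:
//   def processData(lines: RDD[String]): RDD[(String, Long)]
class GetInfoSuite extends FunSuite {
  test("keeps the max timestamp per ID") {
    // A local SparkContext is enough for unit tests; no cluster needed.
    val sc = new SparkContext("local", "GetInfoTest")
    try {
      val input = sc.parallelize(Seq(
        """{"d": {"TypeID": 10, "TimeStamp": 1234, "ID": "ID1"}}""",
        """{"d": {"TypeID": 11, "TimeStamp": 5678, "ID": "ID1"}}"""))
      val result = GetInfo.processData(input).collect().toSet
      assert(result === Set(("ID1", 5678L)))
    } finally {
      sc.stop()  // always release the context, even if the assertion fails
    }
  }
}
```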
Multi-dimensional Uniques over large dataset
Hi, Would appreciate insights and wisdom on a problem we are working on:
1. Context:
- Given a csv file like:
d1,c1,a1
d1,c1,a2
d1,c2,a1
d1,c1,a1
d2,c1,a3
d2,c2,a1
d3,c1,a1
d3,c3,a1
d3,c2,a1
d3,c3,a2
d5,c1,a3
d5,c2,a2
d5,c3,a2
- Want to find uniques and totals (of the d_ across the c_ and a_ dimensions):
        Tot  Unique
c1       6     4
c2       4     4
c3       2     2
a1       7     3
a2       4     3
a3       2     2
c1-a1   ...
c1-a2   ...
c1-a3   ...
c2-a1   ...
c2-a2   ...
...
c3-a3
- Obviously there are millions of records and more attributes/dimensions, so scalability is key.
2. We think Spark is a good stack for this problem. Have a few questions:
3. From a Spark substrate perspective, what are some of the optimum transformations, and things to watch out for?
4. Is PairRDD the best data representation? GroupByKey et al are only available for PairRDD.
5. On a pragmatic level, file.map().map() results in an RDD. How do I transform it to a PairRDD?
   1. .map(fields => (fields(1), fields(0))) - results in Unit
   2. .map(fields => fields(1) -> fields(0)) also is not working
   3. Both of these do not result in a PairRDD
   4. Am missing something fundamental.
Cheers Have a nice weekend k/
MLLib : Decision Tree with minimum points per node
Hello, I have been playing around with MLlib's decision tree library. It is working great, thanks. I have a question regarding overfitting. It appears that the current implementation doesn't allow the user to specify the minimum number of samples per node. This results in some nodes containing only very few samples, which potentially leads to overfitting. Is there a workaround or any other way to prevent overfitting? Or will the decision tree support min-samples-per-node in future releases? Thanks. Justin
Re: multiple passes in mapPartitions
Thank you for your suggestion. We will try it out and see how it performs. We think the single call to mapPartitions will be faster, but we could be wrong. It would be nice to have a clone method on the iterator. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555p7616.html
Re: Multi-dimensional Uniques over large dataset
Answered one of my questions (#5): val pairs = new PairRDDFunctions(RDD) works fine locally, so now I can do groupByKey et al. I am not sure if it is scalable or memory-efficient for millions of records, though. Cheers k/ On Fri, Jun 13, 2014 at 8:52 PM, Krishna Sankar ksanka...@gmail.com wrote: [snip]
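For what it's worth, a groupByKey-free sketch of the totals/uniques computation: reduceByKey aggregates map-side, which usually scales much better than grouping entire value lists. The CSV layout (d, c, a columns) is assumed from the example; the input path is hypothetical.

```scala
import org.apache.spark.SparkContext._  // implicit RDD[(K, V)] => PairRDDFunctions

// Assumes exactly three columns per row: d-id, c-dimension, a-dimension.
val rows = sc.textFile("hdfs:///data/events.csv").map(_.split(","))

// Emit (dimension-value, d-id) for each single dimension and each c-a pair;
// note that a simple .map to a tuple already yields a pair RDD -- the
// implicit conversion above makes reduceByKey et al available on it.
val pairs = rows.flatMap { case Array(d, c, a) =>
  Seq((c, d), (a, d), (c + "-" + a, d))
}

val totals  = pairs.mapValues(_ => 1L).reduceByKey(_ + _)           // Tot column
val uniques = pairs.distinct().mapValues(_ => 1L).reduceByKey(_ + _) // Unique column
```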