Re: spark ui on yarn
Hi Koert, Just curious, did you find any information like CANNOT FIND ADDRESS after clicking into some stage? I've seen similar problems due to loss of executors. Best, On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote: I just tested a long-lived application (that we normally run in standalone mode) on yarn in client mode. it looks to me like cached rdds are missing in the storage tab of the ui. accessing the rdd storage information via the spark context shows rdds as fully cached but they are missing on the storage page. spark 1.0.0
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Thank you, Tathagata. That explains it. Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 7:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Task slot is equivalent to core number. So one core can only run one task at a time. TD On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi Tathagata, Thank you. Is a task slot equivalent to a core? Or can one core actually run multiple tasks at the same time? Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The same executor can be used for both receiving and processing, irrespective of the deployment mode (yarn, spark standalone, etc.). It boils down to the number of cores / task slots that the executor has. Each receiver is like a long-running task, so each of them occupies a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that the other tasks are not being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input dstreams / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in! On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging by the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes 2) Receivers will run on executors. It's actually a job that's submitted where # of tasks equals # of receivers. An executor can actually run more than one task at the same time. Hence you could have more receivers than executors, but I think it's not recommended. 3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program actually runs as the application master (it lives in the first container that's launched) and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing tasks as well (I am not very sure about this part). On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of the Spark Streaming application. But I have trouble understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine (*note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data*). Does the worker machine mean an executor or a physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used by other operations too? 4.
So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
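A minimal sketch of the sizing rule TD describes above, assuming two socket receivers (the master URL, host, ports, and batch interval are placeholders, not values from this thread):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Each receiver permanently occupies one task slot (core), so give the
    // application strictly more cores than receivers, or no slots are left
    // for the actual map/reduce tasks.
    val numReceivers = 2
    val conf = new SparkConf()
      .setAppName("ReceiverSizingSketch")
      .setMaster(s"local[${numReceivers + 2}]") // receivers + spare cores for processing

    val ssc = new StreamingContext(conf, Seconds(10))
    val streams = (1 to numReceivers).map(i => ssc.socketTextStream("localhost", 9998 + i))
    ssc.union(streams).count().print()

    ssc.start()
    ssc.awaitTermination()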
Re: KMeans for large training data
Thanks, setting the number of partitions to the number of executors helped a lot, and training with 20k entries got a lot faster. However, when I tried training with 1M entries, after about 45 minutes of calculations I get this: It's stuck at this point. The CPU load for the master is at 100% (so 1 of 8 cores is used), but the WebUI shows no active task, and after 30 more minutes of no visible change I had to leave for an appointment. I've never seen an error referring to this library before. Could that be due to the new partitioning? Edit: Just before sending, in a new test I realized this error also appears when the amount of test data is very low (here 500 items). This time it includes a Java stacktrace though, instead of just stopping: So, to sum it up, KMeans.train works somewhere in between 10k and 200k items, but not outside this range. Can you think of an explanation for this behavior? Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9508.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
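For reference, a sketch of the repartitioning step described above (the names, k, and iteration count are placeholders; it assumes training data already parsed into an RDD[Vector]):

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // One partition per executor, as described above, and cache the result
    // so each KMeans iteration reuses it instead of recomputing it.
    def trainWithRepartition(vectors: RDD[Vector], numExecutors: Int) = {
      val partitioned = vectors.repartition(numExecutors).cache()
      KMeans.train(partitioned, 10, 20) // k = 10, maxIterations = 20 (placeholders)
    }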
Re: Announcing Spark 1.0.1
Hi All, Congrats to the entire Spark team on the 1.0.1 release. In checking out the new features, I noticed that it looks like the Python API docs have been updated, but the title and the header at the top of the page still say Spark 1.0.0. Clearly not a big deal... I just wouldn't want anyone to get confused and miss out. http://spark.apache.org/docs/1.0.1/api/python/index.html best, -Brad On Fri, Jul 11, 2014 at 8:44 PM, Henry Saputra henry.sapu...@gmail.com wrote: Congrats to the Spark community! On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
Thanks TD. BTW - if I have an input file of ~250 GB, is there any guideline on whether to use: * a single input file (250 GB) (in this case, is there any max upper bound?) or * a split into 1000 files, each of 250 MB (hdfs block size is 250 MB) or * a multiple of the hdfs block size. Mans On Friday, July 11, 2014 4:38 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The model for file stream is to pick up and process new files written atomically (by move) into a directory. So your file is being processed in a single batch, and then it is waiting for any new files to be written into that directory. TD On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote: So, is it expected for the process to generate stages/tasks even after processing a file? Also, is there a way to figure out the file that is getting processed and when that processing is complete? Thanks On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster, and it needs to know how many parts it should divide the data into. That's where the default parallelism is used. TD On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what its optimal value should be. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses spark streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in cassandra based on the fields in each row. I am testing the application in two modes: * Process each row and save the counter in cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the spark UI. * If instead I use reduce by key before saving to cassandra, the spark UI shows continuous generation of tasks/stages even after processing the file has been completed. I believe this is because reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
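For reference, a sketch of the two ways to control that shuffle parallelism (the value 22 is a placeholder, not a recommendation):

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // 1) Raise the cluster-wide default used when a shuffle operation is
    //    given no explicit partition count:
    val conf = new SparkConf().set("spark.default.parallelism", "22")

    // 2) Override per operation: reduceByKey accepts an explicit count.
    def countWords(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
      pairs.reduceByKey(_ + _, 22)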
Akka Client disconnected
I am running logistic regression with SGD on a problem with about 19M parameters (the kdda dataset from the libsvm library). I consistently see that the nodes on my computer get disconnected and soon the whole job comes to a grinding halt. 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on pachy4 remote Akka client disassociated Does this have anything to do with the akka.frame_size? I have tried up to 1024 MB and I still get the same thing. I don't have any more information in the logs about why the clients are getting disconnected. Any thoughts? Regards, Krishna
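For what it's worth, the property being described is spark.akka.frameSize, whose value is given in MB; a minimal sketch of setting it (1024 matching the value tried above):

    import org.apache.spark.SparkConf

    // Maximum Akka message size for serialized tasks and task results, in MB.
    val conf = new SparkConf().set("spark.akka.frameSize", "1024")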
Re: Akka Client disconnected
Are you using 1.0 or the current master? A bug related to this is fixed in master. On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote: I am running logistic regression with SGD on a problem with about 19M parameters (the kdda dataset from the libsvm library). I consistently see that the nodes on my computer get disconnected and soon the whole job comes to a grinding halt. 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on pachy4 remote Akka client disassociated Does this have anything to do with the akka.frame_size? I have tried up to 1024 MB and I still get the same thing. I don't have any more information in the logs about why the clients are getting disconnected. Any thoughts? Regards, Krishna
Putting block rdd failed when running example svm on large data
Hi, I am trying to run the example BinaryClassification (org.apache.spark.examples.mllib.BinaryClassification) on a 202G file. I am constantly getting messages like the ones below; is this normal, or am I missing something?

14/07/12 09:49:04 WARN BlockManager: Block rdd_4_196 could not be dropped from memory as it does not exist
14/07/12 09:49:04 WARN BlockManager: Putting block rdd_4_196 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_201 could not be dropped from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_201 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_202 could not be dropped from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_202 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_198 could not be dropped from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_198 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_199 could not be dropped from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_199 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_204 could not be dropped from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_204 failed
14/07/12 09:49:06 WARN BlockManager: Block rdd_4_203 could not be dropped from memory as it does not exist
14/07/12 09:49:06 WARN BlockManager: Putting block rdd_4_203 failed
14/07/12 09:49:07 WARN BlockManager: Block rdd_4_205 could not be dropped from memory as it does not exist
14/07/12 09:49:07 WARN BlockManager: Putting block rdd_4_205 failed

Some info: 8 node cluster with 28G RAM per node; I configured 25G of memory for spark (so the data does not seem to fit in memory). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
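Not from the original message, but a sketch of one common mitigation when cached blocks do not fit in memory: persist with a storage level that spills to disk instead of the default MEMORY_ONLY (the function and parameter names are placeholders):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Partitions that do not fit in executor memory spill to local disk
    // instead of failing to be cached at all.
    def cacheWithSpill[T](data: RDD[T]): RDD[T] =
      data.persist(StorageLevel.MEMORY_AND_DISK)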
Confused by groupByKey() and the default partitioner
Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows: x = sc.parallelize([('a', 1), ('b', 4), ('a', 10), ('c', 7)], 2) Does spark partition x based on the hash of the key (e.g., 'a', 'b', 'c') by default? (1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition. Then it's not necessary to shuffle the data records around, as all the grouping operations can be done locally. (2) If it's not true, how could I specify a partitioner simply based on the hashing of the key (in Python)? Thank you, - Guanhua
Re: spark ui on yarn
hey shuo, so far all stage links work fine for me. i did some more testing, and it seems kind of random what shows up on the gui and what does not. some partially cached RDDs make it to the GUI, while some fully cached ones do not. I have not been able to detect a pattern. is the codebase for the gui different in standalone than in yarn-client mode? On Sat, Jul 12, 2014 at 3:34 AM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi Koert, Just curious, did you find any information like CANNOT FIND ADDRESS after clicking into some stage? I've seen similar problems due to loss of executors. Best, On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote: I just tested a long-lived application (that we normally run in standalone mode) on yarn in client mode. it looks to me like cached rdds are missing in the storage tab of the ui. accessing the rdd storage information via the spark context shows rdds as fully cached but they are missing on the storage page. spark 1.0.0
Re: Anaconda Spark AMI
Hi All, Thanks to Jey's help, I have a release AMI candidate for spark-1.0/anaconda-2.0 integration. It's currently limited to availability in US-EAST: ami-3ecd0c56 Give it a try if you have some time. This should *just work* with spark 1.0: ./spark-ec2 -k my_key -i ~/.ssh/mykey.rsa -a ami-3ecd0c56 If you have suggestions or run into trouble please email, --Ben PS: I found that writing a noop map function is a decent way to install pkgs on worker nodes (though most scientific pkgs are pre-installed with anaconda):

    def subprocess_noop(x):
        import os
        os.system("/opt/anaconda/bin/conda install h5py")
        return 1

    install_noop = rdd.map(subprocess_noop)
    install_noop.count()

On Thu, Jul 3, 2014 at 2:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote: Hi Ben, Has the PYSPARK_PYTHON environment variable been set in spark/conf/spark-env.sh to the path of the new python binary? FYI, there's a /root/copy-dirs script that can be handy when updating files on an already-running cluster. You'll want to restart the spark cluster for the changes to take effect, as described at https://spark.apache.org/docs/latest/ec2-scripts.html Hope that helps, -Jey On Thu, Jul 3, 2014 at 11:54 AM, Benjamin Zaitlen quasi...@gmail.com wrote: Hi All, I'm a dev at Continuum and we are developing a fair amount of tooling around Spark. A few days ago someone expressed interest in numpy+pyspark, and Anaconda came up as a reasonable solution. I spent a number of hours yesterday trying to rework the base Spark AMI on EC2 but sadly was defeated by a number of errors. Aggregations seemed to choke -- whereas small takes executed as expected (errors are linked to the gist):

    >>> sc.appName
    u'PySparkShell'
    >>> sc._conf.getAll()
    [(u'spark.executor.extraLibraryPath', u'/root/ephemeral-hdfs/lib/native/'), (u'spark.executor.memory', u'6154m'), (u'spark.submit.pyFiles', u''), (u'spark.app.name', u'PySparkShell'), (u'spark.executor.extraClassPath', u'/root/ephemeral-hdfs/conf'), (u'spark.master', u'spark://.compute-1.amazonaws.com:7077')]
    >>> file = sc.textFile("hdfs:///user/root/chekhov.txt")
    >>> file.take(2)
    [u"Project Gutenberg's Plays by Chekhov, Second Series, by Anton Chekhov", u'']
    >>> lines = file.filter(lambda x: len(x) > 0)
    >>> lines.count()
    VARIOUS ERRORS DISCUSSED BELOW

My first thought was that I could simply get away with including anaconda on the base AMI, point the path at /dir/anaconda/bin, and bake a new one. Doing so resulted in some strange py4j errors like the following: Py4JError: An error occurred while calling o17.partitions. Trace: py4j.Py4JException: Method partitions([]) does not exist At some point I also saw: SystemError: Objects/cellobject.c:24: bad argument to internal function which is really strange, possibly the result of a version mismatch? I had another thought of building spark from master on the AMI, leaving the spark directory in place, and removing the spark call from the modules list in the spark-ec2 launch script. Unfortunately, this resulted in the following errors: https://gist.github.com/quasiben/da0f4778fbc87d02c088 If a spark dev was willing to make some time in the near future, I'm sure she/he and I could sort out these issues and give the Spark community a python distro ready to go for numerical computing. For instance, I'm not sure how pyspark calls out to launching a python session on a slave? Is this done as root or as the hadoop user? (I believe I changed /etc/bashrc to point to my anaconda bin directory so it shouldn't really matter.) Is there something special about the py4j zip included in the spark dir compared with the py4j in pypi?
Thoughts? --Ben
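For anyone following along, Jey's PYSPARK_PYTHON suggestion from earlier in the thread would look like this, assuming the /opt/anaconda prefix used above:

    # in spark/conf/spark-env.sh, on the master and all workers
    export PYSPARK_PYTHON=/opt/anaconda/bin/python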
Scalability issue in Spark with SparkPageRank example
Hello, I ran the SparkPageRank example (the one included in the package) to evaluate the scale-in capability of Spark. I ran experiments on an 8-node, 48-core AMD machine with a local[N] master. But for N > 10, the completion time of the experiment kept increasing rather than decreasing. When I profiled it using JProfiler, I observed that it wasn't any lock that consumed the CPU time. Instead, the amount of time spent in the following functions kept increasing: 1) java.io.ObjectOutputStream.writeObject0 2) scala.Tuple2.hashCode I confirmed the same with OProfile as well. The findings are consistent. I am attaching the jstack output, which I took twice during the execution of the N=48 run. I ran the tests with Spark 1.0.0 and Spark 0.9.0. Can someone please suggest what is wrong? Regards, Lokesh Gidra lessoutput3.lessoutput3 http://apache-spark-user-list.1001560.n3.nabble.com/file/n9521/lessoutput3.lessoutput3 lessoutput4.lessoutput4 http://apache-spark-user-list.1001560.n3.nabble.com/file/n9521/lessoutput4.lessoutput4 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scalability-issue-in-Spark-with-SparkPageRank-example-tp9521.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Stopping StreamingContext does not kill receiver
From the interactive shell I’ve created a StreamingContext. I call ssc.start() and take a look at http://master_url:4040/streaming/ and see that I have an active Twitter receiver. Then I call ssc.stop(stopSparkContext = false, stopGracefully = true) and wait a bit, but the receiver seems to stay active. Is this expected? I’m running 1.0.1 on EC2. Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Stopping StreamingContext does not kill receiver
Yes, that's a bug I just discovered. Race condition in the Twitter Receiver; will fix asap. Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-2464 TD On Sat, Jul 12, 2014 at 3:21 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: To add a potentially relevant piece of information, around when I stop the StreamingContext, I get the following warning: 14/07/12 22:16:18 WARN ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,TwitterReceiver-0,Actor[akka.tcp://spark@url-here:49776/user/Receiver-0-140520314#-273455949],true,url-here-too,,)) Nick On Sat, Jul 12, 2014 at 6:03 PM, Nick Chammas nicholas.cham...@gmail.com wrote: From the interactive shell I’ve created a StreamingContext. I call ssc.start() and take a look at http://master_url:4040/streaming/ and see that I have an active Twitter receiver. Then I call ssc.stop(stopSparkContext = false, stopGracefully = true) and wait a bit, but the receiver seems to stay active. Is this expected? I’m running 1.0.1 on EC2. Nick -- View this message in context: Stopping StreamingContext does not kill receiver http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Stopping StreamingContext does not kill receiver
Okie doke. Thanks for filing the JIRA. On Sat, Jul 12, 2014 at 6:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Yes, that's a bug I just discovered. Race condition in the Twitter Receiver; will fix asap. Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-2464 TD On Sat, Jul 12, 2014 at 3:21 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: To add a potentially relevant piece of information, around when I stop the StreamingContext, I get the following warning: 14/07/12 22:16:18 WARN ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,TwitterReceiver-0,Actor[akka.tcp://spark@url-here:49776/user/Receiver-0-140520314#-273455949],true,url-here-too,,)) Nick On Sat, Jul 12, 2014 at 6:03 PM, Nick Chammas nicholas.cham...@gmail.com wrote: From the interactive shell I’ve created a StreamingContext. I call ssc.start() and take a look at http://master_url:4040/streaming/ and see that I have an active Twitter receiver. Then I call ssc.stop(stopSparkContext = false, stopGracefully = true) and wait a bit, but the receiver seems to stay active. Is this expected? I’m running 1.0.1 on EC2. Nick -- View this message in context: Stopping StreamingContext does not kill receiver http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Confused by groupByKey() and the default partitioner
Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner. (1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code; you'll see that it simply checks that the Partitioner the groupBy() is looking for is equal to the Partitioner of the pre-existing RDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89 By the way, I should warn you that groupByKey() is not a recommended operation if you can avoid it, as it has non-obvious performance issues when running with serious data. On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote: Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows: x = sc.parallelize([('a', 1), ('b', 4), ('a', 10), ('c', 7)], 2) Does spark partition x based on the hash of the key (e.g., 'a', 'b', 'c') by default? (1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition. Then it's not necessary to shuffle the data records around, as all the grouping operations can be done locally. (2) If it's not true, how could I specify a partitioner simply based on the hashing of the key (in Python)? Thank you, - Guanhua
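To make (2) concrete, a minimal sketch in Scala (in PySpark the analogous call is rdd.partitionBy(numPartitions), which hashes keys by default):

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.SparkContext._

    def groupLocally(sc: SparkContext) = {
      val x = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 10), ("c", 7)))
      // The shuffle happens once, here; cache to keep the partitioning around.
      val partitioned = x.partitionBy(new HashPartitioner(2)).cache()
      // groupByKey() sees the matching partitioner and does not shuffle again.
      partitioned.groupByKey()
    }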
Re: Akka Client disconnected
I am using the master that I compiled 2 days ago. Can you point me to the JIRA? On Sat, Jul 12, 2014 at 9:13 AM, DB Tsai dbt...@dbtsai.com wrote: Are you using 1.0 or the current master? A bug related to this is fixed in master. On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote: I am running logistic regression with SGD on a problem with about 19M parameters (the kdda dataset from the libsvm library). I consistently see that the nodes on my computer get disconnected and soon the whole job comes to a grinding halt. 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on pachy4 remote Akka client disassociated Does this have anything to do with the akka.frame_size? I have tried up to 1024 MB and I still get the same thing. I don't have any more information in the logs about why the clients are getting disconnected. Any thoughts? Regards, Krishna
Re: KMeans for large training data
The netlib.BLAS: Failed to load implementation warning only means that the BLAS implementation may be slower than a native one. The reason why it only shows up at the end is that the library is only used for the finalization step of the KMeans algorithm, so your job should've been wrapping up at this point. I am not familiar with the algorithm beyond that, so I'm not sure if for some reason we're trying to collect too much data back to the driver here. SPARK_DRIVER_MEMORY can increase the driver memory, by the way (or the --driver-memory option when using spark-submit). On Sat, Jul 12, 2014 at 2:38 AM, durin m...@simon-schaefer.net wrote: Your latest response doesn't show up here yet, I only got the mail. I'll still answer here in the hope that it appears later: Which memory setting do you mean? I can go up with spark.executor.memory a bit; it's currently set to 12G. But that's already way more than the whole SchemaRDD of Vectors that I currently use for training, which shouldn't be more than a few hundred MB. I suppose you rather mean something comparable to SHARK_MASTER_MEM in Shark. I can't find the equivalent for Spark in the documentation, though. And if it helps, I can summarize the whole code that I currently use. It's nothing really fancy at the moment; I'm just trying to classify Strings that each contain a few words (each word is handled as an atomic item). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9509.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
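For reference, the two mechanisms mentioned above look like this (the 8g value, class name, and jar are placeholders):

    # environment variable read by the launch scripts
    export SPARK_DRIVER_MEMORY=8g

    # or the equivalent spark-submit flag
    ./bin/spark-submit --driver-memory 8g --class com.example.KMeansJob myapp.jar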
Re: Convert from RDD[Object] to RDD[Array[Object]]
If you don't really care about the batchedDegree, but rather just want to do operations over some set of elements rather than one at a time, then just use mapPartitions(). Otherwise, if you really do want certain-sized batches and you are able to relax the constraints slightly, you can construct these batches within each partition. For instance:

    val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
      new Iterator[Array[Int]] {
        def hasNext: Boolean = iter.hasNext
        def next(): Array[Int] = {
          iter.take(batchedDegree).toArray
        }
      }
    }

This function is efficient in that it does not load the entire partition into memory, just enough to construct each batch. However, there will be one smaller batch at the end of each partition (rather than just one over the entire dataset). On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote: Hi there, I have a bunch of data in an RDD, which I previously processed one element at a time. For example, there was an RDD denoted by data: RDD[Object] and I processed it using data.map(...). However, I got a new requirement to process the data in a batched way. It means that I need to convert the RDD from RDD[Object] to RDD[Array[Object]] and then process it, which is to fill out this function: def convert2array(inputs: RDD[Object], batchedDegree: Int): RDD[Array[Object]] = {...}. I hope that after the conversion, each element of the new RDD is an array of the previous RDD's elements. The parameter batchedDegree specifies how many elements are batched together. For example, if I have 210 elements in the previous RDD and batchedDegree is 100, the result of the conversion function should be an RDD with 3 elements. Each element is an array; the first two arrays contain elements 1~100 and 101~200, and the third contains elements 201~210. I was wondering if anybody could help me complete this scala function in an efficient way. Thanks a lot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: pyspark sc.parallelize running OOM with smallish data
I think this is probably dying on the driver itself, as you are probably materializing the whole dataset inside your python driver. How large is spark_data_array compared to your driver memory? On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote: I put the same dataset into scala (using spark-shell) and it acts weird. I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96 in the status bar and shows details about the worker nodes, but there is no progress. sc.parallelize does finish (takes too long for the data size) in scala. On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote: spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in the cluster and gave 48g to executors. also tried Kryo serialization.

    Traceback (most recent call last):
      File "/mohit/./m.py", line 58, in <module>
        spark_data = sc.parallelize(spark_data_array)
      File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
      File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
      File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
    : java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
        at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Re: Akka Client disconnected
https://issues.apache.org/jira/browse/SPARK-2156 Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sat, Jul 12, 2014 at 5:23 PM, Srikrishna S srikrishna...@gmail.com wrote: I am using the master that I compiled 2 days ago. Can you point me to the JIRA? On Sat, Jul 12, 2014 at 9:13 AM, DB Tsai dbt...@dbtsai.com wrote: Are you using 1.0 or the current master? A bug related to this is fixed in master. On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote: I am running logistic regression with SGD on a problem with about 19M parameters (the kdda dataset from the libsvm library). I consistently see that the nodes on my computer get disconnected and soon the whole job comes to a grinding halt. 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on pachy4 remote Akka client disassociated Does this have anything to do with the akka.frame_size? I have tried up to 1024 MB and I still get the same thing. I don't have any more information in the logs about why the clients are getting disconnected. Any thoughts? Regards, Krishna
Re: Putting block rdd failed when running example svm on large data
Hi Xiangrui, Thanks for the information. Also, is it possible to figure out the execution time per iteration for SVM? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Putting block rdd failed when running example svm on large data
Also check the web ui for that. Each iteration will have one or more stages associated with it in the driver web ui. On Sat, Jul 12, 2014 at 6:47 PM, crater cq...@ucmerced.edu wrote: Hi Xiangrui, Thanks for the information. Also, it is possible to figure out the execution time per iteration for SVM? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Supported SQL syntax in Spark SQL
Is there a place where we can find an up-to-date list of supported SQL syntax in Spark SQL? Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-tp9538.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Large Task Size?
I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372). In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize errors). My main clue is that Spark will start logging warnings on smaller systems like: 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB. Looking up stage '2862' leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156 I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there'd be much data being moved over the network, much less a 10MB chunk. Any ideas of what this might be a symptom of? Kyle
Re: Large Task Size?
I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater, and is it possible it's dragging in a significant amount of extra state? On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a patch to MLlib that allows for multiplexing several different model optimizations using the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372). In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize errors). My main clue is that Spark will start logging warnings on smaller systems like: 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB. Looking up stage '2862' leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156 I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there'd be much data being moved over the network, much less a 10MB chunk. Any ideas of what this might be a symptom of? Kyle
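Not from the thread, but for context on Aaron's point: the usual way extra state sneaks into a task closure is through an outer `this` reference, and the standard fix is to copy the needed field into a local val. A minimal sketch with hypothetical names:

    import org.apache.spark.rdd.RDD

    class GroupedTrainer(val stepSize: Double) {
      // Risky: stepSize here really means this.stepSize, so the closure
      // serializes the whole GroupedTrainer (and anything it references)
      // into every task.
      def scaleCaptured(data: RDD[Double]): RDD[Double] =
        data.map(x => x * stepSize)

      // Safer: copy the field to a local val first; the closure then
      // captures only a Double, keeping the serialized task small.
      def scaleLocal(data: RDD[Double]): RDD[Double] = {
        val localStep = stepSize
        data.map(x => x * localStep)
      }
    }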
Re: not getting output from socket connection
Thanks! I thought it would get passed through netcat, but given your email, I was able to follow this tutorial and get it to work: http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote: netcat is listening for a connection on port . It echoes what you type into its console to anything that connects to it and reads. That is what Spark streaming does. If you yourself connect to it and write, nothing happens except that netcat echoes it. This does not cause Spark to somehow get that data. nc only echoes input from its console. On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
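For anyone who lands here later, a minimal sketch of such a line-emitting server in Scala (port 9999 and the message text are placeholders, since the actual port numbers were stripped from this thread); Spark's socketTextStream then connects to it as a client:

    import java.io.PrintWriter
    import java.net.ServerSocket

    object LineServer {
      def main(args: Array[String]): Unit = {
        val server = new ServerSocket(9999)   // placeholder port
        val socket = server.accept()          // wait for Spark streaming to connect
        val out = new PrintWriter(socket.getOutputStream, true) // autoflush each line
        while (true) {
          out.println("hello world")          // one line per second, like the app above
          Thread.sleep(1000)
        }
      }
    }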
Re: Convert from RDD[Object] to RDD[Array[Object]]
And if you can relax your constraints even further to only require RDD[Seq[Int]], then it's even simpler: rdd.mapPartitions(_.grouped(batchedDegree)) On Sat, Jul 12, 2014 at 6:26 PM, Aaron Davidson ilike...@gmail.com wrote: If you don't really care about the batchedDegree, but rather just want to do operations over some set of elements rather than one at a time, then just use mapPartitions(). Otherwise, if you really do want certain-sized batches and you are able to relax the constraints slightly, you can construct these batches within each partition. For instance:

    val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
      new Iterator[Array[Int]] {
        def hasNext: Boolean = iter.hasNext
        def next(): Array[Int] = {
          iter.take(batchedDegree).toArray
        }
      }
    }

This function is efficient in that it does not load the entire partition into memory, just enough to construct each batch. However, there will be one smaller batch at the end of each partition (rather than just one over the entire dataset). On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote: Hi there, I have a bunch of data in an RDD, which I previously processed one element at a time. For example, there was an RDD denoted by data: RDD[Object] and I processed it using data.map(...). However, I got a new requirement to process the data in a batched way. It means that I need to convert the RDD from RDD[Object] to RDD[Array[Object]] and then process it, which is to fill out this function: def convert2array(inputs: RDD[Object], batchedDegree: Int): RDD[Array[Object]] = {...}. I hope that after the conversion, each element of the new RDD is an array of the previous RDD's elements. The parameter batchedDegree specifies how many elements are batched together. For example, if I have 210 elements in the previous RDD and batchedDegree is 100, the result of the conversion function should be an RDD with 3 elements. Each element is an array; the first two arrays contain elements 1~100 and 101~200, and the third contains elements 201~210. I was wondering if anybody could help me complete this scala function in an efficient way. Thanks a lot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark ui on yarn
The UI code is the same in both, but one possibility is that your executors were given less memory on YARN. Can you check that? Or otherwise, how do you know that some RDDs were cached? Matei On Jul 12, 2014, at 4:12 PM, Koert Kuipers ko...@tresata.com wrote: hey shuo, so far all stage links work fine for me. i did some more testing, and it seems kind of random what shows up on the gui and what does not. some partially cached RDDs make it to the GUI, while some fully cached ones do not. I have not been able to detect a pattern. is the codebase for the gui different in standalone than in yarn-client mode? On Sat, Jul 12, 2014 at 3:34 AM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi Koert, Just curious, did you find any information like CANNOT FIND ADDRESS after clicking into some stage? I've seen similar problems due to loss of executors. Best, On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote: I just tested a long-lived application (that we normally run in standalone mode) on yarn in client mode. it looks to me like cached rdds are missing in the storage tab of the ui. accessing the rdd storage information via the spark context shows rdds as fully cached but they are missing on the storage page. spark 1.0.0
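A sketch of the programmatic check Koert describes, runnable from the shell (getRDDStorageInfo is a developer API on SparkContext in 1.0):

    sc.getRDDStorageInfo.foreach { info =>
      println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
        s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
    }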