Re: How to achieve reasonable performance on Spark Streaming?
It seems that the slow reduce tasks are caused by slow shuffling. Here are the logs for one slow reduce task:

14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_88_18 after 5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_89_18 after 5029 ms
[... 17 similar lines for blocks shuffle_69_90_18 through shuffle_69_196_18, all after 5029 ms ...]
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got remote block shuffle_69_197_18 after 5029 ms
14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is 1143
14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to driver
14/06/11 23:42:45 INFO Executor: Finished task ID 23643
shuffling using netty in spark streaming
Hi, 1. Does netty perform better than the basic method for shuffling? I found that the latency caused by shuffling in a streaming job is not stable with the basic method. 2. However, after I turn on netty for shuffling, I can only see the results for the first two batches, and then no output at all. I'm not sure whether the way I turn on netty is correct:

val conf = new SparkConf().set("spark.shuffle.use.netty", "true")

Thanks. Boduo Li
Re: Using Spark to crack passwords
Hi Nick, The great thing about any *unsalted* hashes is you can precompute them ahead of time; then it is just a lookup to find the password which matches the hash in seconds -- which always makes for a more exciting demo than "come back in a few hours". It is a no-brainer to write a generator function to create all possible passwords from a charset like abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789, hash them and store them to look up later. It is however incredibly wasteful on storage space:

- all passwords from 1 to 9 letters long
- using the charset above = 13,759,005,997,841,642 passwords
- assuming 20 bytes to store the SHA-1 and up to 9 to store the password equals approximately 375.4 petabytes

Thankfully there is already a more efficient/compact mechanism to achieve this using Rainbow Tables http://en.wikipedia.org/wiki/Rainbow_table -- better still, there is an active community of people who have already precomputed many of these datasets. The above dataset is readily available to download and is just 864GB -- much more feasible. All you need to do then is write a rainbow-table lookup function in Spark and leverage the precomputed files stored in HDFS. Done right you should be able to achieve interactive (few-second) lookups. Have fun! MC

Michael Cutler
Founder, CTO
Mobile: +44 789 990 7847 | Email: mich...@tumra.com | Web: tumra.com
Registered in England & Wales, 07916412. VAT No. 130595328

On 12 June 2014 01:24, Nick Chammas nicholas.cham...@gmail.com wrote: Spark is obviously well-suited to crunching massive amounts of data. How about to crunch massive amounts of numbers? A few years ago I put together a little demo for some co-workers to demonstrate the dangers of using SHA1 http://codahale.com/how-to-safely-store-a-password/ to hash and store passwords. Part of the demo included a live brute-forcing of hashes to show how SHA1's speed made it unsuitable for hashing passwords. I think it would be cool to redo the demo, but utilize the power of a cluster managed by Spark to crunch through hashes even faster. But how would you do that with Spark (if at all)? I'm guessing you would create an RDD that somehow defined the search space you're going to go through, and then partition it to divide the work up equally amongst the cluster's cores. Does that sound right? I wonder if others have already used Spark for computationally-intensive workloads like this, as opposed to just data-intensive ones. Nick
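A minimal Spark sketch of the brute-force generator half of such a demo (the charset is the one above; the fixed 4-character password length and the command-line target hash are illustrative assumptions, not a tuned implementation):

import java.security.MessageDigest
import org.apache.spark.{SparkConf, SparkContext}

object Sha1BruteForce {
  val charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

  // Hex-encoded SHA-1 of a candidate password.
  def sha1Hex(s: String): String =
    MessageDigest.getInstance("SHA-1").digest(s.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString

  // All strings of exactly `len` characters over the charset.
  def candidates(len: Int): Iterator[String] =
    if (len == 0) Iterator("")
    else candidates(len - 1).flatMap(prefix => charset.iterator.map(prefix + _))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sha1-brute-force"))
    val target = args(0) // hex SHA-1 to crack; 4-character passwords assumed here
    // Partition the search space by first character so each task explores one subtree.
    val hits = sc.parallelize(charset.map(_.toString), charset.length)
      .flatMap(first => candidates(3).map(first + _))
      .filter(candidate => sha1Hex(candidate) == target)
    println(hits.take(1).mkString)
    sc.stop()
  }
}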
Re: Spark SQL incorrect result on GROUP BY query
I reran with master and it looks like it is fixed. 2014-06-12 1:26 GMT+08:00 Michael Armbrust mich...@databricks.com: I'd try rerunning with master. It is likely you are running into SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994. Michael On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using Spark 1.0.0 and found that in Spark SQL some queries using GROUP BY give weird results. To reproduce, type the following commands in a spark-shell connected to a standalone server:

case class Foo(k: String, v: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++ List.fill(300)(Foo("c", 3))
sc.makeRDD(rows).registerAsTable("foo")
sql("select k, count(*) from foo group by k").collect

The result will be something random like:

res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75], [c,270], [4,56], [1,1])

and if I run the same query again, the new result will be correct:

sql("select k, count(*) from foo group by k").collect
res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

Should I file a bug? -- Pei-Lun Lee
Re: Spark SQL incorrect result on GROUP BY query
Thanks for verifying! On Thu, Jun 12, 2014 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote: I reran with master and it looks like it is fixed. [earlier messages in this thread trimmed; see above]
Re: Using Spark to crack passwords
This is actually what I've already mentioned -- with rainbow tables kept in memory it could be really fast! Marek 2014-06-12 9:25 GMT+02:00 Michael Cutler mich...@tumra.com: Hi Nick, The great thing about any *unsalted* hashes is you can precompute them ahead of time... [rest of quoted message trimmed; see Michael Cutler's post above]
How to read a snappy-compressed text file?
Hi, Maybe this is a newbie question: how to read a snappy-compressed text file? The OS is Windows 7. Currently, I've done the following steps:

1. Built Hadoop 2.4.0 with the snappy option. The 'hadoop checknative' command displays the following line: snappy: true D:\hadoop-2.4.0\bin\snappy.dll -- so I assume Hadoop can do snappy compression. BTW, snappy.dll was copied from the snappy64.dll file in snappy-windows-1.1.1.8.

2. Added the following configuration to both core-site.xml and yarn-site.xml:

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

3. Added the following environment variable: SPARK_LIBRARY_PATH=D:\hadoop-2.4.0\bin. Since I use IntelliJ, the above line was included in the Environment variables section of the Run Configuration.

4. Compressed the input text file with snzip.exe, which was included in snappy-windows-1.1.1.8.

5. Wrote the code:

sc.textFile(compressedFileName) // no other argument
  .map(...)

Now when I run my Spark application, the results are as follows:

1. The string 'snappy' cannot be found in the DEBUG log. The most relevant logs are as follows:

14/06/12 18:57:55 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library...
14/06/12 18:57:55 DEBUG NativeCodeLoader: Loaded the native-hadoop library

2. The application fails. The log is as follows:

14/06/12 18:57:57 WARN: int from string failed for: [(some binary characters)]

So apparently sc.textFile() does not recognize the file format and reads it as-is, so the map function receives garbage. How can I fix this? Thanks.
Re: how to set spark.executor.memory and heap size
Hi, Can you give us a little more insight on how you used that file to solve your problem? We're having the same OOM as you were and haven't been able to solve it yet. Thanks
Re: initial basic question from new user
The goal of rdd.persist is to create a cached RDD that breaks the DAG lineage. Therefore, computations *in the same job* that use that RDD can re-use that intermediate result, but it's not meant to survive between job runs. For example:

val baseData = rawDataRdd.map(...).flatMap(...).reduceByKey(...).persist
val metric1 = baseData.flatMap(op1).reduceByKey(...).collect
val metric2 = baseData.flatMap(op2).reduceByKey(...).collect

Without persist, computing metric1 and metric2 would trigger the computation starting from rawData. With persist, both metric1 and metric2 will start from the intermediate result (baseData). If you need to ad-hoc persist to files, you can save RDDs using rdd.saveAsObjectFile(...) [1] and load them afterwards using sparkContext.objectFile(...) [2]. If you want to preserve the RDDs in memory between job runs, you should look at the Spark-JobServer [3].

[1] https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
[2] https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.SparkContext
[3] https://github.com/ooyala/spark-jobserver

On Thu, Jun 12, 2014 at 11:24 AM, Toby Douglass t...@avocet.io wrote: Gents, I am investigating Spark with a view to performing reporting on a large data set, where the large data set receives additional data in the form of log files on an hourly basis. Since the data set is large, there is a possibility we will create a range of aggregate tables to reduce the volume of data which has to be processed. Having spent a little while reading up about Spark, my thought was that I could create an RDD which is an agg, persist this to disk, have reporting queries run against that RDD, and when new data arrives, convert the new log file into an agg and add that to the agg RDD. However, I now begin to get the impression that RDDs cannot be persisted across jobs - I can generate an RDD, I can persist it, but I can see no way for a later job to load a persisted RDD (and I begin to think it will have been GCed anyway, at the end of the first job). Is this correct?
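A minimal round-trip sketch of that save/load pattern (the HDFS path and the element type are illustrative assumptions):

// At the end of one job, persist the intermediate RDD to durable storage:
baseData.saveAsObjectFile("hdfs:///tmp/baseData")

// In a later job, reload it; the element type must be supplied explicitly:
val restored = sc.objectFile[(String, Int)]("hdfs:///tmp/baseData")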
Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1
Hi, The problem was due to a pre-built/binary Tachyon-0.4.1 jar in the SPARK_CLASSPATH; that Tachyon jar had been built against Hadoop 1.0.4. Building Tachyon against Hadoop 2.0.0 resolved the issue. Thanks On Wed, Jun 11, 2014 at 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote: The error is saying that your client libraries are older than what your server is using (2.0.0-mr1-cdh4.6.0 is IPC version 7). Try double-checking that your build is actually using that version (e.g., by looking at the hadoop jar files in lib_managed/jars). On Wed, Jun 11, 2014 at 2:07 AM, bijoy deb bijoy.comput...@gmail.com wrote: Any suggestions from anyone? Thanks Bijoy On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com wrote: Hi all, I have built Shark-0.9.1 with sbt using the below command:

SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly

My Hadoop cluster is also on version 2.0.0-mr1-cdh4.6.0. But when I try to execute the below command from the Spark shell, which reads a file from HDFS, I get the "Server IPC version 7 cannot communicate with client version 4" error from the org.apache.hadoop.hdfs.DFSClient class:

scala> val s = sc.textFile("hdfs://host:port/test.txt")
scala> s.count()
14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not loaded
org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4
    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)

Apparently this error is because of a version mismatch of the hadoop-hdfs jar between the client (the one referred to by Spark) and the server (the Hadoop cluster). But what I don't understand is why there is a mismatch, since I built Spark with the correct Hadoop version. Any suggestions would be highly appreciated. Thanks Bijoy -- Marcelo
Re: pmml with augustus
@villu: thank you for your help. I promise I'm going to try it! That's cool :-) Do you also know the other way around, from PMML to a model object in Spark?
Re: Writing data to HBase using Spark
Is there anyone else who is facing this problem of writing to HBase when running Spark in YARN mode or standalone mode using this example? If not, then do I need to explicitly specify something in the classpath? Regards, Gaurav On Wed, Jun 11, 2014 at 1:53 PM, Gaurav Dasgupta gaurav.d...@gmail.com wrote: Hi Kanwaldeep, I have tried your code but ran into a problem. The code is working fine in *local* mode. But if I run the same code in Spark standalone mode or YARN mode, then it continuously executes but does not save anything to the HBase table. I guess it stops data streaming once the *saveToHBase* method is called for the first time. This is strange. I just want to know whether you have tested the code on all Spark execution modes? Thanks, Gaurav On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep [via Apache Spark User List] wrote: Please see sample code attached at https://issues.apache.org/jira/browse/SPARK-944.
How to use SequenceFileRDDFunctions.saveAsSequenceFile() in Java?
Hi, How to use SequenceFileRDDFunctions.saveAsSequenceFile() in Java? A simple example will be a great help. Thanks in advance.
RE: shuffling using netty in spark streaming
Hi, 1. The performance depends on your hardware and system configuration, so you can test it yourself. In my tests, the two shuffle implementations show no particular performance difference in the latest version. 2. That's the correct way to turn on netty-based shuffle. Note that there are no shuffle-fetch-related metrics in netty-based shuffle, so you may not see those metrics in the web portal. Thanks Jerry From: onpoq l [mailto:onpo...@gmail.com] Sent: Thursday, June 12, 2014 2:35 PM To: user@spark.apache.org Subject: shuffling using netty in spark streaming Hi, 1. Does netty perform better than the basic method for shuffling? I found that the latency caused by shuffling in a streaming job is not stable with the basic method. 2. However, after I turn on netty for shuffling, I can only see the results for the first two batches, and then no output at all. I'm not sure whether the way I turn on netty is correct: val conf = new SparkConf().set("spark.shuffle.use.netty", "true") Thanks. Boduo Li
Spark 1.0.0 Standalone AppClient cannot connect Master
Hi, all Why does the Spark 1.0.0 official doc remove how to build Spark for a specific Hadoop version? Does it mean that I don't need to specify the Hadoop version when I build my Spark 1.0.0 with `sbt/sbt assembly`? Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com
Re: initial basic question from new user
Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want for your use case. As for Parquet support, that's newly arrived in Spark 1.0.0 together with Spark SQL, so continue to watch this space. Gerard's suggestion to look at JobServer, which you can generalize as building a long-running application which allows multiple clients to load/share/persist/save/collaborate-on RDDs, satisfies a larger, more complex use case. That is indeed the job of a higher-level application, subject to a wide variety of higher-level design choices. A number of us have successfully built Spark-based apps around that model. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Thu, Jun 12, 2014 at 4:35 AM, Toby Douglass t...@avocet.io wrote: On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas gerard.m...@gmail.com wrote: The goal of rdd.persist is to create a cached RDD that breaks the DAG lineage. Therefore, computations *in the same job* that use that RDD can re-use that intermediate result, but it's not meant to survive between job runs. As I understand it, Spark is designed for interactive querying, in the sense that the caching of intermediate results eliminates the need to recompute those results. However, if intermediate results last only for the duration of a job (e.g. say a Python script), how exactly is interactive querying actually performed? A script is not an interactive medium. Is the shell the only medium for interactive querying? Consider a common usage case: a web site which offers reporting upon a large data set. Users issue arbitrary queries. A few queries (just with different arguments) dominate the query load, so we thought to create intermediate RDDs to service those queries, so only those order-of-magnitude-smaller RDDs would need to be processed. Where this is not possible, we can only use Spark for reporting by issuing each query over the whole data set - e.g. Spark is just like Impala is just like Presto is just like [nnn]. The enormous benefit of RDDs - the entire point of Spark - so profoundly useful here - is not available. What a huge and unexpected loss! Spark seemingly renders itself ordinary. It is for this reason I am surprised to find this functionality is not available. If you need to ad-hoc persist to files, you can save RDDs using rdd.saveAsObjectFile(...) [1] and load them afterwards using sparkContext.objectFile(...) I've been using this site for docs: http://spark.apache.org. Here we find through the top-of-the-page menus the link API Docs - Python API, which brings us to http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html, where this page does not show the function saveAsObjectFile(). I find now from your link here, https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD, what appears to be a second and more complete set of the same documentation, using a different web interface to boot. It appears at least that there are two sets of documentation for the same APIs, where one set is out of date and the other not, and the out-of-date set is the one linked to from the main site? Given that our agg sizes will exceed memory, we expect to cache them to disk, so save-as-object (assuming there are no out-of-the-ordinary performance issues) may solve the problem, but I was hoping to store data in a column-oriented format. However I think this in general is not possible - Spark can *read* Parquet, but I think it cannot write Parquet as a disk-based RDD format.
If you want to preserve the RDDs in memory between job runs, you should look at the Spark-JobServer [3] Thank you. I view this with some trepidation. It took two man-days to get Spark running (and I've spent another man-day now trying to get a map/reduce to run; I'm getting there, but not there yet) - the bring-up/config experience for end-users is not tested or accurately documented (although to be clear, no better and no worse than is normal for open source; Spark is not exceptional). Having to bring up another open source project is a significant barrier to entry; it's always such a headache. The save-to-disk function you mentioned earlier will allow intermediate RDDs to go to disk, but we do in fact have a use case where in-memory would be useful; it might allow us to ditch Cassandra, which would be wonderful, since it would reduce the system count by one. I have to say, having to install JobServer to achieve this one end seems an extraordinarily heavyweight solution - a whole new application, when all that is wished for is that Spark persists RDDs across jobs, where so small a feature seems to open the door to so much functionality.
Re: initial basic question from new user
RE: "Given that our agg sizes will exceed memory, we expect to cache them to disk, so save-as-object (assuming there are no out-of-the-ordinary performance issues) may solve the problem, but I was hoping to store data in a column-oriented format. However I think this in general is not possible - Spark can *read* Parquet, but I think it cannot write Parquet as a disk-based RDD format."

Spark can write Parquet, via the ParquetOutputFormat which is distributed with Parquet. If you'd like example code for writing out to Parquet, please see the adamSave function in https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala, starting at line 62. There is a bit of setup necessary for the Parquet write codec, but otherwise it is fairly straightforward.
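For a feel of that setup, here is a rough sketch of the pattern (parquet-mr 1.x class names; treat the details as assumptions and see the linked adamSave code for a tested version):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import parquet.hadoop.ParquetOutputFormat

// Write an RDD of Avro GenericRecords (all sharing `schema`) out as Parquet.
def saveAsParquet(sc: SparkContext, records: RDD[GenericRecord],
                  schema: Schema, path: String): Unit = {
  val job = new Job(sc.hadoopConfiguration)
  // Tell ParquetOutputFormat how to translate Avro records into Parquet pages.
  ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
  AvroParquetOutputFormat.setSchema(job, schema)
  // ParquetOutputFormat ignores the key, so pair each record with null.
  records.map(r => (null, r))
    .saveAsNewAPIHadoopFile(path, classOf[Void], classOf[GenericRecord],
      classOf[AvroParquetOutputFormat], job.getConfiguration)
}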
Frank Austin Nothaft fnoth...@berkeley.edu / fnoth...@eecs.berkeley.edu 202-340-0466 On Thu, Jun 12, 2014 at 7:03 AM, Christopher Nguyen c...@adatao.com wrote: [earlier messages in this thread trimmed; see above]
Re: Using Spark to crack passwords
You need a use case where a lot of computation is applied to a little data. How about any of the various distributed computing projects out there? Although the SETI@home use case seems like a cool example, I doubt you want to reimplement its client. It might be far simpler to reimplement a search for Mersenne primes or optimal Golomb rulers or something. Although you're not going to get great speed through the JVM, it may be just fine as an example. And it stands some remote chance of getting useful work done. On Thu, Jun 12, 2014 at 11:11 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Indeed, rainbow tables are helpful for working on unsalted hashes. They turn a large amount of computational work into a bit of computational work and a bit of lookup work. The rainbow tables could easily be captured as RDDs. I guess I derailed my own discussion by focusing on password cracking, since my intention was to explore how Spark applications are written for compute-intensive workloads as opposed to data-intensive ones. And for certain types of password cracking, the best approach is to turn compute work into data work. :) On Thu, Jun 12, 2014 at 5:32 AM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: [earlier messages in this thread trimmed; see Michael Cutler's post above]
Re: initial basic question from new user
Hi, On 06/12/2014 05:47 PM, Toby Douglass wrote: In these future jobs, when I come to load the aggregated RDD, will Spark load and only load the columns being accessed by the query? Or will Spark load everything, to convert it into an internal representation, and then execute the query? The aforementioned native Parquet support in Spark 1.0 supports column projections, which means only the columns that appear in the query will be loaded. The next release will also support record filters for simple pruning predicates (int-column < value and such). This is different from using a Hadoop input/output format and requires no additional setup (jars in classpath and such). For more details see: http://spark.apache.org/docs/latest/sql-programming-guide.html#using-parquet Andre
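To make that concrete, a minimal sketch using the Spark 1.0 SQL API (the path and the Agg schema are illustrative assumptions):

import org.apache.spark.sql.SQLContext

case class Agg(user: String, total: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicit RDD -> SchemaRDD conversion

// Write an aggregate out as Parquet:
val aggs = sc.parallelize(Seq(Agg("a", 1), Agg("b", 2)))
aggs.saveAsParquetFile("hdfs:///aggs/daily.parquet")

// In a later job, load it back; only the columns the query touches are read:
val daily = sqlContext.parquetFile("hdfs:///aggs/daily.parquet")
daily.registerAsTable("daily")
sqlContext.sql("SELECT user, SUM(total) FROM daily GROUP BY user").collect()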
Re: HELP!? Re: Streaming trouble (reduceByKey causes all printing to stop)
I don't know if this matters, but I'm looking at the web site that Spark puts up, and I see under the streaming tab:

- Started at: Thu Jun 12 11:42:10 EDT 2014
- Time since start: 6 minutes 3 seconds
- Network receivers: 1
- Batch interval: 5 seconds
- Processed batches: 0
- Waiting batches: 1

Why would a batch be waiting for long over my batch time of 5 seconds? On Thu, Jun 12, 2014 at 10:18 AM, Michael Campbell michael.campb...@gmail.com wrote: And... it's NOT working. Here's the code:

val bytes = kafkaStream.map({ case (key, messageBytes) => messageBytes }) // just get the bytes part out
val things = bytes.flatMap(bytesArrayToThings) // convert to things
val srcDestinations = things.map(thing => (ipToString(thing.getSourceIp), Set(ipToString(thing.getDestinationIp))))
// up to this point works fine
// this fails to print
val srcDestinationSets = srcDestinations.reduceByKey((exist: Set[String], addl: Set[String]) => exist ++ addl)

What it does: from a Kafka message containing many things, convert the message to an array of said things, flatMap them out to a stream of one thing at a time, then pull out and make a tuple of (sourceIP, Set(destinationIP)). ALL THAT WORKS. If I do a srcDestinations.print() I get output like the following, every 5 seconds, which is my batch size:

--- Time: 140258200 ms ---
(10.30.51.216,Set(10.20.1.1))
(10.20.11.3,Set(10.10.61.98))
(10.20.11.3,Set(10.10.61.95))
...

What I want is a set of (sourceIP -> Set(all the destination IPs)), using a Set because, as you can see above, the same source may have the same destination multiple times and I want to eliminate dupes on the destination side. When I call the reduceByKey() method, I never get any output. When I do a srcDestinationSets.print(), NOTHING EVER PRINTS. Ever. Never. What am I doing wrong? (The same happens for reduceByKeyAndWindow(..., Seconds(5)).) I'm sure this is something I've done, but I cannot figure out what it was. Help, please?
Re: initial basic question from new user
On Thu, Jun 12, 2014 at 4:48 PM, Andre Schumacher schum...@icsi.berkeley.edu wrote: On 06/12/2014 05:47 PM, Toby Douglass wrote: In these future jobs, when I come to load the aggregated RDD, will Spark load and only load the columns being accessed by the query? Or will Spark load everything, to convert it into an internal representation, and then execute the query? The aforementioned native Parquet support in Spark 1.0 supports column projections which means only the columns that appear in the query will be loaded. [snip] Thank you!
Re: Writing data to HBase using Spark
Are you able to use the Hadoop input/output format reader for HBase with the new Hadoop API? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jun 12, 2014 at 7:49 AM, gaurav.dasgupta gaurav.d...@gmail.com wrote: Is there anyone else who is facing this problem of writing to HBase when running Spark in YARN mode or standalone mode using this example? [rest of quoted thread trimmed; see above]
Re: Spark 1.0.0 Standalone AppClient cannot connect Master
Hi Wang Hao, This is not removed. We moved it here: http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html If you're building with SBT and you don't specify SPARK_HADOOP_VERSION, then it defaults to 1.0.4. Andrew 2014-06-12 6:24 GMT-07:00 Hao Wang wh.s...@gmail.com: Hi, all Why does the Spark 1.0.0 official doc remove how to build Spark for a specific Hadoop version? [rest of quoted message trimmed; see above]
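For example (the version shown is illustrative; pick the one matching your cluster):

SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly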
Re: use spark-shell in the source
Not sure if this is what you're looking for, but have you looked at Java's ProcessBuilder? You can do something like:

for (line <- lines) {
  val command = line.split(" ") // you may need to deal with quoted strings
  val process = new ProcessBuilder(command: _*)
  process.inheritIO() // redirect the child's output to this process
  process.start()
}

Or are you trying to launch an interactive REPL in the middle of your application? 2014-06-11 22:56 GMT-07:00 JaeBoo Jung itsjb.j...@samsung.com: Hi all, Can I use spark-shell programmatically in my Spark application (in Java or Scala)? Because I want to convert Scala lines to a string array and run them automatically in my application. For example:

for (line <- lines) {
  // run this line in spark-shell style and get outputs
  run(line)
}

Thanks JaeBoo, Jung Assistant Engineer / BDA Lab / Samsung SDS
Re: Access DStream content?
You can use foreachRDD and then access the RDD data. Hope this works for you. Gianluca On 12 Jun 2014, at 10:06, Wolfinger, Fred fwolfin...@cyberpointllc.com wrote: Good morning. I have a question related to Spark Streaming. I have reduced some data down to a simple count value (by window), and would like to take immediate action on that value before storing it in HDFS. I don't see any DStream member functions that would allow me to access its contents. Is what I am trying to do not in the spirit of Spark Streaming? If it's not, what would be the best practice for doing so? Thanks so much! Fred
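A minimal sketch of that pattern (the countsByWindow stream and the output path are illustrative assumptions):

countsByWindow.foreachRDD { rdd =>
  // Act on the window's count(s) immediately...
  rdd.collect().foreach(count => println(s"window count: $count"))
  // ...then store the batch in HDFS:
  rdd.saveAsTextFile(s"hdfs:///counts/batch-${System.currentTimeMillis}")
}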
Re: Hanging Spark jobs
I learned this from my co-worker, but it is relevant here. Spark has lazy evaluation by default, which means that none of your code gets executed until you run your saveAsTextFile, which does not tell you much about where the problem is occurring. In order to debug this better, you might want to put in a saveAsTextFile after each RDD operation, so that you can figure out where it is getting stuck. HTH Shivani On Wed, Jun 11, 2014 at 2:17 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: These stack traces come from the stuck node? Looks like it's waiting on data in BlockFetcherIterator. Waiting for data from another node. But you say all other nodes were done? Very curious. Maybe you could try turning on debug logging, and try to figure out what happens in BlockFetcherIterator (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala). I do not think it is supposed to get stuck indefinitely. On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel dhurw...@ebay.com wrote: Hi, We are observing a recurring issue where our Spark jobs are hanging for several hours, even days, until we kill them. We are running Spark v0.9.1 over YARN. Our input is a list of edges of a graph on which we use Bagel to compute connected components using the following method:

class CCMessage(var targetId: Long, var myComponentId: Long) extends Message[Long] with Serializable

def compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int): (CC, Array[CCMessage]) = {
  val smallestComponentId = msgs.map(sq => sq.map(_.myComponentId).min).getOrElse(Long.MaxValue)
  val newComponentId = math.min(self.clusterID, smallestComponentId)
  val halt = (newComponentId == self.clusterID) || (superstep >= maxIters)
  self.active = if (superstep == 0) true else !halt
  val outGoingMessages =
    if (halt && superstep > 0) Array[CCMessage]()
    else self.edges.map(targetId => new CCMessage(targetId, newComponentId)).toArray
  self.clusterID = newComponentId
  (self, outGoingMessages)
}

Our output is a text file in which each line is a list of the node IDs in each component. The size of the output may be up to 6 GB. We see in the job tracker that most of the time jobs usually get stuck on the "saveAsTextFile" command, the final line in our code. In some cases, the job will hang during one of the iterations of Bagel during the computation of the connected components. Oftentimes, when we kill the job and re-execute it, it will finish successfully within an hour, which is the expected duration. We notice that if our Spark jobs don't finish after a few hours, they will never finish until they are killed, regardless of the load on our cluster. After consulting with our Hadoop support team, they noticed that after a particular hanging Spark job was running for 38 hours, all Spark processes on all nodes were completed except for one node, which was running more than 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then back to calm. Also, the other nodes were not relinquishing their resources until our Hadoop admin killed the process on that problematic node, at which point the job suddenly finished and "success" was reported in the job tracker. The output seemed to be fine too. If it helps you understand the issue, the Hadoop admin suggested this was a Spark issue and sent us two stack dumps which I attached to this email: before killing the node's Spark process (dump1.txt) and after (dump2.txt). Any advice on how to resolve this issue?
How can we debug this? Thanks, ~Daniel -- Software Engineer, Analytics Engineering Team @ Box, Mountain View, CA
Re: Adding external jar to spark-shell classpath in spark 1.0
@Marcelo: The command ./bin/spark-shell --jars jar1,jar2,etc did not work for me on a Linux machine. What I did instead was to append the classpath in the bin/compute-classpath.sh file. I ran the script, then started the Spark shell, and that worked. Thanks Shivani On Wed, Jun 11, 2014 at 10:52 AM, Andrew Or and...@databricks.com wrote: Ah, of course, there are no application jars in spark-shell, so it seems that there are no workarounds for this at the moment. We will look into a fix shortly, but for now you will have to create an application and use spark-submit (or use spark-shell on Linux). 2014-06-11 10:42 GMT-07:00 Ulanov, Alexander alexander.ula...@hp.com: Could you elaborate on this? I don't have an application, I just use the Spark shell. From: Andrew Or [mailto:and...@databricks.com] Sent: Wednesday, June 11, 2014 9:40 PM To: user@spark.apache.org Subject: Re: Adding external jar to spark-shell classpath in spark 1.0 This is a known issue: https://issues.apache.org/jira/browse/SPARK-1919. We haven't found a fix yet, but for now, you can work around this by including your simple class in your application jar. 2014-06-11 10:25 GMT-07:00 Ulanov, Alexander alexander.ula...@hp.com: Hi, I am currently using Spark 1.0 locally on Windows 7. I would like to use classes from an external jar in the spark-shell. I followed the instructions in: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd but this didn't work. I also tried running "spark-shell.cmd --jars my.jar --driver-class-path my.jar --driver-library-path my.jar" and it didn't work either. I cannot load any class from my jar into the Spark shell. Btw, my.jar contains a simple Scala class. Best regards, Alexander -- Software Engineer, Analytics Engineering Team @ Box, Mountain View, CA
Announcing MesosCon schedule, incl Spark presentation
Spark friends, Yesterday the Mesos community announced the program http://mesos.apache.org/blog/mesoscon-2014-program-announced/ for #MesosCon http://events.linuxfoundation.org/events/mesoscon, the first Apache Mesos conference, taking place August 21-22, 2014 in Chicago, IL. Given the close relationship between Spark and Mesos, I wanted to make sure we let you all know. https://twitter.com/mesoscon/status/476810946141093888 The MesosCon program http://mesoscon14.sched.org includes a Spark presentation by Paco Nathan, "Spark on Mesos: Handling Big Data in a Distributed, Multi-Tenant Environment". Keynote presentations are by Benjamin Hindman (Twitter) and John Wilkes (Google). Hope to see many of you there. Dave
overwriting output directory
Hi, When we have multiple runs of a program writing to the same output file, the execution fails if the output directory already exists from a previous run. Is there some way we can have it overwrite the existing directory, so that we don't have to manually delete it after each run? Thanks for your help.
Re: overwriting output directory
Hi, SK. For 1.0.0 you have to delete it manually; in 1.0.1 there will be a parameter to enable overwriting: https://github.com/apache/spark/pull/947/files Best, -- Nan Zhu On Thursday, June 12, 2014 at 1:57 PM, SK wrote: [quoted message trimmed; see above]
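If I'm reading that PR correctly, the flag it adds is spark.hadoop.validateOutputSpecs, so the usage would look something like this (treat the exact key as an assumption until 1.0.1 ships):

// Disable the Hadoop output-spec check so saveAsTextFile can overwrite:
val conf = new SparkConf().set("spark.hadoop.validateOutputSpecs", "false")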
Re: Writing data to HBase using Spark
Are you using Spark Streaming? Is master = "local[n]" where n > 1? Best, -- Nan Zhu On Wednesday, June 11, 2014 at 4:23 AM, gaurav.dasgupta wrote: Hi Kanwaldeep, I have tried your code but ran into a problem. The code is working fine in local mode. But if I run the same code in Spark standalone mode or YARN mode, then it continuously executes but does not save anything to the HBase table. I guess it stops data streaming once the saveToHBase method is called for the first time. This is strange. I just want to know whether you have tested the code on all Spark execution modes? Thanks, Gaurav On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep [via Apache Spark User List] wrote: Please see sample code attached at https://issues.apache.org/jira/browse/SPARK-944.
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
The old behavior (A) was dangerous, so it's good that (B) is now the default. But in some cases I really do want to replace the old data, as per (C). For example, I may rerun a previous computation (perhaps the input data was corrupt and I'm rerunning with good input). Currently I have to write separate code to remove the files before calling Spark. It would be very convenient if Spark could do this for me. Has anyone created a JIRA issue to support (C)? On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com wrote: It is not a very good idea to save the results in the exact same place as the data. Any failures during the job could lead to corrupted data, because recomputing the lost partitions would involve reading the original (now-nonexistent) data. As such, the only safe way to do this would be to do as you said, and only delete the input data once the entire output has been successfully created. On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Without (C), what is the best practice to implement the following scenario?

1. rdd = sc.textFile(FileA)
2. rdd = rdd.map(...) // actually modifying the rdd
3. rdd.saveAsTextFile(FileA)

Since the rdd transformation is 'lazy', rdd will not materialize until saveAsTextFile(), so FileA must still exist, but it must be deleted before saveAsTextFile(). What I can think of is:

3. rdd.saveAsTextFile(TempFile)
4. delete FileA
5. rename TempFile to FileA

This is not very convenient... Thanks. -----Original Message----- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Tuesday, June 03, 2014 11:40 AM To: user@spark.apache.org Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output format check and overwrite files in the destination directory. But it won't clobber the directory entirely. I.e. if the directory already had part1, part2, part3, part4 and you write a new job outputting only two files (part1, part2) then it would leave the other two files intact, confusingly. (B) Semantics in Spark 1.0 and later: Runs the Hadoop OutputFormat check, which means the directory must not exist already or an exception is thrown. (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK): Spark will delete/clobber an existing destination directory if it exists, then fully over-write it with new data. I'm fine to add a flag that allows (B) for backwards-compatibility reasons, but my point was I'd prefer not to have (C) even though I see some cases where it would be useful. - Patrick On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote: Is there a third way? Unless I miss something, Hadoop's OutputFormat wants the target dir to not exist no matter what, so it's just a question of whether Spark deletes it for you or errors. On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com wrote: We can just add back a flag to make it backwards compatible - it was just missed during the original PR. Adding a *third* set of clobber semantics, I'm slightly -1 on that for the following reasons: 1. It's scary to have Spark recursively deleting user files, which could easily lead to users deleting data by mistake if they don't understand the exact semantics. 2. It would introduce a third set of semantics here for saveAsXX... 3. It's trivial for users to implement this with two lines of code (if output dir exists, delete it) before calling saveAsHadoopFile.
- Patrick -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
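Patrick's "two lines of code" workaround, sketched against the Hadoop FileSystem API (the output path is a placeholder):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val out = new Path("/data/output") // placeholder path
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(out)) fs.delete(out, true) // true = recursive delete
    rdd.saveAsTextFile(out.toString)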
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
Actually this has been merged to the master branch https://github.com/apache/spark/pull/947 -- Nan Zhu On Thursday, June 12, 2014 at 2:39 PM, Daniel Siegmann wrote: The old behavior (A) was dangerous, so it's good that (B) is now the default. But in some cases I really do want to replace the old data, as per (C). For example, I may rerun a previous computation (perhaps the input data was corrupt and I'm rerunning with good input). Currently I have to write separate code to remove the files before calling Spark. It would be very convenient if Spark could do this for me. Has anyone created a JIRA issue to support (C)?
Re: Spark-Streaming window processing
To get the streaming latency I just look at the stats on the application driver's UI webpage. I don't know if you can do that programmatically, but you could curl and parse the page if you had to. Jeremy Lee BCompSci (Hons) The Unorthodox Engineers On 10 Jun 2014, at 3:36 pm, Yingjun Wu wu.yj0...@gmail.com wrote: Hi Sean, Thanks for your reply! So for the first question, any idea for measuring latency of Spark-Streaming? Regards, Yingjun -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-window-processing-tp7234p7300.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
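For what it's worth, batch timing can also be obtained programmatically via a StreamingListener, avoiding the page scraping Jeremy describes. A hedged sketch against the Spark 1.0 listener API:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        // totalDelay = scheduling delay + processing time, in milliseconds.
        println(s"batch ${batch.batchInfo.batchTime}: " +
          s"total delay ${batch.batchInfo.totalDelay.getOrElse(-1L)} ms")
      }
    })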
Re: NullPointerException on reading checkpoint files
I am also seeing similar problem when trying to continue job using saved checkpoint. Can somebody help in solving this problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-reading-checkpoint-files-tp7306p7507.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
I do not want the behavior of (A) - that is dangerous and should only be enabled to account for legacy code. Personally, I think this option should eventually be removed. I want option (C): have Spark delete any existing part files before creating any new output. I don't necessarily want this to be a global option, but one on the API for saveAsTextFile (i.e. an additional boolean parameter). As it stands now, I need to precede every saveAsTextFile call with my own deletion code. In other words, instead of writing ... if ( cleanOutput ) { MyUtil.clean(outputDir) } rdd.saveAsTextFile( outputDir ) I'd like to write rdd.saveAsTextFile(outputDir, cleanOutput) Does that make sense? On Thu, Jun 12, 2014 at 2:51 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Actually this has been merged to the master branch https://github.com/apache/spark/pull/947 -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
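Daniel's proposed signature can be approximated today with a wrapper; a hedged sketch (this is not a Spark API, just an illustrative helper):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.rdd.RDD

    // Hypothetical helper adding the boolean parameter Daniel describes.
    implicit class CleanSave[T](rdd: RDD[T]) {
      def saveAsTextFile(path: String, cleanOutput: Boolean): Unit = {
        if (cleanOutput) {
          val fs = FileSystem.get(rdd.context.hadoopConfiguration)
          fs.delete(new Path(path), true) // returns false if the path is absent
        }
        rdd.saveAsTextFile(path)
      }
    }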
Re: creating new ami image for spark ec2 commands
Creating AMIs from scratch is a complete pain in the ass. If you have a spare week, sure. I understand why the team avoids it. The easiest way is probably to spin up a working instance and then use Amazon's "save as new AMI", but that has some major limitations, especially with software not expecting it. (There are two of me now!) Worker nodes might cope better than the master. But yes, I also would love new AMIs that don't pull down 200 meg every time I spin up. ("Spin up a cluster in five minutes"? Ha!) AMIs per region are also good for costs. I've thought of doing up new ones (since I have experience), but I have no time and other issues first. Perhaps once I know Spark better. At least with Spark, we have more control over the scripts exactly because they are primitive. I had a quick look at YARN/Ambari, and it wasn't obvious they were any better with EC2, and a hundred times the complexity. I expect most AWS-heavy companies have a full-time person just managing AMIs. They are that annoying. It's what makes Cloudera attractive. Jeremy Lee BCompSci (Hons) The Unorthodox Engineers On 6 Jun 2014, at 6:44 am, Matt Work Coarr mattcoarr.w...@gmail.com wrote: How would I go about creating a new AMI image that I can use with the spark ec2 commands? I can't seem to find any documentation. I'm looking for a list of steps that I'd need to perform to make an Amazon Linux image ready to be used by the spark ec2 tools. I've been reading through the spark 1.0.0 documentation, looking at the script itself (spark_ec2.py), and looking at the github project mesos/spark-ec2. From what I can tell, the spark_ec2.py script looks up the id of the AMI based on the region and machine type (hvm or pvm) using static content derived from the github repo mesos/spark-ec2. The spark ec2 script loads the AMI id from this base url: https://raw.github.com/mesos/spark-ec2/v2/ami-list (Which presumably comes from https://github.com/mesos/spark-ec2 ) For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id: ami-5bb18832 Is there a list of instructions for how this AMI was created? Assuming I'm starting with my own Amazon Linux image, what would I need to do to make it usable where I could pass that AMI id to spark_ec2.py rather than using the default spark-provided AMI? Thanks, Matt
spark EC2 bring-up problems
Gents, I have been bringing up a cluster on EC2 using the spark_ec2.py script. This works if the cluster has a single slave; it fails if the cluster has sixteen slaves, during the step that transfers the SSH key to the slaves. I cannot currently bring up a large cluster. Can anyone shed any light on this issue? As an aside, that script does not work out of the box on Amazon EC2 instances: Python 2.7 must be installed, and then boto for 2.7.
Re: creating new ami image for spark ec2 commands
Yeah, we badly need new AMIs that include at a minimum package/security updates and Python 2.7. There is an open issue to track the 2.7 AMI update https://issues.apache.org/jira/browse/SPARK-922, at least. On Thu, Jun 12, 2014 at 3:34 PM, unorthodox.engine...@gmail.com wrote: Creating AMIs from scratch is a complete pain in the ass. If you have a spare week, sure. I understand why the team avoids it.
Re: spark EC2 bring-up problems
On Thu, Jun 12, 2014 at 8:50 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yes, you need Python 2.7 to run spark-ec2, and most AMIs come with 2.6. Ah, yes - I meant to say, Amazon Linux. Have you tried either: 1. Retrying launch with the --resume option? 2. Increasing the value of the --wait option? No. I will try the first now. I think the latter is not the issue - the instances are up; something else is amiss. Thank you.
Re: Not fully cached when there is enough memory
I too have seen cached RDDs not hit 100%, even when they are DISK_ONLY. Just saw that yesterday in fact. In some cases RDDs I expected didn't show up in the list at all. I have no idea if this is an issue with Spark or something I'm not understanding about how persist works (probably the latter). If I figure out the reason for this I'll let you know. On Wed, Jun 11, 2014 at 8:54 PM, Shuo Xiang shuoxiang...@gmail.com wrote: Xiangrui, clicking into the RDD link, it gives the same message, say only 96 of 100 partitions are cached. The disk/memory usage are the same, which is far below the limit. Is this what you want to check or other issue? On Wed, Jun 11, 2014 at 4:38 PM, Xiangrui Meng men...@gmail.com wrote: Could you try to click one that RDD and see the storage info per partition? I tried continuously caching RDDs, so new ones kick old ones out when there is not enough memory. I saw similar glitches but the storage info per partition is correct. If you find a way to reproduce this error, please create a JIRA. Thanks! -Xiangrui -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
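One way to inspect what is actually cached, rather than trusting the UI, is the storage info exposed on the SparkContext; a sketch (this is a developer-facing API, so the fields may change between versions):

    // Prints, per cached RDD: how many partitions are cached and where they live.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"RDD ${info.id} '${info.name}': " +
        s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
        s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
    }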
Increase storage.MemoryStore size
Hey everyone, I'm having some trouble increasing the default storage size for a broadcast variable. It looks like it defaults to a little less than 512MB every time, and I can't figure out which configuration to change to increase this. INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 426.5 MB, free 64.2 MB) (I'm seeing this in the terminal on my driver computer) I can change spark.executor.memory, and that seems to increase the amount of RAM available on my nodes, but it doesn't seem to adjust this storage size for my broadcast variables. Any ideas? Thanks, Eric -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Increase-storage-MemoryStore-size-tp7516.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Increase storage.MemoryStore size
If you are launching your application with spark-submit, you can manually edit the spark-class file to make it 1g as the baseline. It's pretty easy to figure out how once you open the file. This worked for me, even if it's not a final solution of course. Gianluca On 12 Jun 2014, at 15:16, ericjohnston1989 ericjohnston1...@gmail.com wrote: Hey everyone, I'm having some trouble increasing the default storage size for a broadcast variable.
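For reference, the roughly 512 MB ceiling is about the JVM heap size times spark.storage.memoryFraction (0.6 by default); broadcast blocks created on the driver live in the driver's MemoryStore, so it is the driver heap that needs raising. A hedged sketch (note that the driver heap generally must be set before its JVM starts, e.g. via spark-submit --driver-memory, rather than in SparkConf):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Share of the heap reserved for storage (cached RDDs and broadcast blocks).
      .set("spark.storage.memoryFraction", "0.7")
      // Executor heap; the driver heap is best set via --driver-memory.
      .set("spark.executor.memory", "2g")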
An attempt to implement dbscan algorithm on top of Spark
Hi. I'm not sure if messages like this are appropriate in this list; I just want to share with you an application I am working on. This is my personal project which I started to learn more about Spark and Scala, and, if it succeeds, to contribute it to the Spark community. Maybe someone will find it useful. Or maybe someone will want to join development. The application is available at https://github.com/alitouka/spark_dbscan Any questions, comments, suggestions, as well as criticism are welcome :) Best regards, Aliaksei Litouka
Re: How to specify executor memory in EC2 ?
spark-env.sh doesn't seem to contain any settings related to memory size :( I will continue searching for a solution and will post it if I find it :) Thank you, anyway On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com wrote: It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and is overriding the application’s settings. Take a look in there and delete that line if possible. Matei On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am testing my application in EC2 cluster of m3.medium machines. By default, only 512 MB of memory on each machine is used. I want to increase this amount and I'm trying to do it by passing --executor-memory 2G option to the spark-submit script, but it doesn't seem to work - each machine uses only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the amount of memory?
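A quick way to check what the application actually ended up with, after spark-env.sh and any defaults are merged in, is to read the setting back from the live context; a small sketch:

    // If this prints 512m although you passed --executor-memory 2G,
    // something (e.g. conf/spark-env.sh on the cluster) is overriding it.
    println(sc.getConf.getOption("spark.executor.memory").getOrElse("<unset>"))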
Re: Normalizations in MLBase
Hi Aslan, I'm not sure if the mlbase code is maintained for the current Spark master. The following is the code we use for standardization at my company. I intend to clean it up and submit a PR. You could use it for now.

def standardize(data: RDD[Vector]): RDD[Vector] = {
  val summarizer = new RowMatrix(data).computeColumnSummaryStatistics()
  val mean = summarizer.mean
  val variance = summarizer.variance
  // The standardization will always densify the output, so the output
  // will be stored in a dense vector.
  data.map(x => {
    val n = x.toBreeze.length
    val output = BDV.zeros[Double](n)
    var i = 0
    while (i < n) {
      if (variance(i) == 0) {
        output(i) = Double.NaN
      } else {
        output(i) = (x(i) - mean(i)) / Math.sqrt(variance(i))
      }
      i += 1
    }
    Vectors.fromBreeze(output)
  })
}

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Thu, Jun 12, 2014 at 1:49 AM, Aslan Bekirov aslanbeki...@gmail.com wrote: Hi DB, I found a piece of code that uses znorm to normalize data:

/** Build the training data set from sample and summary data. */
val train_data = sample_data.map(v =>
  Array.tabulate[Double](field_cnt)(i =>
    zscore(v._2(i), sample_mean(i), sample_stddev(i))
  )
).cache

Please make your comments if you find something wrong. BR, Aslan On Thu, Jun 12, 2014 at 11:13 AM, Aslan Bekirov aslanbeki...@gmail.com wrote: Thanks a lot DB. I will try to do Znorm normalization using a map transformation. BR, Aslan On Thu, Jun 12, 2014 at 12:16 AM, DB Tsai dbt...@stanford.edu wrote: Hi Aslan, Currently, we don't have a utility function to do so. However, you can easily implement this with another map transformation. I'm working on this feature now, and there will be a couple of different normalization options users can choose. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Jun 11, 2014 at 6:25 AM, Aslan Bekirov aslanbeki...@gmail.com wrote: Hi All, I have to normalize a set of values in the range 0-500 to the [0,1] range. Is there any util method in MLBase to normalize a large set of data? BR, Aslan
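For the original 0-500 to [0,1] question, plain min-max scaling needs no MLlib utility; a sketch over an RDD[Double] named data:

    // Min-max scaling: x' = (x - min) / (max - min), mapping [min, max] onto [0, 1].
    val min = data.reduce((a, b) => math.min(a, b))
    val max = data.reduce((a, b) => math.max(a, b))
    val scaled = data.map(x => (x - min) / (max - min))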
NullPointerExceptions when using val or broadcast on a standalone cluster.
Hi, I'm consistently getting NullPointerExceptions when trying to use String val objects defined in my main application -- even for broadcast vals! I'm deploying on a standalone cluster with a master and 4 workers on the same machine, which is not the machine I'm submitting from. The following example works in spark-shell, but does not when submitted to the cluster with spark-submit, and also does not work locally. Is there anything I can do to fix this? Do vals need to be explicitly synchronized for RDD operations? One workaround would be to inline the vals, but the logic in my actual application doesn't allow for this. Thanks, Brandon.

--- spark-shell --master my-server:

val suffix = "-suffix"
val l = sc.parallelize(List("a", "b", "c"))
println(l.map(_ + suffix).collect().mkString(","))

Result: a-suffix,b-suffix,c-suffix

--- Standalone Cluster with `submit.sh` (my script below):

TestApp.scala:

package com.adobe.spark

// Spark.
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.broadcast._
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
// Scala.
import scala.collection.mutable.ArrayBuffer

object TestApp extends App {
  val memory = "1g"
  val maxCores = "1"
  val conf = new SparkConf()
    .setMaster("spark://myserver:7077")
    //.setMaster("local[4]")
    .setAppName("ValError")
    .setSparkHome("/usr/local/spark-1.0.0")
    .setJars(Seq("/tmp/val-error.jar"))
    .set("spark.executor.memory", memory)
    .set("spark.cores.max", maxCores)
  val sc = new SparkContext(conf)

  val suffix = "-suffix"
  val l = sc.parallelize(List("a", "b", "c"))
  println(l.map(_ + suffix).collect().mkString(","))

  val suffix_bc = sc.broadcast(suffix)
  println(l.map(_ + suffix_bc.value).collect().mkString(","))

  sc.stop()
}

build.sbt:

import AssemblyKeys._

assemblySettings

jarName in assembly := "val-error.jar"

// Load provided libraries with `sbt run`.
run in Compile <<= Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
)

name := "TestApp"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
)

resolvers ++= Seq(
  "Akka Repository" at "http://repo.akka.io/releases/"
)

submit.sh:

#!/bin/bash
rm -f *.log driver-id.txt
JAR=val-error.jar
CLASS=com.adobe.spark.TestApp
SPARK=/usr/local/spark-1.0.0
set -x
sbt assembly > assembly.log || exit 1
scp target/scala-2.10/$JAR eagle:/tmp || exit 2
$SPARK/bin/spark-submit \
  --class $CLASS \
  --master spark://myserver:7077 \
  --deploy-mode cluster \
  /tmp/$JAR | tee submit.log
set +x
DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed 's/Driver successfully submitted as \(.*\)/\1/g')
[ -z "$DRIVER_ID" ] && exit 3
echo $DRIVER_ID > driver-id.txt

Output: anull,bnull,cnull (For the first part.) Stack Trace: (For the broadcast var.)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29) com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) scala.collection.AbstractIterator.to(Iterator.scala:1157) scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) scala.collection.AbstractIterator.toArray(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717) org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
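A likely culprit here, for what it's worth: TestApp extends scala.App, so its vals are initialized through the delayed-init mechanism and can still be null when a closure referencing them is deserialized on an executor (the "anull" output is consistent with that). Spark's documentation recommends defining a main() method instead of extending App. A sketch of the same program restructured that way:

    object TestApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("spark://myserver:7077").setAppName("ValError")
        val sc = new SparkContext(conf)
        val suffix = "-suffix" // a local val, initialized before any closure captures it
        val l = sc.parallelize(List("a", "b", "c"))
        println(l.map(_ + suffix).collect().mkString(","))
        sc.stop()
      }
    }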
Java Custom Receiver onStart method never called
I create a Java custom receiver and then call ssc.receiverStream(new MyReceiver("localhost", 8081)); // where ssc is the JavaStreamingContext I am expecting that the receiver's onStart method gets called but it does not. Can anyone give me some guidance? What am I not doing? Here's the dependency info:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.0.0</version>
</dependency>

Thanks for your help -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Custom-Receiver-onStart-method-never-called-tp7525.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
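A guess at the cause, for what it's worth: a receiver's onStart is only invoked once the streaming context is started, and ssc.start() in turn requires at least one output operation registered on the stream. A minimal Scala sketch (MyReceiver stands for the poster's custom receiver):

    val ssc = new StreamingContext(conf, Seconds(5))
    val stream = ssc.receiverStream(new MyReceiver("localhost", 8081))
    stream.print()         // at least one output operation is required
    ssc.start()            // receivers are launched here; onStart fires now
    ssc.awaitTermination()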
Re: An attempt to implement dbscan algorithm on top of Spark
Great! I was going to implement one of my own - but I may not need to do that any more :) I haven't had a chance to look deep into your code, but I would recommend accepting an RDD[Double,Double] as well, instead of just a file: val data = IOHelper.readDataset(sc, "/path/to/my/data.csv") And other distance measures, of course. Thanks, Vipul On Jun 12, 2014, at 2:31 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Hi. I'm not sure if messages like this are appropriate in this list; I just want to share with you an application I am working on.
Re: NullPointerExceptions when using val or broadcast on a standalone cluster.
That stack trace is quite similar to the one that is generated when trying to do a collect within a closure. In this case, it feels wrong to collect in a closure, but I wonder what's the reason behind the NPE. Curious to know whether they are related. Here's a very simple example: rrd1.flatMap(x => rrd2.collect.flatMap(y => List(y,x))) res7: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[10] at flatMap at <console>:17 scala> res7.collect 14/06/13 01:11:48 INFO SparkContext: Starting job: collect at <console>:19 14/06/13 01:11:48 INFO DAGScheduler: Got job 2 (collect at <console>:19) with 3 output partitions (allowLocal=false) 14/06/13 01:11:48 INFO DAGScheduler: Final stage: Stage 4(collect at <console>:19) 14/06/13 01:11:48 INFO DAGScheduler: Parents of final stage: List() 14/06/13 01:11:48 INFO DAGScheduler: Missing parents: List() 14/06/13 01:11:48 INFO DAGScheduler: Submitting Stage 4 (FlatMappedRDD[10] at flatMap at <console>:17), which has no missing parents 14/06/13 01:11:48 INFO DAGScheduler: Submitting 3 missing tasks from Stage 4 (FlatMappedRDD[10] at flatMap at <console>:17) 14/06/13 01:11:48 INFO TaskSchedulerImpl: Adding task set 4.0 with 3 tasks 14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:0 as TID 16 on executor localhost: localhost (PROCESS_LOCAL) 14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:0 as 1850 bytes in 0 ms 14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:1 as TID 17 on executor localhost: localhost (PROCESS_LOCAL) 14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:1 as 1850 bytes in 0 ms 14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:2 as TID 18 on executor localhost: localhost (PROCESS_LOCAL) 14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:2 as 1850 bytes in 0 ms 14/06/13 01:11:48 INFO Executor: Running task ID 16 14/06/13 01:11:48 INFO Executor: Running task ID 17 14/06/13 01:11:48 INFO Executor: Running task ID 18 14/06/13 01:11:48 ERROR Executor: Exception in task ID 18 java.lang.NullPointerException at org.apache.spark.rdd.RDD.collect(RDD.scala:728) at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17) at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728) at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/06/13 01:11:48 ERROR Executor: Exception in task ID 16 ... same for each partition. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1037) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1021) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1019) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1019) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:637) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1211) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at
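The NPE itself is expected here: an RDD captured inside another RDD's closure is deserialized on the executor with its SparkContext field null, so rrd2.collect blows up there. RDD transformations and actions can only be invoked from the driver; the usual rewrite collects (or broadcasts) the small RDD first. A sketch using the names from the example:

    val small = rrd2.collect()   // materialize the small RDD on the driver
    val bc = sc.broadcast(small) // ship it once to each executor
    val combined = rrd1.flatMap(x => bc.value.flatMap(y => List(y, x)))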
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
ah, I see. I think it's hard to do something like fs.delete() in Spark code (it's scary, as we discussed in the previous PR), so if you want (C), I guess you have to do some delete work manually. Best, -- Nan Zhu On Thursday, June 12, 2014 at 3:31 PM, Daniel Siegmann wrote: I do not want the behavior of (A) - that is dangerous and should only be enabled to account for legacy code. Personally, I think this option should eventually be removed. I want option (C): have Spark delete any existing part files before creating any new output.
Re: Doubts about MLlib.linalg in python
Hi Congrui Yi, Spark is implemented in Scala, so new features are available in Scala/Java first. PySpark is a Python wrapper for the Scala code, so it won't always have the latest features. This is especially true for the machine learning library. Eric -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Doubts-about-MLlib-linalg-in-python-tp7523p7530.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Question about RDD cache, unpersist, materialization
Maybe it would be nice if unpersist() 'triggered' the computation of other RDDs that depend on it but are not yet computed. The pseudo code could be as follows:

unpersist() {
  if (this rdd has not been persisted)
    return;
  for (all rdds that depend on this rdd but are not yet computed)
    compute_that_rdd;
  do_actual_unpersist();
}

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Friday, June 13, 2014 5:38 AM To: user@spark.apache.org Subject: Re: Question about RDD cache, unpersist, materialization I've run into this issue. The goal of caching / persist seems to be to avoid recomputing an RDD when its data will be needed multiple times. However, once the following RDDs are computed the cache is no longer needed. The current design provides no obvious way to detect when the cache is no longer needed so it can be discarded. In the case of caching in memory, it may be handled by partitions being dropped (in LRU order) when memory fills up. I need to do some more experimentation to see if this really works well, or if allowing memory to fill up causes performance issues or possibly OOM errors if data isn't correctly freed. In the case of persisting to disk, I'm not sure if there's a way to limit the disk space used for caching. Does anyone know if there is such a configuration option? This is a pressing issue for me - I have had jobs fail because nodes ran out of disk space. On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com wrote: If you want to force materialization use .count() Also, if you can, simply don't unpersist anything, unless you really need to free the memory — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: BTW, it is possible that rdd.first() does not compute all the partitions. So, first() cannot be used for the situation below. -----Original Message----- From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Wednesday, June 11, 2014 11:40 AM To: user@spark.apache.org Subject: Question about RDD cache, unpersist, materialization Hi, What I (seem to) know about the RDD persisting API is as follows: - cache() and persist() are not actions. They only do a marking. - unpersist() is also not an action. It only removes a marking. But if the rdd is already in memory, it is unloaded. And there seems to be no API to forcefully materialize an RDD without requesting data via an action method, for example first(). So, I am faced with the following scenario:

{
  JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging
  for (int i = 0; i < 10; i++) {
    JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
    rdd.cache(); // Since it will be used twice, cache.
    rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes
    rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union
    rdd.unpersist(); // Now it seems not needed. (But needed actually)
  }
  // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted.
  // So, rebuilding all 10 rdds will occur.
  rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion could be materialized before the rdd.unpersist() line and cache()d, the rdds in the loop would not be needed on rddUnion.saveAsTextFile(). Now what is the best strategy? - Do not unpersist all 10 rdds in the loop. - Materialize rddUnion in the loop by calling a 'light' action API, like first(). - Give up and just rebuild/reload all 10 rdds when saving rddUnion. Is there some misunderstanding? Thanks. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
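A sketch of the count()-based workaround applied to the scenario above (transform1/transform2 stand for the map(...).filter(...) chains; the file-name collections are as in the original):

    val inputs = (0 until 10).map(i => sc.textFile(inputFileNames(i)).cache())
    inputs.zipWithIndex.foreach { case (rdd, i) =>
      rdd.map(transform1).saveAsTextFile(outputFileNames(i))
    }
    val rddUnion = sc.union(inputs.map(_.map(transform2)))
    rddUnion.cache()
    rddUnion.count()              // force rddUnion while its inputs are still cached
    inputs.foreach(_.unpersist()) // now safe: the save below reads the cached union
    rddUnion.saveAsTextFile(mergedFileName)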
Re: How to specify executor memory in EC2 ?
Are you launching this using our EC2 scripts? Or have you set up a cluster by hand? Matei On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: spark-env.sh doesn't seem to contain any settings related to memory size :( I will continue searching for a solution and will post it if I find it :) Thank you, anyway
Re: running Spark Streaming just once and stop it
You should be able to see the streaming tab in the Spark web UI (running on port 4040) if you have created a StreamingContext and you are using Spark 1.0 TD On Thu, Jun 12, 2014 at 1:06 AM, Ravi Hemnani raviiihemn...@gmail.com wrote: Hey, I did sparkContext.addStreamingListener(streamingListener object) in my code and I am able to see some stats in the logs but can't see anything in the web UI. How to add the Streaming tab to the web UI? I need to get queuing delays and related information. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/running-Spark-Streaming-just-once-and-stop-it-tp1382p7463.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Question about RDD cache, unpersist, materialization
FYI: Here is a related discussion about this: http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Maybe it would be nice if unpersist() 'triggered' the computation of other RDDs that depend on it but are not yet computed.
RE: Question about RDD cache, unpersist, materialization
Currently I use rdd.count() for forceful computation, as Nick Pentreath suggested. I think it would be nice to have a method that forcefully computes an RDD, so that unnecessary RDDs can be safely unpersist()ed. Consider a case where rdd_a is a parent of both: (1) a short-term rdd_s that depends only on the rdd (a transformation of rdd_a), (2) and a long-term rdd_t that depends on RDDs that may be computed later (maybe for some aggregation). Usually rdd_s is computed early, and rdd_a is computed for it. But rdd_a has to remain in memory until rdd_t is eventually computed (or be recomputed when rdd_t is computed). It would be nice if rdd_t could be computed when rdd_s is computed, so that rdd_a could be unpersist()ed, since it will not be used anymore. (Currently I use rdd_t.count() for that.) sc.prune(), which was suggested in the related discussion you provided, is not helpful for this case, since rdd_a is 'referenced' by the 'lazy' rdd_t. (That is: get rdd_t up, and compute it together with rdd_s, while rdd_a is still here for rdd_s.) From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] Sent: Friday, June 13, 2014 9:31 AM To: user Subject: Re: Question about RDD cache, unpersist, materialization FYI: Here is a related discussion about this: http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
Re: use spark-shell in the source
Thanks for the answer. Yes, I tried to launch an interactive REPL in the middle of my application :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/use-spark-shell-in-the-source-tp7453p7539.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Question about RDD cache, unpersist, materialization
(I¡¯ve clarified the statement (1) of my previous mail. See below.) From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, June 13, 2014 10:05 AM To: user@spark.apache.org Subject: RE: Question about RDD cache, unpersist, materialization Currently I use rdd.count() for forceful computation, as Nick Pentreath suggested. I think that it will be nice to have a method that forcefully computes a rdd, so that the unnecessary rdds are safely unpersist()ed. Let’s think a case that a rdd_a is a parent of both: (1) a short-term rdd_s that depends only on rdd_a (maybe rdd_s is a transformation of rdd_a) (2) and a long-term rdd_t that depends on the rdds that may be computed later. (may be for some aggregation) Usually rdd_s is computed early, and rdd_a is computed for it. But it has to remain in memory until rdd_t is eventually computed. (Or recomputed when rdd_t is computed) It would be nice if rdd_t could be computed when rdd_s is computed, so that rdd_a can be unpersist()ed, since it will not be used anymore. (Currently I use rdd_t.count() for that) sc.prune() which was suggested in the related discussion you provided is not helpful for this case, since rdd_a is ‘referenced’ by ‘lazy’ rdd_t. (Get up rdd_t, and do it with rdd_s while rdd_a is here for rdd_s.) From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] Sent: Friday, June 13, 2014 9:31 AM To: user Subject: Re: Question about RDD cache, unpersist, materialization FYI: Here is a related discussion http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html about this. On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Maybe It would be nice that unpersist() ¡®triggers¡¯ the computations of other rdds that depends on it but not yet computed. The pseudo code can be as follows: unpersist() { if (this rdd has not been persisted) return; for (all rdds that depends on this rdd but not yet computed) compute_that_rdd; do_actual_unpersist(); } From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Friday, June 13, 2014 5:38 AM To: user@spark.apache.org Subject: Re: Question about RDD cache, unpersist, materialization I've run into this issue. The goal of caching / persist seems to be to avoid recomputing an RDD when its data will be needed multiple times. However, once the following RDDs are computed the cache is no longer needed. The currently design provides no obvious way to detect when the cache is no longer needed so it can be discarded. In the case of cache in memory, it may be handled by partitions being dropped (in LRU order) when memory fills up. I need to do some more experimentation to see if this really works well, or if allowing memory to fill up causes performance issues or possibly OOM errors if data isn't correctly freed. In the case of persisting to disk, I'm not sure if there's a way to limit the disk space used for caching. Does anyone know if there is such a configuration option? This is a pressing issue for me - I have had jobs fail because nodes ran out of disk space. On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com wrote: If you want to force materialization use .count() Also if you can simply don't unpersist anything, unless you really need to free the memory — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: BTW, it is possible that rdd.first() does not compute the whole partitions. 
So, first() cannot be used for the situation below. -----Original Message----- From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Wednesday, June 11, 2014 11:40 AM To: user@spark.apache.org Subject: Question about RDD cache, unpersist, materialization Hi, What I (seem to) know about the RDD persisting API is as follows: - cache() and persist() are not actions. They only do a marking. - unpersist() is also not an action. It only removes a marking. But if the rdd is already in memory, it is unloaded. And there seems to be no API to forcefully materialize an RDD without requesting data via an action method, for example first(). So, I am faced with the following scenario. { JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging for (int i = 0; i < 10; i++) { JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]); rdd.cache(); // Since it will be used twice, cache. rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union rdd.unpersist(); // Now it seems not needed. (But needed actually) } // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted. // So, all 10 rdds are rebuilt.
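To make the workaround concrete, here is a minimal Scala sketch of the pattern discussed in this thread (names and paths are illustrative, not from the original mails): force the long-term RDD with count() so the cached parent can be unpersisted safely.

  val rddA = sc.textFile("input.txt").cache()      // parent, needed by two children

  val rddS = rddA.map(_.length).filter(_ > 0)      // short-term child
  rddS.saveAsTextFile("out-s")                     // action: computes and caches rddA

  val rddT = rddA.map(_.trim).filter(_.nonEmpty)   // long-term child
  rddT.cache()       // keep rddT so later uses don't need rddA again
  rddT.count()       // forcefully compute rddT now, while rddA is still cached
  rddA.unpersist()   // safe: nothing will recompute rddA anymore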
Re: spark EC2 bring-up problems
On Thu, Jun 12, 2014 at 9:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Hi Toby, It is usually the case that even if the EC2 console says the nodes are up, they are not really fully initialized. For 16 nodes I have found `--wait 800` to be the norm that makes things work. It seems so! resume worked fine, which fits. In my previous experience I have found this to be the culprit, so if you immediately do 'launch --resume' when you see the first SSH error it's still very likely to fail. But if you wait a little bit longer and do 'launch --resume', it could work. It did. Thank you :-)
Re: Spark SQL - input command in web ui/event log
Yeah, we should probably add that. Feel free to file a JIRA. You can get it manually by calling sc.setJobDescription with the query text before running the query. Michael On Thu, Jun 12, 2014 at 5:49 PM, shlee0605 shlee0...@gmail.com wrote: In Shark, the input SQL string was shown in the description on the web UI, so it was easy to keep track of which stage corresponds to which query. However, spark-sql does not show any information about the input SQL string. Instead, it shows the lower-level operations in the web UI/event log. (screenshots are attached) I'm trying to find a way to match a stage or job to its input query string. Can the input string information be easily added to the event logger or web UI? Thanks, Seunghyun http://apache-spark-user-list.1001560.n3.nabble.com/file/n7536/shark_ui.png http://apache-spark-user-list.1001560.n3.nabble.com/file/n7536/spark-sql_ui.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-input-command-in-web-ui-event-log-tp7536.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
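For reference, a minimal sketch of Michael's suggestion (the query string and table name are made up, and sqlContext is assumed to be an existing SQLContext):

  val query = "SELECT page, COUNT(*) FROM logs GROUP BY page"
  sc.setJobDescription(query)   // the query text now appears as the job description in the web UI
  val rows = sqlContext.sql(query).collect()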
Re: An attempt to implement dbscan algorithm on top of Spark
Vipul, Thanks for your feedback. As far as I understand, you mean RDD[(Double, Double)] (note the parentheses), where each of these Double values is supposed to contain one coordinate of a point. That limits us to 2-dimensional space, which is not suitable for many tasks; I want the algorithm to be able to work in multidimensional space. Actually, there is a class org.alitouka.spark.dbscan.spatial.Point in my code, which represents a point with an arbitrary number of coordinates. IOHelper.readDataset is just a convenience method which reads a CSV file and returns an RDD of Points (more precisely, it returns a value of type RawDataset, which is just an alias for RDD[Point]). If your data is stored in a format other than CSV, you will have to write your own code to convert your data to a RawDataset. I can add support for other data formats in future versions. As for other distance measures - it is a high-priority issue on my list ;) On Thu, Jun 12, 2014 at 6:02 PM, Vipul Pandey vipan...@gmail.com wrote: Great! I was going to implement one of my own - but I may not need to do that any more :) I haven't had a chance to look deep into your code but I would recommend accepting an RDD[Double,Double] as well, instead of just a file: val data = IOHelper.readDataset(sc, "/path/to/my/data.csv") And other distance measures, of course. Thanks, Vipul On Jun 12, 2014, at 2:31 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Hi. I'm not sure if messages like this are appropriate on this list; I just want to share with you an application I am working on. This is my personal project which I started to learn more about Spark and Scala, and, if it succeeds, to contribute it to the Spark community. Maybe someone will find it useful. Or maybe someone will want to join development. The application is available at https://github.com/alitouka/spark_dbscan Any questions, comments, suggestions, as well as criticism are welcome :) Best regards, Aliaksei Litouka
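For data that is not CSV, the conversion to RawDataset might look roughly like the sketch below. Note this is hedged: the Point constructor shown is an assumption, not confirmed by this thread; check the spark_dbscan sources for the actual signature.

  import org.alitouka.spark.dbscan.spatial.Point

  // Assumption: Point can be built from a sequence of Double coordinates.
  val lines = sc.textFile("hdfs:///my/data.tsv")   // placeholder path, tab-separated values
  val dataset = lines.map(line => new Point(line.split("\t").map(_.toDouble)))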
Re: specifying fields for join()
This issue is resolved. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7544.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: When to use CombineByKey vs reduceByKey?
Matei, Thanks for the answer; this clarifies things very much. Based on my usage I would use combineByKey, since the output is another custom data structure. I found that my issues with combineByKey were relieved after doing more tuning of the level of parallelism. It really depends on the size of my dataset (I ran tests with 1000, 10K, 100K, and 1M data points). For now the GC issue is under control once I modified my data structures to be mutable; the key part I was missing was that all classes within them need to be serializable. Thanks! - Diana On Wed, Jun 11, 2014 at 6:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: combineByKey is designed for when your return type from the aggregation is different from the values being aggregated (e.g. you group together objects), and it should allow you to modify the leftmost argument of each function (mergeCombiners, mergeValue, etc) and return that instead of allocating a new object. So it should work with mutable objects — please post what problems you had with that. reduceByKey actually also allows this if your types are the same. Matei On Jun 11, 2014, at 3:21 PM, Diana Hu siyin...@gmail.com wrote: Hello all, I've seen some performance improvements using combineByKey as opposed to reduceByKey or a groupByKey+map function. I have a couple of questions; it'd be great if anyone could shed some light on this. 1) When should I use combineByKey vs reduceByKey? 2) Do the containers need to be immutable for combineByKey? I've created custom data structures for the containers, one mutable and one immutable. In the tests with the mutable containers, Spark crashed with an error about missing references. The downside of the immutable containers (which work in my tests) is that for large datasets the garbage collector gets called many more times, and it tends to run out of heap space as the GC can't keep up. I tried some of the tips here http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuned the JVM params, but this seems to be too much tuning? Thanks in advance, - Diana
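To make the mutate-and-return pattern concrete, here is a small sketch (with illustrative types; ArrayBuffer stands in for the custom container) where each function mutates its leftmost argument instead of allocating a new object:

  import org.apache.spark.SparkContext._           // pair-RDD implicits (Spark 1.x)
  import scala.collection.mutable.ArrayBuffer

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val combined = pairs.combineByKey(
    (v: Int) => ArrayBuffer(v),                                        // createCombiner
    (buf: ArrayBuffer[Int], v: Int) => { buf += v; buf },              // mergeValue: mutate in place
    (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => { b1 ++= b2; b1 }  // mergeCombiners
  )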
Re: Spilled shuffle files not being cleared
Bump On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing exceptions that look like the one below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12 machine cluster). I took a quick look and I'm seeing some shuffle spill files that are still around even 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (is it as soon as they are used?) Thanks, Michael java.io.FileNotFoundException: /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0 (No space left on device) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.<init>(FileOutputStream.java:221) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7) 14/06/09 22:07:36 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
RE: Spilled shuffle files not being cleared
Hi Michael, I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata cleaner, which will clean old, unused shuffle data once it times out. For Spark 1.0 another way is to clean shuffle data using weak references (reference-tracking based; the configuration is spark.cleaner.referenceTracking), and it is enabled by default. Thanks Saisai From: Michael Chang [mailto:m...@tellapart.com] Sent: Friday, June 13, 2014 10:15 AM To: user@spark.apache.org Subject: Re: Spilled shuffle files not being cleared Bump On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing exceptions that look like the one below in Spark 0.9.1. It looks like I'm running out of inodes on my machines (I have around 300k each in a 12 machine cluster). I took a quick look and I'm seeing some shuffle spill files that are still around even 12 minutes after they are created. Can someone help me understand when these shuffle spill files should be cleaned up (is it as soon as they are used?) Thanks, Michael java.io.FileNotFoundException: /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0 (No space left on device) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.<init>(FileOutputStream.java:221) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7) 14/06/09 22:07:36 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
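A sketch of the time-based cleaner setting described above (the TTL value is just an example; it is given in seconds):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("StreamingJob")
    .set("spark.cleaner.ttl", "3600")   // drop metadata/shuffle data older than one hour
  val sc = new SparkContext(conf)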
Re: wholeTextFiles not working with HDFS
Hi Sguj, Could you give me the exception stack? I tested it on my laptop and found that it gets the wrong FileSystem: it should be DistributedFileSystem, but it finds the RawLocalFileSystem. If we get the same exception stack, I'll try to fix it. Here is my exception stack: java.io.FileNotFoundException: File /sen/reuters-out/reut2-000.sgm-0.txt does not exist. at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:397) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:173) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:201) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097) at org.apache.spark.rdd.RDD.collect(RDD.scala:728) Also, what's your Hadoop version? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7548.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
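While debugging, it may also help to pass a fully qualified HDFS URI so that the DistributedFileSystem is selected explicitly (the namenode host and port below are placeholders):

  val files = sc.wholeTextFiles("hdfs://namenode:8020/sen/reuters-out")
  files.map { case (path, content) => (path, content.length) }.collect()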
Re: How to specify executor memory in EC2 ?
The scripts for Spark 1.0 actually specify this property in /root/spark/conf/spark-defaults.conf. I didn't know that this would override the --executor-memory flag, though; that's pretty odd. On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Yes, I am launching a cluster with the spark_ec2 script. I checked /root/spark/conf/spark-env.sh on the master node and on the slaves, and it looks like this: #!/usr/bin/env bash export SPARK_LOCAL_DIRS=/mnt/spark # Standalone cluster options export SPARK_MASTER_OPTS= export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_CORES=1 export HADOOP_HOME=/root/ephemeral-hdfs export SPARK_MASTER_IP=ec2-54-89-95-238.compute-1.amazonaws.com export MASTER=`cat /root/spark-ec2/cluster-url` export SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/ export SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf # Bind Spark's web UIs to this machine's public EC2 hostname: export SPARK_PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname` # Set a high ulimit for large shuffles ulimit -n 100 None of these variables seem to be related to memory size. Let me know if I am missing something. On Thu, Jun 12, 2014 at 7:17 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Are you launching this using our EC2 scripts? Or have you set up a cluster by hand? Matei On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: spark-env.sh doesn't seem to contain any settings related to memory size :( I will continue searching for a solution and will post it if I find it :) Thank you, anyway. On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com wrote: It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and is overriding the application's settings. Take a look in there and delete that line if possible. Matei On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am testing my application on an EC2 cluster of m3.medium machines. By default, only 512 MB of memory on each machine is used. I want to increase this amount and I'm trying to do it by passing --executor-memory 2G to the spark-submit script, but it doesn't seem to work - each machine uses only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the amount of memory?
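For reference, a sketch of setting executor memory programmatically; note that on the 1.0 EC2 AMI a value in /root/spark/conf/spark-defaults.conf may still take precedence, so check that file as well:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("MemoryTest")
    .set("spark.executor.memory", "2g")   // same intent as --executor-memory 2G
  val sc = new SparkContext(conf)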
Re: Spark 1.0.0 Standalone AppClient cannot connect Master
Hi, Andrew Got it, thanks! Hao Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com On Fri, Jun 13, 2014 at 12:42 AM, Andrew Or and...@databricks.com wrote: Hi Wang Hao, This is not removed. We moved it here: http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html If you're building with SBT and you don't specify SPARK_HADOOP_VERSION, then it defaults to 1.0.4. Andrew 2014-06-12 6:24 GMT-07:00 Hao Wang wh.s...@gmail.com: Hi, all Why does the official Spark 1.0.0 doc no longer explain how to build Spark against a specific Hadoop version? Does it mean that I don't need to specify the Hadoop version when I build Spark 1.0.0 with `sbt/sbt assembly`? Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com
multiple passes in mapPartitions
I want to take multiple passes through my data in mapPartitions. However, the iterator only allows you to take one pass through the data. If I transform the iterator into an array using iter.toArray, it is too slow, since it copies all the data into a new Scala array; it also takes twice the memory, which is bad in terms of more GC. Is there a faster/better way of taking multiple passes without copying all the data? Thank you, Zhen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
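One option worth trying is scala.collection.Iterator.duplicate, sketched below for an RDD of Ints. Caveat: duplicate buffers every element that the lagging iterator has not yet consumed, so two strictly sequential full passes still hold the whole partition in memory; it only saves space when the two passes advance roughly together.

  rdd.mapPartitions { iter =>
    val (first, second) = iter.duplicate
    val total = first.map(_.toLong).sum   // pass 1
    val count = second.size               // pass 2 (pass 1's elements were buffered)
    Iterator((total, count))
  }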