Re: How to achieve reasonable performance on Spark Streaming?

2014-06-12 Thread Boduo Li
It seems that the slow reduce tasks are caused by slow shuffling. Here are
the logs regarding one slow reduce task:

14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_88_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_89_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_90_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_91_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_92_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_93_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_94_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_95_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_96_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_97_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_188_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_189_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_190_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_191_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_192_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_193_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_194_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_195_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_196_18 after  5029 ms
14/06/11 23:42:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Got
remote block shuffle_69_197_18 after  5029 ms
14/06/11 23:42:45 INFO Executor: Serialized size of result for 23643 is 1143
14/06/11 23:42:45 INFO Executor: Sending result for 23643 directly to driver
14/06/11 23:42:45 INFO Executor: Finished task ID 23643






shuffling using netty in spark streaming

2014-06-12 Thread onpoq l
Hi,

1. Does netty perform better than the basic method for shuffling? I found
the latency caused by shuffling in a streaming job is not stable with the
basic method.

2. However, after I turn on netty for shuffling, I can only see the results
for the first two batches, and then no output at all. I'm not sure whether
the way I turn on netty is correct:

val conf = new SparkConf().set("spark.shuffle.use.netty", "true")

Thanks.

Boduo Li


Re: Using Spark to crack passwords

2014-06-12 Thread Michael Cutler
Hi Nick,

The great thing about any *unsalted* hashes is you can precompute them
ahead of time, then it is just a lookup to find the password which matches
the hash in seconds -- always makes for a more exciting demo than "come
back in a few hours".

It is a no-brainer to write a generator function to create all possible
passwords from a charset like
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", hash them
and store them to look up later.  It is however incredibly wasteful of
storage space.

- all passwords from 1 to 9 letters long
- using the charset above = 13,759,005,997,841,642 passwords
- assuming 20 bytes to store the SHA-1 and up to 9 to store the password
equals approximately 375.4 Petabytes

Thankfully there is already a more efficient/compact mechanism to achieve
this using Rainbow Tables http://en.wikipedia.org/wiki/Rainbow_table --
better still, there is an active community of people who have already
precomputed many of these datasets.  The above dataset is readily
available to download and is just 864GB -- much more feasible.

All you need to do then is write a rainbow-table lookup function in Spark
and leverage the precomputed files stored in HDFS.  Done right you should
be able to achieve interactive (few second) lookups.
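As a sketch of the lookup side in Spark (illustrative only; it assumes the
table has been flattened to tab-separated hash/password lines on HDFS, and a
true rainbow-table lookup would add the hash/reduce chain-walking on top):

val table = sc.textFile("hdfs:///tables/sha1-lookup/*")
  .map { line =>
    val Array(hash, password) = line.split('\t') // "hexhash<TAB>password"
    (hash, password)
  }
  .cache() // keep the pairs in memory so repeated lookups stay interactive

def crack(targetHash: String): Option[String] =
  table.lookup(targetHash.toLowerCase).headOption

Note that lookup() scans all partitions unless the RDD has a partitioner, so
hash-partitioning the table (partitionBy with a HashPartitioner) before
caching is what makes per-key lookups cheap at this scale.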

Have fun!

MC


Michael Cutler
Founder, CTO

Mobile: +44 789 990 7847
Email: mich...@tumra.com
Web: tumra.com (http://tumra.com/?utm_source=signature&utm_medium=email)
Visit us at our offices in Chiswick Park (http://goo.gl/maps/abBxq)
Registered in England & Wales, 07916412. VAT No. 130595328



On 12 June 2014 01:24, Nick Chammas nicholas.cham...@gmail.com wrote:

 Spark is obviously well-suited to crunching massive amounts of data. How
 about crunching massive amounts of numbers?

 A few years ago I put together a little demo for some co-workers to
 demonstrate the dangers of using SHA1
 http://codahale.com/how-to-safely-store-a-password/ to hash and store
 passwords. Part of the demo included a live brute-forcing of hashes to show
 how SHA1's speed made it unsuitable for hashing passwords.

 I think it would be cool to redo the demo, but utilize the power of a
 cluster managed by Spark to crunch through hashes even faster.

 But how would you do that with Spark (if at all)?

 I'm guessing you would create an RDD that somehow defined the search space
 you're going to go through, and then partition it to divide the work up
 equally amongst the cluster's cores. Does that sound right?

 I wonder if others have already used Spark for computationally-intensive
 workloads like this, as opposed to just data-intensive ones.

 Nick





Re: Spark SQL incorrect result on GROUP BY query

2014-06-12 Thread Pei-Lun Lee
I reran with master and looks like it is fixed.



2014-06-12 1:26 GMT+08:00 Michael Armbrust mich...@databricks.com:

 I'd try rerunning with master.  It is likely you are running into
 SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994.

 Michael


 On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 I am using Spark 1.0.0 and found that in Spark SQL some queries using
 GROUP BY give weird results.
 To reproduce, type the following commands in spark-shell connecting to a
 standalone server:

 case class Foo(k: String, v: Int)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
 List.fill(300)(Foo("c", 3))
 sc.makeRDD(rows).registerAsTable("foo")
 sql("select k,count(*) from foo group by k").collect

 the result will be something random like:
 res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
 [c,270], [4,56], [1,1])

 and if I run the same query again, the new result will be correct:
 sql("select k,count(*) from foo group by k").collect
 res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

 Should I file a bug?

 --
 Pei-Lun Lee





Re: Spark SQL incorrect result on GROUP BY query

2014-06-12 Thread Michael Armbrust
Thanks for verifying!


On Thu, Jun 12, 2014 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote:

 I reran with master and looks like it is fixed.



 2014-06-12 1:26 GMT+08:00 Michael Armbrust mich...@databricks.com:

 I'd try rerunning with master.  It is likely you are running into
 SPARK-1994 https://issues.apache.org/jira/browse/SPARK-1994.

 Michael


 On Wed, Jun 11, 2014 at 3:01 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

  I am using Spark 1.0.0 and found that in Spark SQL some queries using
  GROUP BY give weird results.
  To reproduce, type the following commands in spark-shell connecting to a
  standalone server:

  case class Foo(k: String, v: Int)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext._
  val rows = List.fill(100)(Foo("a", 1)) ++ List.fill(200)(Foo("b", 2)) ++
  List.fill(300)(Foo("c", 3))
  sc.makeRDD(rows).registerAsTable("foo")
  sql("select k,count(*) from foo group by k").collect

 the result will be something random like:
 res1: Array[org.apache.spark.sql.Row] = Array([b,180], [3,18], [a,75],
 [c,270], [4,56], [1,1])

 and if I run the same query again, the new result will be correct:
  sql("select k,count(*) from foo group by k").collect
 res2: Array[org.apache.spark.sql.Row] = Array([b,200], [a,100], [c,300])

 Should I file a bug?

 --
 Pei-Lun Lee






Re: Using Spark to crack passwords

2014-06-12 Thread Marek Wiewiorka
This is actually what I've already mentioned - with rainbow tables kept in
memory it could be really fast!

Marek


2014-06-12 9:25 GMT+02:00 Michael Cutler mich...@tumra.com:

 Hi Nick,

  The great thing about any *unsalted* hashes is you can precompute them
  ahead of time, then it is just a lookup to find the password which matches
  the hash in seconds -- always makes for a more exciting demo than "come
  back in a few hours".

  It is a no-brainer to write a generator function to create all possible
  passwords from a charset like
  "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", hash
  them and store them to look up later.  It is however incredibly wasteful of
  storage space.

 - all passwords from 1 to 9 letters long
 - using the charset above = 13,759,005,997,841,642 passwords
 - assuming 20 bytes to store the SHA-1 and up to 9 to store the password
  equals approximately 375.4 Petabytes

  Thankfully there is already a more efficient/compact mechanism to achieve
  this using Rainbow Tables http://en.wikipedia.org/wiki/Rainbow_table --
  better still, there is an active community of people who have already
  precomputed many of these datasets.  The above dataset is readily
  available to download and is just 864GB -- much more feasible.

 All you need to do then is write a rainbow-table lookup function in Spark
 and leverage the precomputed files stored in HDFS.  Done right you should
 be able to achieve interactive (few second) lookups.

 Have fun!

 MC


  Michael Cutler
  Founder, CTO

  Mobile: +44 789 990 7847
  Email: mich...@tumra.com
  Web: tumra.com (http://tumra.com/?utm_source=signature&utm_medium=email)
  Visit us at our offices in Chiswick Park (http://goo.gl/maps/abBxq)
  Registered in England & Wales, 07916412. VAT No. 130595328




 On 12 June 2014 01:24, Nick Chammas nicholas.cham...@gmail.com wrote:

  Spark is obviously well-suited to crunching massive amounts of data. How
  about crunching massive amounts of numbers?

 A few years ago I put together a little demo for some co-workers to
 demonstrate the dangers of using SHA1
 http://codahale.com/how-to-safely-store-a-password/ to hash and store
 passwords. Part of the demo included a live brute-forcing of hashes to show
 how SHA1's speed made it unsuitable for hashing passwords.

 I think it would be cool to redo the demo, but utilize the power of a
 cluster managed by Spark to crunch through hashes even faster.

 But how would you do that with Spark (if at all)?

 I'm guessing you would create an RDD that somehow defined the search
 space you're going to go through, and then partition it to divide the work
 up equally amongst the cluster's cores. Does that sound right?

 I wonder if others have already used Spark for computationally-intensive
 workloads like this, as opposed to just data-intensive ones.

 Nick







How to read a snappy-compressed text file?

2014-06-12 Thread innowireless TaeYun Kim
Hi,

 

Maybe this is a newbie question: How to read a snappy-compressed text file?

 

The OS is Windows 7.

Currently, I've done the following steps:

 

1. Built Hadoop 2.4.0 with snappy option.

'hadoop checknative' command displays the following line:

snappy: true D:\hadoop-2.4.0\bin\snappy.dll

So, I assume hadoop can do snappy compression.

BTW, snappy.dll was copied from the snappy64.dll file in snappy-windows-1.1.1.8.

 

2. Added the following configurations to both core-site.xml and
yarn-site.xml.

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
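For reference, the same codec registration can also be done programmatically
on the SparkContext's Hadoop configuration -- a sketch, not part of the
original setup; note that TextInputFormat selects the codec by file
extension, so the input must keep its .snappy suffix:

sc.hadoopConfiguration.set("io.compression.codecs",
  "org.apache.hadoop.io.compress.SnappyCodec")
val lines = sc.textFile("file:///D:/data/input.txt.snappy") // illustrative path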

 

3. Added the following environment variable.

SPARK_LIBRARY_PATH=D:\hadoop-2.4.0\bin

Since I use IntelliJ, the above line was added to the Environment
variables section of the Run Configuration.

 

4. Compressed the input text file with snzip.exe which was included in
snappy-windows-1.1.1.8.

 

5. Wrote the code.

sc.textFile(compressed_file_name)  // no other argument
  .map(...)

 

Now when I run my spark application, the results are as follows:

 

1. The string 'snappy' cannot be found in the DEBUG log.

The most relevant logs are as follows:

14/06/12 18:57:55 DEBUG NativeCodeLoader: Trying to load the custom-built
native-hadoop library...

14/06/12 18:57:55 DEBUG NativeCodeLoader: Loaded the native-hadoop library

2. Application fails. The log is as follows:

14/06/12 18:57:57 WARN: int from string failed for: [(some binary
characters)]

 

So apparently sc.textFile() does not recognize the file format and reads it
as-is, so the map function receives garbage.

 

How can I fix this?

 

Thanks.

 



Re: how to set spark.executor.memory and heap size

2014-06-12 Thread Laurent T
Hi,

Can you give us a little more insight on how you used that file to solve
your problem ?
We're having the same OOM as you were and haven't been able to solve it yet.

Thanks





Re: initial basic question from new user

2014-06-12 Thread Gerard Maas
The goal of rdd.persist is to create a cached RDD that breaks the DAG
lineage. Therefore, computations *in the same job* that use that RDD can
re-use that intermediate result, but it's not meant to survive between job
runs.

for example:

val baseData = rawDataRdd.map(...).flatMap(...).reduceByKey(...).persist
val metric1 = baseData.flatMap(op1).reduceByKey(...).collect
val metric2 = baseData.flatMap(op2).reduceByKey(...).collect

Without persist, computing metric1 and metric2 would trigger the
computation starting from rawData. With persist, both metric1 and metric2
will start from the intermediate result (baseData).

If you need ad-hoc persistence to files, you can save RDDs using
rdd.saveAsObjectFile(...) [1] and load them afterwards using
sparkContext.objectFile(...) [2].
If you want to preserve the RDDs in memory between job runs, you should
look at the Spark-JobServer [3]

[1]
https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

[2]
https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.SparkContext

[3] https://github.com/ooyala/spark-jobserver
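A small sketch of that save/load round trip (Spark 1.0 API; the paths and the
(String, Int) element type are illustrative assumptions):

val baseData = sc.textFile("hdfs:///logs/*")
  .map(line => (line.take(4), 1))
  .reduceByKey(_ + _) // some hypothetical aggregation
baseData.saveAsObjectFile("hdfs:///cache/baseData") // job 1 writes the agg out

// A later, separate job can reload it without recomputing the lineage:
val restored = sc.objectFile[(String, Int)]("hdfs:///cache/baseData")
restored.filter(_._2 > 10).collect()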



On Thu, Jun 12, 2014 at 11:24 AM, Toby Douglass t...@avocet.io wrote:

 Gents,

 I am investigating Spark with a view to perform reporting on a large data
 set, where the large data set receives additional data in the form of log
 files on an hourly basis.

 Where the data set is large there is a possibility we will create a range
 of aggregate tables, to reduce the volume of data which has to be processed.

 Having spent a little while reading up about Spark, my thought was that I
 could create an RDD which is an agg, persist this to disk, have reporting
 queries run against that RDD and when new data arrives, convert the new log
 file into an agg and add that to the agg RDD.

 However, I begin now to get the impression that RDDs cannot be persisted
 across jobs - I can generate an RDD, I can persist it, but I can see no way
 for a later job to load a persisted RDD (and I begin to think it will have
 been GCed anyway, at the end of the first job).  Is this correct?





Re: HDFS Server/Client IPC version mismatch while trying to access HDFS files using Spark-0.9.1

2014-06-12 Thread bijoy deb
Hi,

The problem was due to a pre-built/binary Tachyon-0.4.1 jar in the
SPARK_CLASSPATH; that Tachyon jar had been built against Hadoop-1.0.4.
Building Tachyon against Hadoop-2.0.0 resolved the issue.

Thanks


On Wed, Jun 11, 2014 at 11:34 PM, Marcelo Vanzin van...@cloudera.com
wrote:

 The error is saying that your client libraries are older than what
 your server is using (2.0.0-mr1-cdh4.6.0 is IPC version 7).

 Try double-checking that your build is actually using that version
 (e.g., by looking at the hadoop jar files in lib_managed/jars).

 On Wed, Jun 11, 2014 at 2:07 AM, bijoy deb bijoy.comput...@gmail.com
 wrote:
  Any suggestions from anyone?
 
  Thanks
  Bijoy
 
 
  On Tue, Jun 10, 2014 at 11:46 PM, bijoy deb bijoy.comput...@gmail.com
  wrote:
 
  Hi all,
 
  I have built Shark-0.9.1 using sbt with the below command:
 
  SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.6.0 sbt/sbt assembly
 
  My Hadoop cluster is also having version 2.0.0-mr1-cdh4.6.0.
 
  But when I try to execute the below command from the Spark shell, which
  reads a file from HDFS, I get the IPC version mismatch - "IPC version 7 on
  server versus IPC version 4 on client" - error on the
  org.apache.hadoop.hdfs.DFSClient class.
 
 
  scala> val s = sc.textFile("hdfs://host:port/test.txt")
  scala> s.count()
  14/06/10 23:42:59 WARN util.NativeCodeLoader: Unable to load
 native-hadoop
  library for your platform... using builtin-java classes where applicable
  14/06/10 23:42:59 WARN snappy.LoadSnappy: Snappy native library not
 loaded
  org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
  communicate with client version 4
  at org.apache.hadoop.ipc.Client.call(Client.java:1070)
  at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
  at com.sun.proxy.$Proxy9.getProtocolVersion(Unknown Source)
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
  at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
  at
  org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
  at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
  at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
  at
 
 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
  at
  org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
  at
 
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
  at
 
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 
 
  Apparently this error is because of a version mismatch of the hadoop-hdfs
  jar between the client (the one referred to by Spark) and the server (hadoop
  cluster). But what I don't understand is why there is a mismatch (since I had
  built Spark with the correct Hadoop version).
 
  Any suggestions would be highly appreciated.
 
  Thanks
  Bijoy
 
 



 --
 Marcelo



Re: pmml with augustus

2014-06-12 Thread filipus
@villu: Thank you for your help. I promise I am going to try it! That's cool :-)
Do you also know the other way around, from PMML to a model object in Spark?





Re: Writing data to HBase using Spark

2014-06-12 Thread gaurav.dasgupta
Is there anyone else who is facing this problem of writing to HBase when
running Spark on YARN mode or Standalone mode using this example?

If not, then do I need to explicitly specify something in the classpath?

Regards,
Gaurav


On Wed, Jun 11, 2014 at 1:53 PM, Gaurav Dasgupta gaurav.d...@gmail.com
wrote:

 Hi Kanwaldeep,

 I have tried your code but arrived into a problem. The code is working
 fine in *local* mode. But if I run the same code in Spark stand alone
 mode or YARN mode, then it is continuously executing, but not saving
 anything in the HBase table. I guess, it is stopping data streaming once
 the *saveToHBase* method is called for the first time.

 This is strange. I just want to know whether you have tested the code on
 all Spark execution modes?

 Thanks,
 Gaurav


 On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep wrote:

 Please see sample code attached at
 https://issues.apache.org/jira/browse/SPARK-944.










How to use SequenceFileRDDFunctions.saveAsSequenceFile() in Java?

2014-06-12 Thread innowireless TaeYun Kim
Hi,

 

How to use SequenceFileRDDFunctions.saveAsSequenceFile() in Java?

A simple example would be a great help.

 

Thanks in advance.

 



RE: shuffling using netty in spark streaming

2014-06-12 Thread Shao, Saisai
Hi,


1. The performance depends on your hardware and system configuration; you
can test it yourself. In my tests, the two shuffle implementations have no
notable performance difference in the latest version.

2. That's the correct way to turn on netty-based shuffle. There are no
shuffle-fetch-related metrics in the netty-based shuffle, so you may not see
the shuffle-fetch metrics in the web portal.


Thanks
Jerry

From: onpoq l [mailto:onpo...@gmail.com]
Sent: Thursday, June 12, 2014 2:35 PM
To: user@spark.apache.org
Subject: shuffling using netty in spark streaming

Hi,

1. Does netty perform better than the basic method for shuffling? I found the 
latency caused by shuffling in a streaming job is not stable with the basic 
method.

2. However, after I turn on netty for shuffling, I can only see the results for 
the first two batches, and then no output at all. I'm not sure whether the way 
I turn on netty is correct:

val conf = new SparkConf().set("spark.shuffle.use.netty", "true")

Thanks.

Boduo Li


Spark 1.0.0 Standalone AppClient cannot connect Master

2014-06-12 Thread Hao Wang
Hi, all

Why does the Spark 1.0.0 official doc no longer describe how to build Spark
against a corresponding Hadoop version?

Does it mean that I don't need to specify the Hadoop version when I build my
Spark 1.0.0 with `sbt/sbt assembly`?


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


Re: initial basic question from new user

2014-06-12 Thread Christopher Nguyen
Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want
for your use case. As for Parquet support, that's newly arrived in Spark
1.0.0 together with SparkSQL so continue to watch this space.

Gerard's suggestion to look at JobServer, which you can generalize as
building a long-running application which allows multiple clients to
load/share/persist/save/collaborate-on RDDs satisfies a larger, more
complex use case. That is indeed the job of a higher-level application,
subject to a wide variety of higher-level design choices. A number of us
have successfully built Spark-based apps around that model.
--
Christopher T. Nguyen
Co-founder  CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen



On Thu, Jun 12, 2014 at 4:35 AM, Toby Douglass t...@avocet.io wrote:

 On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

  The goal of rdd.persist is to create a cached RDD that breaks the DAG
 lineage. Therefore, computations *in the same job* that use that RDD can
 re-use that intermediate result, but it's not meant to survive between job
 runs.


 As I understand it, Spark is designed for interactive querying, in the
 sense that the caching of intermediate results eliminates the need to
 recompute those results.

 However, if intermediate results last only for the duration of a job (e.g.
 say a python script), how exactly is interactive querying actually
 performed?   a script is not an interactive medium.  Is the shell the only
 medium for interactive querying?

 Consider a common usage case : a web-site, which offers reporting upon a
 large data set.  Users issue arbitrary queries.  A few queries (just with
 different arguments) dominate the query load, so we thought to create
 intermediate RDDs to service those queries, so only those order of
 magnitude or smaller RDDs would need to be processed.  Where this is not
 possible, we can only use Spark for reporting by issuing each query over
 the whole data set - e.g. Spark is just like Impala is just like Presto is
  just like [nnn].  The enormous benefit of RDDs - the entire point of Spark
 - so profoundly useful here - is not available.  What a huge and unexpected
 loss!  Spark seemingly renders itself ordinary.  It is for this reason I am
 surprised to find this functionality is not available.


  If you need ad-hoc persistence to files, you can save RDDs using
 rdd.saveAsObjectFile(...) [1] and load them afterwards using
 sparkContext.objectFile(...)


 I've been using this site for docs;

 http://spark.apache.org

 Here we find through the top-of-the-page menus the link API Docs -
 Python API which brings us to;

 http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html

 Where this page does not show the function saveAsObjectFile().

 I find now from your link here;


 https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

 What appears to be a second and more complete set of the same
 documentation, using a different web-interface to boot.

  It appears at least that there are two sets of documentation for the same
  APIs, where one set is out of date and the other not, and the out-of-date
  set is the one linked from the main site?

 Given that our agg sizes will exceed memory, we expect to cache them to
 disk, so save-as-object (assuming there are no out of the ordinary
  performance issues) may solve the problem, but I was hoping to store data
  in a column-orientated format.  However I think this in general is not
 possible - Spark can *read* Parquet, but I think it cannot write Parquet as
 a disk-based RDD format.

 If you want to preserve the RDDs in memory between job runs, you should
 look at the Spark-JobServer [3]


 Thankyou.

 I view this with some trepidation.  It took two man-days to get Spark
 running (and I've spent another man day now trying to get a map/reduce to
 run; I'm getting there, but not there yet) - the bring-up/config experience
  for end-users is not tested or accurately documented (although to be clear,
 no better and no worse than is normal for open source; Spark is not
 exceptional).  Having to bring up another open source project is a
 significant barrier to entry; it's always such a headache.

 The save-to-disk function you mentioned earlier will allow intermediate
 RDDs to go to disk, but we do in fact have a use case where in-memory would
 be useful; it might allow us to ditch Cassandra, which would be wonderful,
 since it would reduce the system count by one.

 I have to say, having to install JobServer to achieve this one end seems
 an extraordinarily heavyweight solution - a whole new application, when all
 that is wished for is that Spark persists RDDs across jobs, where so small
 a feature seems to open the door to so much functionality.





Re: initial basic question from new user

2014-06-12 Thread FRANK AUSTIN NOTHAFT
RE:

 Given that our agg sizes will exceed memory, we expect to cache them to
disk, so save-as-object (assuming there are no out of the ordinary
performance issues) may solve the problem, but I was hoping to store data
in a column-orientated format.  However I think this in general is not
possible - Spark can *read* Parquet, but I think it cannot write Parquet as
a disk-based RDD format.

Spark can write Parquet, via the ParquetOutputFormat that ships with
Parquet. If you'd like example code for writing out to Parquet, please
see the adamSave function in
https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala,
starting at line 62. There is a bit of setup necessary for the Parquet
write codec, but otherwise it is fairly straightforward.
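For flavour, a condensed sketch of that pattern using parquet-avro's output
format (MyRecord is a hypothetical Avro-generated class; this is not ADAM's
exact code):

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import parquet.avro.AvroParquetOutputFormat

def saveAsParquet(rdd: RDD[MyRecord], path: String): Unit = {
  val job = new Job(rdd.context.hadoopConfiguration)
  // Tell the Parquet write support how to shred the Avro records into columns
  AvroParquetOutputFormat.setSchema(job, MyRecord.SCHEMA$)
  rdd.map(r => (null: Void, r)) // Parquet ignores the key, hence Void
    .saveAsNewAPIHadoopFile(
      path,
      classOf[Void],
      classOf[MyRecord],
      classOf[AvroParquetOutputFormat],
      job.getConfiguration)
}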

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466


On Thu, Jun 12, 2014 at 7:03 AM, Christopher Nguyen c...@adatao.com wrote:

 Toby, #saveAsTextFile() and #saveAsObjectFile() are probably what you want
 for your use case. As for Parquet support, that's newly arrived in Spark
 1.0.0 together with SparkSQL so continue to watch this space.

 Gerard's suggestion to look at JobServer, which you can generalize as
 building a long-running application which allows multiple clients to
 load/share/persist/save/collaborate-on RDDs satisfies a larger, more
 complex use case. That is indeed the job of a higher-level application,
 subject to a wide variety of higher-level design choices. A number of us
 have successfully built Spark-based apps around that model.
 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Thu, Jun 12, 2014 at 4:35 AM, Toby Douglass t...@avocet.io wrote:

 On Thu, Jun 12, 2014 at 11:36 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

  The goal of rdd.persist is to create a cached RDD that breaks the DAG
 lineage. Therefore, computations *in the same job* that use that RDD can
 re-use that intermediate result, but it's not meant to survive between job
 runs.


 As I understand it, Spark is designed for interactive querying, in the
 sense that the caching of intermediate results eliminates the need to
 recompute those results.

 However, if intermediate results last only for the duration of a job
 (e.g. say a python script), how exactly is interactive querying actually
 performed?   a script is not an interactive medium.  Is the shell the only
 medium for interactive querying?

 Consider a common usage case : a web-site, which offers reporting upon a
 large data set.  Users issue arbitrary queries.  A few queries (just with
 different arguments) dominate the query load, so we thought to create
 intermediate RDDs to service those queries, so only those order of
 magnitude or smaller RDDs would need to be processed.  Where this is not
 possible, we can only use Spark for reporting by issuing each query over
 the whole data set - e.g. Spark is just like Impala is just like Presto is
  just like [nnn]. The enormous benefit of RDDs - the entire point of Spark
 - so profoundly useful here - is not available.  What a huge and unexpected
 loss!  Spark seemingly renders itself ordinary.  It is for this reason I am
 surprised to find this functionality is not available.


  If you need ad-hoc persistence to files, you can save RDDs using
 rdd.saveAsObjectFile(...) [1] and load them afterwards using
 sparkContext.objectFile(...)


 I've been using this site for docs;

 http://spark.apache.org

 Here we find through the top-of-the-page menus the link API Docs -
 Python API which brings us to;

 http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html

 Where this page does not show the function saveAsObjectFile().

 I find now from your link here;


 https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

 What appears to be a second and more complete set of the same
 documentation, using a different web-interface to boot.

  It appears at least that there are two sets of documentation for the same
  APIs, where one set is out of date and the other not, and the out-of-date
  set is the one linked from the main site?

 Given that our agg sizes will exceed memory, we expect to cache them to
 disk, so save-as-object (assuming there are no out of the ordinary
  performance issues) may solve the problem, but I was hoping to store data
  in a column-orientated format.  However I think this in general is not
 possible - Spark can *read* Parquet, but I think it cannot write Parquet as
 a disk-based RDD format.

 If you want to preserve the RDDs in memory between job runs, you should
 look at the Spark-JobServer [3]


 Thankyou.

 I view this with some trepidation.  It took two man-days to get Spark
 running (and I've spent another man day now trying to get a map/reduce to
 run; I'm getting there, but not there yet) - the bring-up/config experience
 for end-users is not 

Re: Using Spark to crack passwords

2014-06-12 Thread Sean Owen
You need a use case where a lot of computation is applied to a little data.
How about any of the various distributed computing projects out there?
Although the SETI@home use case seems like a cool example, I doubt you want
to reimplement its client.

It might be far simpler to reimplement a search for Mersenne primes or
optimal Golomb rulers or something. Although you're not going to get great
speed through the JVM, it may be just fine as an example. And it stands
some remote chance of getting useful work done.
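For flavour, a toy sketch of that compute-heavy pattern: distributing
Lucas-Lehmer tests of Mersenne candidates 2^p - 1 across the cluster
(illustrative only; a serious search needs far faster arithmetic than BigInt):

val exponents = sc.parallelize(3 to 5000, 200)
  .filter(p => BigInt(p).isProbablePrime(30)) // the test applies to odd prime p
val mersenneExponents = exponents.filter { p =>
  val m = BigInt(2).pow(p) - 1
  var s = BigInt(4)
  var i = 0
  while (i < p - 2) { s = (s * s - 2) mod m; i += 1 }
  s == 0 // Lucas-Lehmer: 2^p - 1 is prime iff s_(p-2) == 0 (mod 2^p - 1)
}.collect().sorted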


On Thu, Jun 12, 2014 at 11:11 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Indeed, rainbow tables are helpful for working on unsalted hashes. They
 turn a large amount of computational work into a bit of computational work
 and a bit of lookup work. The rainbow tables could easily be captured as
 RDDs.

 I guess I derailed my own discussion by focusing on password cracking,
 since my intention was to explore how Spark applications are written for
 compute-intensive workloads as opposed to data intensive ones. And for
 certain types of password cracking, the best approach is to turn compute
 work into data work. :)



 On Thu, Jun 12, 2014 at 5:32 AM, Marek Wiewiorka 
 marek.wiewio...@gmail.com wrote:

 This is actually what I've already mentioned - with rainbow tables kept in
 memory it could be really fast!

 Marek


 2014-06-12 9:25 GMT+02:00 Michael Cutler mich...@tumra.com:

 Hi Nick,

 The great thing about any *unsalted* hashes is you can precompute them
 ahead of time, then it is just a lookup to find the password which matches
 the hash in seconds -- always makes for a more exciting demo than "come
 back in a few hours".

 It is a no-brainer to write a generator function to create all possible
 passwords from a charset like
 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", hash
 them and store them to look up later.  It is however incredibly wasteful of
 storage space.

 - all passwords from 1 to 9 letters long
 - using the charset above = 13,759,005,997,841,642 passwords
 - assuming 20 bytes to store the SHA-1 and up to 9 to store the password
  equals approximately 375.4 Petabytes

 Thankfully there is already a more efficient/compact mechanism to
 achieve this using Rainbow Tables
 http://en.wikipedia.org/wiki/Rainbow_table -- better still, there is
 an active community of people who have already precomputed many of these
 datasets.  The above dataset is readily available to download and
 is just 864GB -- much more feasible.

 All you need to do then is write a rainbow-table lookup function in
 Spark and leverage the precomputed files stored in HDFS.  Done right you
 should be able to achieve interactive (few second) lookups.

 Have fun!

 MC


 Michael Cutler
 Founder, CTO

 Mobile: +44 789 990 7847
 Email: mich...@tumra.com
 Web: tumra.com (http://tumra.com/?utm_source=signature&utm_medium=email)
 Visit us at our offices in Chiswick Park (http://goo.gl/maps/abBxq)
 Registered in England & Wales, 07916412. VAT No. 130595328




 On 12 June 2014 01:24, Nick Chammas nicholas.cham...@gmail.com wrote:

 Spark is obviously well-suited to crunching massive amounts of data.
 How about crunching massive amounts of numbers?

 A few years ago I put together a little demo for some co-workers to
 demonstrate the dangers of using SHA1
 http://codahale.com/how-to-safely-store-a-password/ to hash and
 store passwords. Part of the demo included a live brute-forcing of hashes
 to show how SHA1's speed made it unsuitable for hashing passwords.

 I think it would be cool to redo the demo, but utilize the power of a
 cluster managed by Spark to crunch through hashes even faster.

 But how would you do that with Spark (if at all)?

 I'm guessing you would create an RDD that somehow defined the search
 space you're going to go through, and then partition it to divide the work
 up equally amongst the cluster's cores. Does that sound right?

 I wonder if others have already used Spark for
 computationally-intensive workloads like this, as opposed to just
 data-intensive ones.

 Nick









Re: initial basic question from new user

2014-06-12 Thread Andre Schumacher

Hi,

On 06/12/2014 05:47 PM, Toby Douglass wrote:

 In these future jobs, when I come to load the aggregted RDD, will Spark
 load and only load the columns being accessed by the query?  or will Spark
 load everything, to convert it into an internal representation, and then
 execute the query?

The aforementioned native Parquet support in Spark 1.0 supports column
projections which means only the columns that appear in the query will
be loaded. The next release will also support record filters for simple
pruning predicates (an int column smaller than some value, and such). This is
different from using a Hadoop Input/Output format and requires no
additional setup (jars in classpath and such).

For more details see:

http://spark.apache.org/docs/latest/sql-programming-guide.html#using-parquet
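A minimal usage sketch of that native support (Spark 1.0 API; the file name
is illustrative):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val people = sqlContext.parquetFile("people.parquet") // schema comes from the file
people.registerAsTable("people")
// Column projection: only the "name" column is actually read from disk.
sql("SELECT name FROM people").collect()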

Andre


Re: HELP!? Re: Streaming trouble (reduceByKey causes all printing to stop)

2014-06-12 Thread Michael Campbell
I don't know if this matters, but I'm looking at the web site that spark
puts up, and I see under the streaming tab:


   - Started at: Thu Jun 12 11:42:10 EDT 2014
   - Time since start: 6 minutes 3 seconds
   - Network receivers: 1
   - Batch interval: 5 seconds
   - Processed batches: 0
   - Waiting batches: 1




Why would a batch be waiting for so long, well over my batch interval of 5 seconds?


On Thu, Jun 12, 2014 at 10:18 AM, Michael Campbell 
michael.campb...@gmail.com wrote:

 And... it's NOT working.

 Here's the code:

 val bytes = kafkaStream.map { case (key, messageBytes) => messageBytes } // just the bytes part
 val things = bytes.flatMap(bytesArrayToThings) // convert to a "thing"
 val srcDestinations = things.map(thing =>
   (ipToString(thing.getSourceIp), Set(ipToString(thing.getDestinationIp))))
 // up to this point works fine.

 // this fails to print
 val srcDestinationSets = srcDestinations.reduceByKey(
   (exist: Set[String], addl: Set[String]) => exist ++ addl)


 What it does...

 From a kafka message containing many things, convert the message to an
 array of said things, flatMap them out to a stream of 1 thing at a
 time, pull out and make a Tuple of a (SourceIP, DestinationIP).

 ALL THAT WORKS.  If I do a srcDestinations.print() I get output like the
 following, every 5 seconds, which is my batch size.

 ---
 Time: 140258200 ms
 ---
 (10.30.51.216,Set(10.20.1.1))
 (10.20.11.3,Set(10.10.61.98))
 (10.20.11.3,Set(10.10.61.95))
 ...



 What I want is a SET of (sourceIP -> Set(all the destination IPs)).  Using
 a set because, as you can see above, the same source may have the same
 destination multiple times and I want to eliminate dupes on the destination
 side.

 When I call the reduceByKey() method, I never get any output.  When I do a
 srcDestinationSets.print()  NOTHING EVER PRINTS.  Ever.  Never.

 What am I doing wrong?  (The same happens for reduceByKeyAndWindow(...,
 Seconds(5)).)

 I'm sure this is something I've done, but I cannot figure out what it was.

 Help, please?





Re: initial basic question from new user

2014-06-12 Thread Toby Douglass
On Thu, Jun 12, 2014 at 4:48 PM, Andre Schumacher 
schum...@icsi.berkeley.edu wrote:

 On 06/12/2014 05:47 PM, Toby Douglass wrote:

  In these future jobs, when I come to load the aggregted RDD, will Spark
  load and only load the columns being accessed by the query?  or will
 Spark
  load everything, to convert it into an internal representation, and then
  execute the query?

 The aforementioned native Parquet support in Spark 1.0 supports column
 projections which means only the columns that appear in the query will
 be loaded.


[snip]

Thankyou!


Re: Writing data to HBase using Spark

2014-06-12 Thread Mayur Rustagi
Are you able to use the Hadoop input/output format reader for HBase with the
new Hadoop API?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jun 12, 2014 at 7:49 AM, gaurav.dasgupta gaurav.d...@gmail.com
wrote:

 Is there anyone else who is facing this problem of writing to HBase when
 running Spark on YARN mode or Standalone mode using this example?

 If not, then do I need to explicitly specify something in the classpath?

 Regards,
 Gaurav


 On Wed, Jun 11, 2014 at 1:53 PM, Gaurav Dasgupta wrote:

 Hi Kanwaldeep,

 I have tried your code but arrived into a problem. The code is working
 fine in *local* mode. But if I run the same code in Spark stand alone
 mode or YARN mode, then it is continuously executing, but not saving
 anything in the HBase table. I guess, it is stopping data streaming once
 the *saveToHBase* method is called for the first time.

 This is strange. I just want to know whether you have tested the code on
 all Spark execution modes?

 Thanks,
 Gaurav


 On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep wrote:

 Please see sample code attached at
 https://issues.apache.org/jira/browse/SPARK-944.









Re: Spark 1.0.0 Standalone AppClient cannot connect Master

2014-06-12 Thread Andrew Or
Hi Wang Hao,

This is not removed. We moved it here:
http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
If you're building with SBT, and you don't specify the
SPARK_HADOOP_VERSION, then it defaults to 1.0.4.

Andrew


2014-06-12 6:24 GMT-07:00 Hao Wang wh.s...@gmail.com:

 Hi, all

 Why does the Spark 1.0.0 official doc no longer describe how to build Spark
 against a corresponding Hadoop version?

 Does it mean that I don't need to specify the Hadoop version when I build
 my Spark 1.0.0 with `sbt/sbt assembly`?


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com



Re: use spark-shell in the source

2014-06-12 Thread Andrew Or
Not sure if this is what you're looking for, but have you looked at Java's
ProcessBuilder? You can do something like

for (line <- lines) {
  val command = line.split(" ") // You may need to deal with quoted strings
  val process = new ProcessBuilder(command: _*)
  process.inheritIO() // redirect output of the process to the main thread
  process.start()
}

Or are you trying to launch an interactive REPL in the middle of your
application?


2014-06-11 22:56 GMT-07:00 JaeBoo Jung itsjb.j...@samsung.com:

  Hi all,



  Can I use spark-shell programmatically in my Spark application (in Java
 or Scala)?

 Because I want to convert scala lines to string array and run
 automatically in my application.

 For example,

 for (line <- lines) {
   // run this line in spark-shell style and get outputs
   run(line)
 }

 Thanks

 _

 *JaeBoo, Jung*
 Assistant Engineer / BDA Lab / Samsung SDS



Re: Access DStream content?

2014-06-12 Thread Gianluca Privitera
You can use foreachRDD and then access the RDD data directly.
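A minimal sketch, assuming `counts` is the reduced per-window DStream from
your pipeline:

counts.foreachRDD { (rdd, time) =>
  val values = rdd.collect() // the counts are tiny, so collecting is cheap
  values.foreach(v => println(s"count at $time: $v")) // immediate action...
  rdd.saveAsTextFile(s"hdfs:///counts/batch-${time.milliseconds}") // ...then HDFS
}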
Hope this works for you.

Gianluca

On 12 Jun 2014, at 10:06, Wolfinger, Fred fwolfin...@cyberpointllc.com wrote:

Good morning.

I have a question related to Spark Streaming. I have reduced some data down to 
a simple count value (by window), and would like to take immediate action on 
that value before storing in HDFS. I don't see any DStream member functions 
that would allow me to access its contents. Is what I am trying to do not in 
the spirit of Spark Streaming? If it's not, what would be the best practice for 
doing so?

Thanks so much!

Fred




Re: Hanging Spark jobs

2014-06-12 Thread Shivani Rao
I learned this from my co-worker, but it is relevant here.

Spark evaluates lazily by default, which means none of your code actually
executes until you run your saveAsTextFile, so the hang does not tell you
much about where the problem is occurring. In order to debug this better, you
might want to put in a saveAsTextFile (or another action) after each RDD
operation, so that you can figure out where it is getting stuck.
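A sketch of that approach (input, parse and isValid are hypothetical
stand-ins for your own pipeline):

val step1 = input.map(parse)
step1.count() // an action forces evaluation; a hang here points at this stage
val step2 = step1.filter(isValid)
step2.saveAsTextFile("hdfs:///debug/step2") // or count(), to isolate this stage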

HTH
Shivani


On Wed, Jun 11, 2014 at 2:17 AM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 These stack traces come from the stuck node? Looks like it's waiting on
 data in BlockFetcherIterator. Waiting for data from another node. But you
 say all other nodes were done? Very curious.

 Maybe you could try turning on debug logging, and try to figure out what
 happens in BlockFetcherIterator (
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala).
 I do not think it is supposed to get stuck indefinitely.

 On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel dhurw...@ebay.com
 wrote:

  Hi,



 We are observing a recurring issue where our Spark jobs are hanging for
 several hours, even days, until we kill them.



 We are running Spark v0.9.1 over YARN.



 Our input is a list of edges of a graph on which we use Bagel to compute
 connected components using the following method:



 class CCMessage(var targetId: Long, var myComponentId: Long)
   extends Message[Long] with Serializable

 def compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int):
     (CC, Array[CCMessage]) = {
   val smallestComponentId =
     msgs.map(sq => sq.map(_.myComponentId).min).getOrElse(Long.MaxValue)
   val newComponentId = math.min(self.clusterID, smallestComponentId)
   val halt = (newComponentId == self.clusterID) || (superstep >= maxIters)
   self.active = if (superstep == 0) true else !halt
   val outGoingMessages =
     if (halt && superstep > 0) Array[CCMessage]()
     else self.edges.map(targetId => new CCMessage(targetId, newComponentId)).toArray
   self.clusterID = newComponentId

   (self, outGoingMessages)
 }



 Our output is a text file in which each line is a list of the node IDs in
 each component. The size of the output may be up to 6 GB.



 We see in the job tracker that most of the time jobs usually get stuck on
 the “saveAsTextFile” command, the final line in our code. In some cases,
 the job will hang during one of the iterations of Bagel during the
 computation of the connected components.



 Oftentimes, when we kill the job and re-execute it, it will finish
 successfully within an hour which is the expected duration. We notice that
 if our Spark jobs don’t finish after a few hours, they will never finish
 until they are killed, regardless of the load on our cluster.



 After consulting with our Hadoop support team, they noticed that after a
 particular hanging Spark job was running for 38 hours, all Spark processes
 on all nodes were completed except for one node which was running more than
 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then
 back to calm. Also, the other nodes were not relinquishing their resources
 until our Hadoop admin killed the process on that problematic node and
 suddenly the job finished and “success” was reported in the job tracker.
 The output seemed to be fine too. If it helps you understand the issue, the
 Hadoop admin suggested this was a Spark issue and sent us two stack dumps
 which I attached to this email: before killing the node’s Spark process
 (dump1.txt) and after (dump2.txt).



 Any advice on how to resolve this issue? How can we debug this?

  Thanks,

 ~Daniel







-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: Adding external jar to spark-shell classpath in spark 1.0

2014-06-12 Thread Shivani Rao
@Marcelo: The command ./bin/spark-shell --jars jar1,jar2,etc,etc did not
work for me on a Linux machine.

What I did was append to the classpath in the bin/compute-classpath.sh
file. I ran the script, then started the Spark shell, and that worked.


Thanks
Shivani


On Wed, Jun 11, 2014 at 10:52 AM, Andrew Or and...@databricks.com wrote:

 Ah, of course, there are no application jars in spark-shell, then it seems
 that there are no workarounds for this at the moment. We will look into a
 fix shortly, but for now you will have to create an application and use
 spark-submit (or use spark-shell on Linux).


 2014-06-11 10:42 GMT-07:00 Ulanov, Alexander alexander.ula...@hp.com:

   Could you elaborate on this? I don’t have an application, I just use
 spark shell.



 *From:* Andrew Or [mailto:and...@databricks.com]
 *Sent:* Wednesday, June 11, 2014 9:40 PM

 *To:* user@spark.apache.org
 *Subject:* Re: Adding external jar to spark-shell classpath in spark 1.0



 This is a known issue: https://issues.apache.org/jira/browse/SPARK-1919.
 We haven't found a fix yet, but for now, you can workaround this by
 including your simple class in your application jar.



 2014-06-11 10:25 GMT-07:00 Ulanov, Alexander alexander.ula...@hp.com:

  Hi,



 I am currently using spark 1.0 locally on Windows 7. I would like to use
 classes from external jar in the spark-shell. I followed the instruction
 in:
 http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCALrNVjWWF6k=c_jrhoe9w_qaacjld4+kbduhfv0pitr8h1f...@mail.gmail.com%3E



  I have set ADD_JARS="my.jar" SPARK_CLASSPATH="my.jar" in spark-shell.cmd
  but this didn't work.



 I also tried running “spark-shell.cmd --jars my.jar --driver-class-path
 my.jar --driver-library-path my.jar” and it didn’t work either.



 I cannot load any class from my jar into spark shell. Btw my.jar contains
 a simple Scala class.



 Best regards, Alexander







-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Announcing MesosCon schedule, incl Spark presentation

2014-06-12 Thread Dave Lester
Spark friends,

Yesterday the Mesos community announced the program
http://mesos.apache.org/blog/mesoscon-2014-program-announced/ for
#MesosCon http://events.linuxfoundation.org/events/mesoscon, the first-ever
Apache Mesos conference, taking place August 21-22, 2014 in Chicago, IL.
Given the close relationship between Spark and Mesos, I wanted to make sure
we let you all know.

https://twitter.com/mesoscon/status/476810946141093888

The MesosCon program http://mesoscon14.sched.org includes a Spark
presentation by Paco Nathan, "Spark on Mesos: Handling Big Data in a
Distributed, Multi-Tenant Environment". Keynote presentations are by
Benjamin Hindman (Twitter) and John Wilkes (Google).

Hope to see many of you there.

Dave


overwriting output directory

2014-06-12 Thread SK
Hi,

When we have multiple runs of a program writing to the same output file, the
execution fails if the output directory already exists from a previous run.
Is there some way we can have it overwrite the existing directory, so that
we don't have to manually delete it after each run?

Thanks for your help.





Re: overwriting output directory

2014-06-12 Thread Nan Zhu
Hi, SK 

For 1.0.0 you have to delete it manually.

In 1.0.1 there will be a parameter to enable overwriting:

https://github.com/apache/spark/pull/947/files
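For 1.0.0, a sketch of the manual delete via the Hadoop FileSystem API (the
path is illustrative, and `rdd` stands in for whatever you are writing):

import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("hdfs:///tmp/my-output")
val fs = outputPath.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // recursively remove the previous run's output
}
rdd.saveAsTextFile(outputPath.toString)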

Best, 

-- 
Nan Zhu


On Thursday, June 12, 2014 at 1:57 PM, SK wrote:

 Hi,
 
 When we have multiple runs of a program writing to the same output file, the
 execution fails if the output directory already exists from a previous run.
 Is there some way we can have it overwrite the existing directory, so that
 we don't have to manually delete it after each run?
 
 Thanks for your help.
 
 
 
 
 




Re: Writing data to HBase using Spark

2014-06-12 Thread Nan Zhu
you are using spark streaming?  

master = “local[n]” where n > 1?

Best,  

--  
Nan Zhu
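
(Presumably because a receiver occupies one core, so “local[1]” leaves nothing
for processing. A minimal sketch of a context with enough local threads — the
app name and batch interval are placeholders:)

import org.apache.spark.streaming.{Seconds, StreamingContext}

// at least two local threads: one for the receiver, one for processing
val ssc = new StreamingContext("local[2]", "HBaseWriter", Seconds(1))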


On Wednesday, June 11, 2014 at 4:23 AM, gaurav.dasgupta wrote:

 Hi Kanwaldeep,
  
 I have tried your code but arrived into a problem. The code is working fine 
 in local mode. But if I run the same code in Spark stand alone mode or YARN 
 mode, then it is continuously executing, but not saving anything in the HBase 
 table. I guess, it is stopping data streaming once the saveToHBase method is 
 called for the first time.
  
 This is strange. I just want to know whether you have tested the code on all 
 Spark execution modes?
  
 Thanks,
 Gaurav
  
  
 On Tue, Jun 10, 2014 at 12:20 PM, Kanwaldeep [via Apache Spark User List] 
 [hidden email] (/user/SendEmail.jtp?type=nodenode=7389i=0) wrote:
  Please see sample code attached at 
  https://issues.apache.org/jira/browse/SPARK-944.  
   
   

  
 View this message in context: Re: Writing data to HBase using Spark 
 (http://apache-spark-user-list.1001560.n3.nabble.com/Writing-data-to-HBase-using-Spark-tp7304p7389.html)
 Sent from the Apache Spark User List mailing list archive 
 (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com 
 (http://Nabble.com).



Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-12 Thread Daniel Siegmann
The old behavior (A) was dangerous, so it's good that (B) is now the
default. But in some cases I really do want to replace the old data, as per
(C). For example, I may rerun a previous computation (perhaps the input
data was corrupt and I'm rerunning with good input).

Currently I have to write separate code to remove the files before calling
Spark. It would be very convenient if Spark could do this for me. Has
anyone created a JIRA issue to support (C)?


On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com wrote:

 It is not a very good idea to save the results in the exact same place as
 the data. Any failures during the job could lead to corrupted data, because
 recomputing the lost partitions would involve reading the original
 (now-nonexistent) data.

 As such, the only safe way to do this would be to do as you said, and
 only delete the input data once the entire output has been successfully
 created.


 On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 Without (C), what is the best practice to implement the following
 scenario?

 1. rdd = sc.textFile("FileA")
 2. rdd = rdd.map(...)  // actually modifying the rdd
 3. rdd.saveAsTextFile("FileA")

 Since the rdd transformation is 'lazy', rdd will not materialize until
 saveAsTextFile(), so FileA must still exist, but it must be deleted before
 saveAsTextFile().

 What I can think of is:

 3. rdd.saveAsTextFile("TempFile")
 4. delete FileA
 5. rename TempFile to FileA

 This is not very convenient...

 Thanks.
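
 (A hedged sketch of steps 3-5 above, using the Hadoop FileSystem API; FileA
 and TempFile stand for the paths in the scenario:)

 import org.apache.hadoop.fs.{FileSystem, Path}

 val fs = FileSystem.get(sc.hadoopConfiguration)
 rdd.saveAsTextFile("TempFile")                      // 3. write to a temporary location
 fs.delete(new Path("FileA"), true)                  // 4. delete the original input
 fs.rename(new Path("TempFile"), new Path("FileA"))  // 5. move the output into place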

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, June 03, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing
 file

 (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output
 format check and overwrite files in the destination directory.
 But it won't clobber the directory entirely. I.e. if the directory already
 had part1 part2 part3 part4 and you write a new job outputting only
 two files (part1, part2) then it would leave the other two files
 intact,
 confusingly.

 (B) Semantics in Spark 1.0 and later: Runs Hadoop OutputFormat check
 which
 means the directory must not exist already or an exception is thrown.

 (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
 Spark will delete/clobber an existing destination directory if it exists,
 then fully over-write it with new data.

 I'm fine to add a flag that allows (B) for backwards-compatibility
 reasons,
 but my point was I'd prefer not to have (C) even though I see some cases
 where it would be useful.

 - Patrick

 On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote:
  Is there a third way? Unless I miss something. Hadoop's OutputFormat
  wants the target dir to not exist no matter what, so it's just a
  question of whether Spark deletes it for you or errors.
 
  On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com
 wrote:
  We can just add back a flag to make it backwards compatible - it was
  just missed during the original PR.
 
  Adding a *third* set of clobber semantics, I'm slightly -1 on that
  for the following reasons:
 
  1. It's scary to have Spark recursively deleting user files, could
  easily lead to users deleting data by mistake if they don't
  understand the exact semantics.
  2. It would introduce a third set of semantics here for saveAsXX...
  3. It's trivial for users to implement this with two lines of code
  (if output dir exists, delete it) before calling saveAsHadoopFile.
 
  - Patrick
 





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-12 Thread Nan Zhu
Actually this has been merged to the master branch 

https://github.com/apache/spark/pull/947 

-- 
Nan Zhu


On Thursday, June 12, 2014 at 2:39 PM, Daniel Siegmann wrote:

 The old behavior (A) was dangerous, so it's good that (B) is now the default. 
 But in some cases I really do want to replace the old data, as per (C). For 
 example, I may rerun a previous computation (perhaps the input data was 
 corrupt and I'm rerunning with good input).
 
 Currently I have to write separate code to remove the files before calling 
 Spark. It would be very convenient if Spark could do this for me. Has anyone 
 created a JIRA issue to support (C)?
 
 
 On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com 
 (mailto:ilike...@gmail.com) wrote:
  It is not a very good idea to save the results in the exact same place as 
  the data. Any failures during the job could lead to corrupted data, because 
  recomputing the lost partitions would involve reading the original 
  (now-nonexistent) data.
  
  As such, the only safe way to do this would be to do as you said, and 
  only delete the input data once the entire output has been successfully 
  created.
  
  
  On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
  taeyun@innowireless.co.kr (mailto:taeyun@innowireless.co.kr) 
  wrote:
   Without (C), what is the best practice to implement the following 
   scenario?
   
    1. rdd = sc.textFile("FileA")
    2. rdd = rdd.map(...)  // actually modifying the rdd
    3. rdd.saveAsTextFile("FileA")
    
    Since the rdd transformation is 'lazy', rdd will not materialize until
    saveAsTextFile(), so FileA must still exist, but it must be deleted before
    saveAsTextFile().
    
    What I can think of is:
    
    3. rdd.saveAsTextFile("TempFile")
    4. delete FileA
    5. rename TempFile to FileA
   
   This is not very convenient...
   
   Thanks.
   
   -Original Message-
   From: Patrick Wendell [mailto:pwend...@gmail.com]
   Sent: Tuesday, June 03, 2014 11:40 AM
   To: user@spark.apache.org (mailto:user@spark.apache.org)
   Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing
   file
   
    (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output
    format check and overwrite files in the destination directory.
    But it won't clobber the directory entirely. I.e. if the directory already
    had part1 part2 part3 part4 and you write a new job outputting only
    two files (part1, part2) then it would leave the other two files 
    intact,
    confusingly.
    
    (B) Semantics in Spark 1.0 and later: Runs Hadoop OutputFormat check 
    which
    means the directory must not exist already or an exception is thrown.
   
   (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
   Spark will delete/clobber an existing destination directory if it exists,
   then fully over-write it with new data.
   
   I'm fine to add a flag that allows (B) for backwards-compatibility 
   reasons,
   but my point was I'd prefer not to have (C) even though I see some cases
   where it would be useful.
   
   - Patrick
   
   On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com 
   (mailto:so...@cloudera.com) wrote:
Is there a third way? Unless I miss something. Hadoop's OutputFormat
wants the target dir to not exist no matter what, so it's just a
question of whether Spark deletes it for you or errors.
   
On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com 
(mailto:pwend...@gmail.com)
   wrote:
We can just add back a flag to make it backwards compatible - it was
just missed during the original PR.
   
Adding a *third* set of clobber semantics, I'm slightly -1 on that
for the following reasons:
   
1. It's scary to have Spark recursively deleting user files, could
easily lead to users deleting data by mistake if they don't
understand the exact semantics.
2. It would introduce a third set of semantics here for saveAsXX...
3. It's trivial for users to implement this with two lines of code
(if output dir exists, delete it) before calling saveAsHadoopFile.
   
- Patrick
   
   
  
 
 
 
 -- 
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning
 
 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io (mailto:daniel.siegm...@velos.io) W: www.velos.io 
 (http://www.velos.io) 



Re: Spark-Streaming window processing

2014-06-12 Thread unorthodox . engineers
To get the streaming latency I just look at the stats on the application 
driver's UI web page. 

I don't know if you can do that programmatically, but you could curl and parse 
the page if you had to.

Jeremy Lee   BCompSci (Hons)
The Unorthodox Engineers

 On 10 Jun 2014, at 3:36 pm, Yingjun Wu wu.yj0...@gmail.com wrote:
 
 Hi Sean,
 
 Thanks for your reply! So for the first question, any idea for measuring
 latency of Spark-Streaming?
 
 Regards,
 Yingjun
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-window-processing-tp7234p7300.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NullPointerException on reading checkpoint files

2014-06-12 Thread Kiran
I am also seeing a similar problem when trying to continue a job using a saved
checkpoint. Can somebody help in solving this problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-reading-checkpoint-files-tp7306p7507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-12 Thread Daniel Siegmann
I do not want the behavior of (A) - that is dangerous and should only be
enabled to account for legacy code. Personally, I think this option should
eventually be removed.

I want the option (C), to have Spark delete any existing part files before
creating any new output. I don't necessarily want this to be a global
option, but one on the API for saveAsTextFile (i.e. an additional boolean
parameter).

As it stands now, I need to precede every saveTextFile call with my own
deletion code.

In other words, instead of writing ...

if (cleanOutput) { MyUtil.clean(outputDir) }
rdd.saveAsTextFile(outputDir)

I'd like to write

rdd.saveAsTextFile(outputDir, cleanOutput)

Does that make sense?
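
(For reference, a minimal sketch of such a helper — MyUtil above is
hypothetical, and this version is built on the Hadoop FileSystem API:)

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

object MyUtil {
  // delete the output directory (if present), then write -- mirrors the proposed boolean flag
  def saveAsTextFile(rdd: RDD[String], outputDir: String, clean: Boolean): Unit = {
    if (clean) {
      val fs = FileSystem.get(rdd.context.hadoopConfiguration)
      fs.delete(new Path(outputDir), true)  // recursive; returns false if the path is absent
    }
    rdd.saveAsTextFile(outputDir)
  }
}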




On Thu, Jun 12, 2014 at 2:51 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

 Actually this has been merged to the master branch

 https://github.com/apache/spark/pull/947

 --
 Nan Zhu

 On Thursday, June 12, 2014 at 2:39 PM, Daniel Siegmann wrote:

 The old behavior (A) was dangerous, so it's good that (B) is now the
 default. But in some cases I really do want to replace the old data, as per
 (C). For example, I may rerun a previous computation (perhaps the input
 data was corrupt and I'm rerunning with good input).

 Currently I have to write separate code to remove the files before calling
 Spark. It would be very convenient if Spark could do this for me. Has
 anyone created a JIRA issue to support (C)?


 On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com wrote:

 It is not a very good idea to save the results in the exact same place as
 the data. Any failures during the job could lead to corrupted data, because
 recomputing the lost partitions would involve reading the original
 (now-nonexistent) data.

 As such, the only safe way to do this would be to do as you said, and
 only delete the input data once the entire output has been successfully
 created.


 On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 Without (C), what is the best practice to implement the following scenario?

 1. rdd = sc.textFile("FileA")
 2. rdd = rdd.map(...)  // actually modifying the rdd
 3. rdd.saveAsTextFile("FileA")

 Since the rdd transformation is 'lazy', rdd will not materialize until
 saveAsTextFile(), so FileA must still exist, but it must be deleted before
 saveAsTextFile().

 What I can think of is:

 3. rdd.saveAsTextFile("TempFile")
 4. delete FileA
 5. rename TempFile to FileA

 This is not very convenient...

 Thanks.

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, June 03, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing
 file

 (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output
 format check and overwrite files in the destination directory.
 But it won't clobber the directory entirely. I.e. if the directory already
 had part1 part2 part3 part4 and you write a new job outputting only
 two files (part1, part2) then it would leave the other two files
 intact,
 confusingly.

 (B) Semantics in Spark 1.0 and later: Runs Hadoop OutputFormat check
 which
 means the directory must not exist already or an exception is thrown.

 (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
 Spark will delete/clobber an existing destination directory if it exists,
 then fully over-write it with new data.

 I'm fine to add a flag that allows (B) for backwards-compatibility reasons,
 but my point was I'd prefer not to have (C) even though I see some cases
 where it would be useful.

 - Patrick

 On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote:
  Is there a third way? Unless I miss something. Hadoop's OutputFormat
  wants the target dir to not exist no matter what, so it's just a
  question of whether Spark deletes it for you or errors.
 
  On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com
 wrote:
  We can just add back a flag to make it backwards compatible - it was
  just missed during the original PR.
 
  Adding a *third* set of clobber semantics, I'm slightly -1 on that
  for the following reasons:
 
  1. It's scary to have Spark recursively deleting user files, could
  easily lead to users deleting data by mistake if they don't
  understand the exact semantics.
  2. It would introduce a third set of semantics here for saveAsXX...
  3. It's trivial for users to implement this with two lines of code
  (if output dir exists, delete it) before calling saveAsHadoopFile.
 
  - Patrick
 





 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: creating new ami image for spark ec2 commands

2014-06-12 Thread unorthodox . engineers
Creating AMIs from scratch is a complete pain in the ass. If you have a spare 
week, sure. I understand why the team avoids it.

The easiest way is probably to spin up a working instance and then use Amazon's 
“save as new AMI”, but that has some major limitations, especially with 
software not expecting it. (There are two of me now!) Worker nodes might cope 
better than the master.

But yes, I also would love new AMIs that don't pull down 200 meg every time I 
spin up. 
(Spin up a cluster in five minutes HA!) Also, AMIs per region are also good 
for costs. I've thought of doing up new ones, (since I have experience) but I 
have no time and other issues first. Perhaps once I know Spark better.

At least with Spark, we have more control over the scripts precisely because they 
are primitive. I had a quick look at YARN/Ambari, and it wasn't obvious they 
were any better with EC2, at a hundred times the complexity.

I expect most AWS-heavy companies have a full time person just managing AMIs. 
They are that annoying. It's what makes Cloudera attractive.

Jeremy Lee   BCompSci (Hons)
The Unorthodox Engineers

 On 6 Jun 2014, at 6:44 am, Matt Work Coarr mattcoarr.w...@gmail.com wrote:
 
 How would I go about creating a new AMI image that I can use with the spark 
 ec2 commands? I can't seem to find any documentation.  I'm looking for a list 
 of steps that I'd need to perform to make an Amazon Linux image ready to be 
 used by the spark ec2 tools.
 
 I've been reading through the spark 1.0.0 documentation, looking at the 
 script itself (spark_ec2.py), and looking at the github project 
 mesos/spark-ec2.
 
 From what I can tell, the spark_ec2.py script looks up the id of the AMI 
 based on the region and machine type (hvm or pvm) using static content 
 derived from the github repo mesos/spark-ec2.
 
 The spark ec2 script loads the AMI id from this base url:
 https://raw.github.com/mesos/spark-ec2/v2/ami-list
 (Which presumably comes from https://github.com/mesos/spark-ec2 )
 
 For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
 ami-5bb18832
 
 Is there a list of instructions for how this AMI was created?  Assuming I'm 
 starting with my own Amazon Linux image, what would I need to do to make it 
 usable where I could pass that AMI id to spark_ec2.py rather than using the 
 default spark-provided AMI?
 
 Thanks,
 Matt


spark EC2 bring-up problems

2014-06-12 Thread Toby Douglass
Gents,

I have been bringing up a cluster on EC2 using the spark_ec2.py script.

This works if the cluster has a single slave.

This fails if the cluster has sixteen slaves, during the work to transfer
the SSH key to the slaves.  I cannot currently bring up a large cluster.

Can anyone shed any light on this issue?

As an aside, that script does not work out of the box in Amazon EC2
instances.  Python 2.7 must be installed, and then boto for 2.7.


Re: creating new ami image for spark ec2 commands

2014-06-12 Thread Nicholas Chammas
Yeah, we badly need new AMIs that include at a minimum package/security
updates and Python 2.7. There is an open issue to track the 2.7 AMI update
https://issues.apache.org/jira/browse/SPARK-922, at least.


On Thu, Jun 12, 2014 at 3:34 PM, unorthodox.engine...@gmail.com wrote:

 Creating AMIs from scratch is a complete pain in the ass. If you have a
 spare week, sure. I understand why the team avoids it.

 The easiest way is probably to spin up a working instance and then use
 Amazon's “save as new AMI”, but that has some major limitations, especially
 with software not expecting it. (There are two of me now!) Worker nodes
 might cope better than the master.

 But yes, I also would love new AMIs that don't pull down 200 meg every
 time I spin up.
 (Spin up a cluster in five minutes HA!) Also, AMIs per region are also
 good for costs. I've thought of doing up new ones, (since I have
 experience) but I have no time and other issues first. Perhaps once I know
 Spark better.

 At least with Spark, we have more control over the scripts precisely because
 they are primitive. I had a quick look at YARN/Ambari, and it wasn't
 obvious they were any better with EC2, at a hundred times the complexity.

 I expect most AWS-heavy companies have a full time person just managing
 AMIs. They are that annoying. It's what makes Cloudera attractive.

 Jeremy Lee   BCompSci (Hons)
 The Unorthodox Engineers

 On 6 Jun 2014, at 6:44 am, Matt Work Coarr mattcoarr.w...@gmail.com
 wrote:

 How would I go about creating a new AMI image that I can use with the
 spark ec2 commands? I can't seem to find any documentation.  I'm looking
 for a list of steps that I'd need to perform to make an Amazon Linux image
 ready to be used by the spark ec2 tools.

 I've been reading through the spark 1.0.0 documentation, looking at the
 script itself (spark_ec2.py), and looking at the github project
 mesos/spark-ec2.

 From what I can tell, the spark_ec2.py script looks up the id of the AMI
 based on the region and machine type (hvm or pvm) using static content
 derived from the github repo mesos/spark-ec2.

 The spark ec2 script loads the AMI id from this base url:
 https://raw.github.com/mesos/spark-ec2/v2/ami-list
 (Which presumably comes from https://github.com/mesos/spark-ec2 )

 For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
 ami-5bb18832

 Is there a list of instructions for how this AMI was created?  Assuming
 I'm starting with my own Amazon Linux image, what would I need to do to
 make it usable where I could pass that AMI id to spark_ec2.py rather than
 using the default spark-provided AMI?

 Thanks,
 Matt




Re: spark EC2 bring-up problems

2014-06-12 Thread Toby Douglass
On Thu, Jun 12, 2014 at 8:50 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Yes, you need Python 2.7 to run spark-ec2, and most AMIs come with 2.6.

Ah, yes - I mean to say, Amazon Linux.

 Have you tried either:

1. Retrying launch with the --resume option?
2. Increasing the value of the --wait option?

 No.  I will try the first, now.  I think the latter is not the issue - the
instances are up; something else is amiss.

Thank you.


Re: Not fully cached when there is enough memory

2014-06-12 Thread Daniel Siegmann
I too have seen cached RDDs not hit 100%, even when they are DISK_ONLY.
Just saw that yesterday in fact. In some cases RDDs I expected didn't show
up in the list at all. I have no idea if this is an issue with Spark or
something I'm not understanding about how persist works (probably the
latter).

If I figure out the reason for this I'll let you know.


On Wed, Jun 11, 2014 at 8:54 PM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Xiangrui, clicking into the RDD link, it gives the same message, saying only
 96 of 100 partitions are cached. The disk/memory usage are the same, which
 is far below the limit.
 Is this what you want to check or other issue?


 On Wed, Jun 11, 2014 at 4:38 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you try to click one that RDD and see the storage info per
 partition? I tried continuously caching RDDs, so new ones kick old
 ones out when there is not enough memory. I saw similar glitches but
 the storage info per partition is correct. If you find a way to
 reproduce this error, please create a JIRA. Thanks! -Xiangrui





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Increase storage.MemoryStore size

2014-06-12 Thread ericjohnston1989
Hey everyone,

I'm having some trouble increasing the default storage size for a broadcast
variable. It looks like it defaults to a little less than 512MB every time,
and I can't figure out which configuration to change to increase this.

INFO storage.MemoryStore: Block broadcast_0 stored as values to memory
(estimated size 426.5 MB, free 64.2 MB)

(I'm seeing this in the terminal on my driver computer)

I can change spark.executor.memory, and that seems to increase the amount
of RAM available on my nodes, but it doesn't seem to adjust this storage
size for my broadcast variables. Any ideas?

Thanks,

Eric



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Increase-storage-MemoryStore-size-tp7516.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Increase storage.MemoryStore size

2014-06-12 Thread Gianluca Privitera
If you are launching your application with spark-submit, you can manually edit 
the spark-class file to make 1g the baseline. 
It’s pretty easy to figure out how once you open the file.
This worked for me, even if it’s not a final solution, of course.

Gianluca
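
(In the same spirit, a hedged sketch of raising the limit without editing
spark-class — this assumes the ~512 MB cap is the JVM heap size times
spark.storage.memoryFraction, which defaults to 0.6:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")          // bigger executor heaps
  .set("spark.storage.memoryFraction", "0.6")  // share of each heap reserved for storage

Since the message appears on the driver, the driver heap may be the binding
limit; spark-submit's --driver-memory flag raises that.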

On 12 Jun 2014, at 15:16, ericjohnston1989 ericjohnston1...@gmail.com wrote:

 Hey everyone,
 
 I'm having some trouble increasing the default storage size for a broadcast
 variable. It looks like it defaults to a little less than 512MB every time,
 and I can't figure out which configuration to change to increase this.
 
 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory
 (estimated size 426.5 MB, free 64.2 MB)
 
 (I'm seeing this in the terminal on my driver computer)
 
 I can change spark.executor.memory, and that seems to increase the amount
 of RAM available on my nodes, but it doesn't seem to adjust this storage
 size for my broadcast variables. Any ideas?
 
 Thanks,
 
 Eric
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Increase-storage-MemoryStore-size-tp7516.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



An attempt to implement dbscan algorithm on top of Spark

2014-06-12 Thread Aliaksei Litouka
Hi.
I'm not sure if messages like this are appropriate in this list; I just
want to share with you an application I am working on. This is my personal
project which I started to learn more about Spark and Scala, and, if it
succeeds, to contribute it to the Spark community.

Maybe someone will find it useful. Or maybe someone will want to join
development.

The application is available at https://github.com/alitouka/spark_dbscan

Any questions, comments, suggestions, as well as criticism are welcome :)

Best regards,
Aliaksei Litouka


Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Aliaksei Litouka
spark-env.sh doesn't seem to contain any settings related to memory size :(
I will continue searching for a solution and will post it if I find it :)
Thank you, anyway
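
(For what it's worth, a hedged sketch of pinning the value inside the
application itself — settings made explicitly on a SparkConf take precedence
over submit flags and configured defaults; the app name is a placeholder:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.executor.memory", "2g")  // should win over flags and defaults
val sc = new SparkContext(conf)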


On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 It might be that conf/spark-env.sh on EC2 is configured to set it to 512,
 and is overriding the application’s settings. Take a look in there and
 delete that line if possible.

 Matei

 On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com
 wrote:

  I am testing my application in EC2 cluster of m3.medium machines. By
 default, only 512 MB of memory on each machine is used. I want to increase
 this amount and I'm trying to do it by passing --executor-memory 2G option
 to the spark-submit script, but it doesn't seem to work - each machine uses
 only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I
 increase the amount of memory?




Re: Normalizations in MLBase

2014-06-12 Thread DB Tsai
Hi Asian,

I'm not sure if the MLbase code is maintained for the current Spark
master. The following is the code we use for standardization at my
company. I intend to clean it up and submit a PR. You could use it
for now.

  // Assumed imports: org.apache.spark.rdd.RDD,
  // org.apache.spark.mllib.linalg.{Vector, Vectors},
  // org.apache.spark.mllib.linalg.distributed.RowMatrix,
  // and breeze.linalg.{DenseVector => BDV}.
  // Note: toBreeze/fromBreeze are private[mllib], so this code must live
  // under an org.apache.spark.mllib package.
  def standardize(data: RDD[Vector]): RDD[Vector] = {
    val summarizer = new RowMatrix(data).computeColumnSummaryStatistics()
    val mean = summarizer.mean
    val variance = summarizer.variance

    // The standardization will always densify the output, so the output
    // will be stored in a dense vector.
    data.map(x => {
      val n = x.toBreeze.length
      val output = BDV.zeros[Double](n)
      var i = 0
      while (i < n) {
        if (variance(i) == 0) {
          output(i) = Double.NaN
        } else {
          output(i) = (x(i) - mean(i)) / Math.sqrt(variance(i))
        }
        i += 1
      }
      Vectors.fromBreeze(output)
    })
  }

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, Jun 12, 2014 at 1:49 AM, Aslan Bekirov aslanbeki...@gmail.com wrote:
 Hi DB,

 I found a piece of code that uses znorm to normalize data.


 /**
  * build training data set from sample and summary data
  */
  val train_data = sample_data.map( v =>
    Array.tabulate[Double](field_cnt)(
      i => zscore(v._2(i), sample_mean(i), sample_stddev(i))
    )
  ).cache
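
 (For completeness, the zscore helper referenced above would presumably be
 something like this — a hypothetical definition:)

 def zscore(x: Double, mean: Double, stddev: Double): Double =
   if (stddev == 0.0) 0.0 else (x - mean) / stddev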

 Please make your comments if you find something wrong.

 BR,
 Aslan



 On Thu, Jun 12, 2014 at 11:13 AM, Aslan Bekirov aslanbeki...@gmail.com
 wrote:

 Thanks a lot DB.

 I will try to do Znorm normalization using map transformation.


 BR,
 Aslan


 On Thu, Jun 12, 2014 at 12:16 AM, DB Tsai dbt...@stanford.edu wrote:

 Hi Aslan,

 Currently, we don't have the utility function to do so. However, you
 can easily implement this by another map transformation. I'm working
 on this feature now, and there will be couple different available
 normalization option users can chose.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Jun 11, 2014 at 6:25 AM, Aslan Bekirov aslanbeki...@gmail.com
 wrote:
  Hi All,
 
  I have to normalize a set of values in the range 0-500 to the [0-1]
  range.
 
  Is there any util method in MLBase to normalize large set of data?
 
  BR,
  Aslan





NullPointerExceptions when using val or broadcast on a standalone cluster.

2014-06-12 Thread bdamos
Hi, I'm consistently getting NullPointerExceptions when trying to use
String val objects defined in my main application -- even for broadcast
vals!
I'm deploying on a standalone cluster with a master and 4 workers on the
same machine, which is not the machine I'm submitting from.

The following example works in spark-shell, but does not when
submitted to the cluster with spark-submit, and also does not work locally.

Is there anything I can do to fix this?
Do vals need to be explicitly synchronized for RDD operations?
One workaround would be to inline the vals,
but the logic in my actual application doesn't allow for this.

Thanks,
Brandon.

---

spark-shell --master my-server

  val suffix = "-suffix"
  val l = sc.parallelize(List("a", "b", "c"))
  println(l.map(_ + suffix).collect().mkString(","))

  Result: a-suffix,b-suffix,c-suffix

---

Standalone Cluster with `submit.sh` (my script below):

TestApp.scala:

  package com.adobe.spark

  // Spark.
  import org.apache.spark.{SparkConf,SparkContext}
  import org.apache.spark.broadcast._
  import org.apache.spark.SparkContext._
  import org.apache.spark.storage.StorageLevel

  // Scala.
  import scala.collection.mutable.ArrayBuffer

  object TestApp extends App {
    val memory = "1g"
    val maxCores = 1
    val conf = new SparkConf()
      .setMaster("spark://myserver:7077")
      //.setMaster("local[4]")
      .setAppName("ValError")
      .setSparkHome("/usr/local/spark-1.0.0")
      .setJars(Seq("/tmp/val-error.jar"))
      .set("spark.executor.memory", memory)
      .set("spark.cores.max", maxCores.toString) // set() takes String values
    val sc = new SparkContext(conf)

    val suffix = "-suffix"
    val l = sc.parallelize(List("a", "b", "c"))
    println(l.map(_ + suffix).collect().mkString(","))

    val suffix_bc = sc.broadcast(suffix)
    println(l.map(_ + suffix_bc.value).collect().mkString(","))

sc.stop()
  }

build.sbt:

  import AssemblyKeys._

  assemblySettings

  jarName in assembly := "val-error.jar"

  // Load provided libraries with `sbt run`.
  run in Compile <<= Defaults.runTask(
    fullClasspath in Compile, mainClass in (Compile, run), runner in
    (Compile, run)
  )

  name := "TestApp"

  version := "1.0"

  scalaVersion := "2.10.3"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
    "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
  )

  resolvers ++= Seq(
    "Akka Repository" at "http://repo.akka.io/releases/"
  )

submit.sh:

  #!/bin/bash

  rm -f *.log driver-id.txt

  JAR=val-error.jar
  CLASS=com.adobe.spark.TestApp
  SPARK=/usr/local/spark-1.0.0

  set -x
  sbt assembly > assembly.log || exit 1
  scp target/scala-2.10/$JAR eagle:/tmp || exit 2

  $SPARK/bin/spark-submit \
--class $CLASS \
--master spark://myserver:7077 \
--deploy-mode cluster \
/tmp/$JAR | tee submit.log
  set +x

  DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed
's/Driver successfully submitted as \(.*\)/\1/g')
  [ -z "$DRIVER_ID" ] && exit 3
  echo $DRIVER_ID > driver-id.txt

Output:
  anull,bnull,cnull (For the first part.)

Stack Trace: (For the broadcast var.)
  Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException
  com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
  com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  scala.collection.AbstractIterator.to(Iterator.scala:1157)
 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
  org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  org.apache.spark.scheduler.Task.run(Task.scala:51)
 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 

Java Custom Receiver onStart method never called

2014-06-12 Thread jsabin
I create a Java custom receiver and then call

ssc.receiverStream(new MyReceiver("localhost", 8081)); // where ssc is the
JavaStreamingContext

I am expecting that the receiver's onStart method gets called but it does
not.

Can anyone give me some guidance? What am I not doing?

Here's the dependency info:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.0.0</version>
</dependency>

Thanks for your help
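
(For comparison: a receiver's onStart is only invoked once the streaming
context is started, and starting requires at least one registered output
operation. A minimal sketch of that lifecycle, in Scala syntax:)

val stream = ssc.receiverStream(new MyReceiver("localhost", 8081))
stream.print()          // register at least one output operation
ssc.start()             // the receiver's onStart() is invoked here
ssc.awaitTermination()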



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Java-Custom-Receiver-onStart-method-never-called-tp7525.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: An attempt to implement dbscan algorithm on top of Spark

2014-06-12 Thread Vipul Pandey
Great! I was going to implement one of my own - but I may not need to do that 
any more :)
I haven't had a chance to look deep into your code, but I would recommend 
accepting an RDD[(Double, Double)] as well, instead of just a file: 
val data = IOHelper.readDataset(sc, "/path/to/my/data.csv")
And other distance measures, of course. 

Thanks,
Vipul




On Jun 12, 2014, at 2:31 PM, Aliaksei Litouka aliaksei.lito...@gmail.com 
wrote:

 Hi.
 I'm not sure if messages like this are appropriate in this list; I just want 
 to share with you an application I am working on. This is my personal project 
 which I started to learn more about Spark and Scala, and, if it succeeds, to 
 contribute it to the Spark community.
 
 Maybe someone will find it useful. Or maybe someone will want to join 
 development.
 
 The application is available at https://github.com/alitouka/spark_dbscan
 
 Any questions, comments, suggestions, as well as criticism are welcome :)
 
 Best regards,
 Aliaksei Litouka



Re: NullPointerExceptions when using val or broadcast on a standalone cluster.

2014-06-12 Thread Gerard Maas
That stack trace is quite similar to the one that is generated when trying
to do a collect within a closure.  In this case, it feels wrong to
collect in a closure, but I wonder what's the reason behind the NPE.
Curious to know whether they are related.

Here's a very simple example:
scala> rrd1.flatMap(x => rrd2.collect.flatMap(y => List(y, x)))
res7: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[10] at flatMap at
<console>:17

scala> res7.collect
14/06/13 01:11:48 INFO SparkContext: Starting job: collect at <console>:19
14/06/13 01:11:48 INFO DAGScheduler: Got job 2 (collect at <console>:19)
with 3 output partitions (allowLocal=false)
14/06/13 01:11:48 INFO DAGScheduler: Final stage: Stage 4 (collect at
<console>:19)
14/06/13 01:11:48 INFO DAGScheduler: Parents of final stage: List()
14/06/13 01:11:48 INFO DAGScheduler: Missing parents: List()
14/06/13 01:11:48 INFO DAGScheduler: Submitting Stage 4 (FlatMappedRDD[10]
at flatMap at <console>:17), which has no missing parents
14/06/13 01:11:48 INFO DAGScheduler: Submitting 3 missing tasks from Stage
4 (FlatMappedRDD[10] at flatMap at <console>:17)
14/06/13 01:11:48 INFO TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:0 as TID 16 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:0 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:1 as TID 17 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:1 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:2 as TID 18 on
executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:2 as 1850 bytes
in 0 ms
14/06/13 01:11:48 INFO Executor: Running task ID 16
14/06/13 01:11:48 INFO Executor: Running task ID 17
14/06/13 01:11:48 INFO Executor: Running task ID 18
14/06/13 01:11:48 ERROR Executor: Exception in task ID 18
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.collect(RDD.scala:728)
at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
 at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
 at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
 at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
14/06/13 01:11:48 ERROR Executor: Exception in task ID 16
... same for each partition.

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1037)
 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1019)
 at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1019)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
 at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
at scala.Option.foreach(Option.scala:236)
 at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:637)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1211)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at 
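
(For what it's worth, a minimal sketch of the usual alternative — materialize
the small RDD on the driver and broadcast it, rather than calling collect
inside a closure; rrd1 and rrd2 are as in the example above:)

val small = rrd2.collect()         // runs on the driver, outside any closure
val smallBc = sc.broadcast(small)  // shipped once to the executors
val combined = rrd1.flatMap(x => smallBc.value.flatMap(y => List(y, x)))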

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-12 Thread Nan Zhu
ah, I see,   

I think it’s hard to do something like fs.delete() in Spark code (it’s scary, as 
we discussed in the previous PR),

so if you want (C), I guess you have to do some delete work manually  

Best,  

--  
Nan Zhu


On Thursday, June 12, 2014 at 3:31 PM, Daniel Siegmann wrote:

 I do not want the behavior of (A) - that is dangerous and should only be 
 enabled to account for legacy code. Personally, I think this option should 
 eventually be removed.
  
 I want the option (C), to have Spark delete any existing part files before 
 creating any new output. I don't necessarily want this to be a global option, 
 but one on the API for saveAsTextFile (i.e. an additional boolean parameter).
  
 As it stands now, I need to precede every saveTextFile call with my own 
 deletion code.
  
 In other words, instead of writing ...
  
 if (cleanOutput) { MyUtil.clean(outputDir) }
 rdd.saveAsTextFile(outputDir)
  
 I'd like to write
  
 rdd.saveAsTextFile(outputDir, cleanOutput)
  
 Does that make sense?
  
  
  
  
 On Thu, Jun 12, 2014 at 2:51 PM, Nan Zhu zhunanmcg...@gmail.com 
 (mailto:zhunanmcg...@gmail.com) wrote:
  Actually this has been merged to the master branch  
   
  https://github.com/apache/spark/pull/947  
   
  --  
  Nan Zhu
   
   
  On Thursday, June 12, 2014 at 2:39 PM, Daniel Siegmann wrote:
   
   The old behavior (A) was dangerous, so it's good that (B) is now the 
   default. But in some cases I really do want to replace the old data, as 
   per (C). For example, I may rerun a previous computation (perhaps the 
   input data was corrupt and I'm rerunning with good input).

   Currently I have to write separate code to remove the files before 
   calling Spark. It would be very convenient if Spark could do this for me. 
   Has anyone created a JIRA issue to support (C)?


   On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com 
   (mailto:ilike...@gmail.com) wrote:
It is not a very good idea to save the results in the exact same place 
as the data. Any failures during the job could lead to corrupted data, 
because recomputing the lost partitions would involve reading the 
original (now-nonexistent) data.
 
As such, the only safe way to do this would be to do as you said, and 
only delete the input data once the entire output has been successfully 
created.
 
 
On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr (mailto:taeyun@innowireless.co.kr) 
wrote:
 Without (C), what is the best practice to implement the following 
 scenario?
  
  1. rdd = sc.textFile("FileA")
  2. rdd = rdd.map(...)  // actually modifying the rdd
  3. rdd.saveAsTextFile("FileA")
   
  Since the rdd transformation is 'lazy', rdd will not materialize until
  saveAsTextFile(), so FileA must still exist, but it must be deleted 
  before
  saveAsTextFile().
   
  What I can think of is:
   
  3. rdd.saveAsTextFile("TempFile")
  4. delete FileA
  5. rename TempFile to FileA
  
 This is not very convenient...
  
 Thanks.
  
 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, June 03, 2014 11:40 AM
 To: user@spark.apache.org (mailto:user@spark.apache.org)
 Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite 
 existing
 file
  
  (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's 
  output
  format check and overwrite files in the destination directory.
  But it won't clobber the directory entirely. I.e. if the directory 
  already
  had part1 part2 part3 part4 and you write a new job outputting 
  only
  two files (part1, part2) then it would leave the other two files 
  intact,
  confusingly.
   
  (B) Semantics in Spark 1.0 and later: Runs Hadoop OutputFormat 
  check which
  means the directory must not exist already or an exception is thrown.
  
 (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
 Spark will delete/clobber an existing destination directory if it 
 exists,
 then fully over-write it with new data.
  
 I'm fine to add a flag that allows (B) for backwards-compatibility 
 reasons,
 but my point was I'd prefer not to have (C) even though I see some 
 cases
 where it would be useful.
  
 - Patrick
  
 On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com 
 (mailto:so...@cloudera.com) wrote:
  Is there a third way? Unless I miss something. Hadoop's OutputFormat
  wants the target dir to not exist no matter what, so it's just a
  question of whether Spark deletes it for you or errors.
 
  On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell 
  pwend...@gmail.com (mailto:pwend...@gmail.com)
 wrote:
  We can just add back a flag to make it backwards compatible - it 
  was

Re: Doubts about MLlib.linalg in python

2014-06-12 Thread ericjohnston1989
Hi Congrui Yi,

Spark is implemented in Scala, so new features are available in Scala/Java
first. PySpark is a Python wrapper for the Scala code, so it won't
always have the latest features. This is especially true for the machine
learning library.

Eric



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Doubts-about-MLlib-linalg-in-python-tp7523p7530.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
Maybe it would be nice if unpersist() ‘triggered’ the computation of other 
RDDs that depend on it but are not yet computed.
The pseudo code can be as follows:

 

unpersist()
{
    if (this rdd has not been persisted)
        return;
    for (all rdds that depend on this rdd but are not yet computed)
        compute_that_rdd;
    do_actual_unpersist();
}



 

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] 
Sent: Friday, June 13, 2014 5:38 AM
To: user@spark.apache.org
Subject: Re: Question about RDD cache, unpersist, materialization

 

I've run into this issue. The goal of caching / persist seems to be to avoid 
recomputing an RDD when its data will be needed multiple times. However, once 
the following RDDs are computed, the cache is no longer needed. The current 
design provides no obvious way to detect when the cache is no longer needed, so 
it can be discarded.

In the case of cache in memory, it may be handled by partitions being dropped 
(in LRU order) when memory fills up. I need to do some more experimentation to 
see if this really works well, or if allowing memory to fill up causes 
performance issues or possibly OOM errors if data isn't correctly freed.

In the case of persisting to disk, I'm not sure if there's a way to limit the 
disk space used for caching. Does anyone know if there is such a configuration 
option? This is a pressing issue for me - I have had jobs fail because nodes 
ran out of disk space.

 

On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com 
wrote:

If you want to force materialization use .count()

 

Also, if you can, simply don't unpersist anything, unless you really need to 
free the memory.

—
Sent from Mailbox https://www.dropbox.com/mailbox  

 

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

BTW, it is possible that rdd.first() does not compute all the partitions. 
So, first() cannot be used for the situation below. 

-Original Message- 
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM 
To: user@spark.apache.org 
Subject: Question about RDD cache, unpersist, materialization 

Hi, 

What I (seem to) know about the RDD persisting API is as follows: 
- cache() and persist() are not actions. They only mark the RDD. 
- unpersist() is also not an action. It only removes the marking. But if the 
rdd is already in memory, it is unloaded. 

And there seems to be no API to forcefully materialize the RDD without 
requesting its data via an action method, for example first(). 

So, I am faced with the following scenario. 

{ 
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging 
    for (int i = 0; i < 10; i++) 
    { 
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]); 
        rdd.cache(); // Since it will be used twice, cache. 
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes 
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union 
        rdd.unpersist(); // Now it seems not needed. (But needed actually) 
    } 
    // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted. 
    // So, rebuilding all 10 rdds will occur. 
    rddUnion.saveAsTextFile(mergedFileName); 
} 

If rddUnion can be materialized before the rdd.unpersist() line and 
cache()d, the rdds in the loop will not be needed on 
rddUnion.saveAsTextFile(). 

Now what is the best strategy? 
- Do not unpersist all 10 rdds in the loop. 
- Materialize rddUnion in the loop by calling 'light' action API, like 
first(). 
- Give up and just rebuild/reload all 10 rdds when saving rddUnion. 

Is there some misunderstanding? 

Thanks. 



 




-- 

Daniel Siegmann, Software Developer
Velos

Accelerating Machine Learning


440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io



Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Matei Zaharia
Are you launching this using our EC2 scripts? Or have you set up a cluster by 
hand?

Matei

On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com 
wrote:

 spark-env.sh doesn't seem to contain any settings related to memory size :( I 
 will continue searching for a solution and will post it if I find it :)
 Thank you, anyway
 
 
 On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and 
 is overriding the application’s settings. Take a look in there and delete 
 that line if possible.
 
 Matei
 
 On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com 
 wrote:
 
  I am testing my application in EC2 cluster of m3.medium machines. By 
  default, only 512 MB of memory on each machine is used. I want to increase 
  this amount and I'm trying to do it by passing --executor-memory 2G option 
  to the spark-submit script, but it doesn't seem to work - each machine uses 
  only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I 
  increase the amount of memory?
 
 



Re: running Spark Streaming just once and stop it

2014-06-12 Thread Tathagata Das
You should be able to see the Streaming tab in the Spark web UI (running on
port 4040) if you have created a StreamingContext and you are using Spark 1.0.


TD
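
(For programmatic access rather than scraping the UI page, a hedged sketch of
a StreamingListener that reports per-batch delays — assuming the 1.0 listener
API:)

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// a hypothetical listener printing the scheduling delay of each completed batch
class DelayListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) {
    val info = batch.batchInfo
    println("batch " + info.batchTime + ": scheduling delay = " +
      info.schedulingDelay.getOrElse(-1L) + " ms")
  }
}

ssc.addStreamingListener(new DelayListener)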


On Thu, Jun 12, 2014 at 1:06 AM, Ravi Hemnani raviiihemn...@gmail.com
wrote:

 Hey,

 I did StreamingContext.addStreamingListener(streamingListener object) in my
 code,
 and I am able to see some stats in the logs but can't see anything in the web
 UI.


 How to add the Streaming Tab to the web UI ?

 I need to get queuing delays and related information.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/running-Spark-Streaming-just-once-and-stop-it-tp1382p7463.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Question about RDD cache, unpersist, materialization

2014-06-12 Thread Nicholas Chammas
FYI: Here is a related discussion
http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
about this.


On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

 Maybe it would be nice if unpersist() ‘triggered’ the computation of
 other RDDs that depend on it but are not yet computed.
 The pseudo code can be as follows:



 unpersist()
 {
     if (this rdd has not been persisted)
         return;
     for (all rdds that depend on this rdd but are not yet computed)
         compute_that_rdd;
     do_actual_unpersist();
 }



 *From:* Daniel Siegmann [mailto:daniel.siegm...@velos.io]
 *Sent:* Friday, June 13, 2014 5:38 AM
 *To:* user@spark.apache.org
 *Subject:* Re: Question about RDD cache, unpersist, materialization



 I've run into this issue. The goal of caching / persist seems to be to
 avoid recomputing an RDD when its data will be needed multiple times.
 However, once the following RDDs are computed, the cache is no longer
 needed. The current design provides no obvious way to detect when the
 cache is no longer needed, so it can be discarded.

 In the case of cache in memory, it may be handled by partitions being
 dropped (in LRU order) when memory fills up. I need to do some more
 experimentation to see if this really works well, or if allowing memory to
 fill up causes performance issues or possibly OOM errors if data isn't
 correctly freed.

 In the case of persisting to disk, I'm not sure if there's a way to limit
 the disk space used for caching. Does anyone know if there is such a
 configuration option? This is a pressing issue for me - I have had jobs
 fail because nodes ran out of disk space.



 On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 If you want to force materialization use .count()



 Also, if you can, simply don't unpersist anything, unless you really need to
 free the memory.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox



 On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 BTW, it is possible that rdd.first() does not compute all the
 partitions.
 So, first() cannot be used for the situation below.

 -Original Message-
 From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr]
 Sent: Wednesday, June 11, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Question about RDD cache, unpersist, materialization

 Hi,

 What I (seem to) know about the RDD persisting API is as follows:
 - cache() and persist() are not actions. They only mark the RDD.
 - unpersist() is also not an action. It only removes the marking. But if the
 rdd is already in memory, it is unloaded.

 And there seems to be no API to forcefully materialize the RDD without
 requesting its data via an action method, for example first().

 So, I am faced with the following scenario.

 {
     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging
     for (int i = 0; i < 10; i++)
     {
         JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
         rdd.cache(); // Since it will be used twice, cache.
         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes
         rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union
         rdd.unpersist(); // Now it seems not needed. (But needed actually)
     }
     // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted.
     // So, rebuilding all 10 rdds will occur.
     rddUnion.saveAsTextFile(mergedFileName);
 }

 If rddUnion can be materialized before the rdd.unpersist() line and
 cache()d, the rdds in the loop will not be needed on
 rddUnion.saveAsTextFile().

 Now what is the best strategy?
 - Do not unpersist all 10 rdds in the loop.
 - Materialize rddUnion in the loop by calling a 'light' action API, like
 first().
 - Give up and just rebuild/reload all 10 rdds when saving rddUnion.

 Is there some misunderstanding?

 Thanks.






 --

 Daniel Siegmann, Software Developer
 Velos

 Accelerating Machine Learning


 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io
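
A minimal Scala sketch of the count()-before-unpersist workaround discussed in
this thread. The file paths, element type, and transformations are hypothetical
placeholders, and whether the extra caching pays off depends on the job:

import org.apache.spark.{SparkConf, SparkContext}

object UnpersistExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("unpersist-example"))
    val inputFileNames  = (0 until 10).map(i => s"/data/input-$i")
    val outputFileNames = (0 until 10).map(i => s"/data/output-$i")

    var rddUnion = sc.parallelize(Seq.empty[String]) // empty RDD for merging
    for (i <- 0 until 10) {
      val rdd = sc.textFile(inputFileNames(i))
      rdd.cache()                                    // used twice below
      rdd.map(_.trim).saveAsTextFile(outputFileNames(i))
      rddUnion = rddUnion.union(rdd.filter(_.nonEmpty)).cache()
      rddUnion.count()  // force materialization so the union no longer
                        // needs to recompute rdd later
      rdd.unpersist()   // safe now: the union's partitions are cached
    }
    rddUnion.saveAsTextFile("/data/merged")
    sc.stop()
  }
}

Note that each iteration caches a new union RDD; in a real job one would
probably unpersist the previous union as well, since the growing chain of
cached unions otherwise holds memory for recomputation that may never happen.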



RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
Currently I use rdd.count() for forceful computation, as Nick Pentreath 
suggested.

 

I think it would be nice to have a method that forcefully computes an RDD, 
so that the unnecessary RDDs can be safely unpersist()ed.

 

Let’s consider a case where rdd_a is a parent of both:

(1) a short-term rdd_s that depends only on rdd_a (a transformation of rdd_a)

(2) a long-term rdd_t that depends on rdds that may be computed later 
(maybe for some aggregation).

Usually rdd_s is computed early, and rdd_a is computed for it. But rdd_a has to 
remain in memory until rdd_t is eventually computed (or be recomputed when rdd_t 
is computed).

It would be nice if rdd_t could be computed when rdd_s is computed, so that 
rdd_a can be unpersist()ed, since it will not be used anymore.

(Currently I use rdd_t.count() for that)

 

sc.prune(), which was suggested in the related discussion you provided, is not 
helpful for this case, since rdd_a is still ‘referenced’ by the ‘lazy’ rdd_t.

(That is, compute rdd_t together with rdd_s, while rdd_a is still around for rdd_s.)
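
As a small illustration of this pattern (the names, paths, and transformations
are made up):

val rdd_a = sc.textFile("/data/a").cache()
val rdd_s = rdd_a.map(_.length)      // short-term child of rdd_a
val rdd_t = rdd_a.filter(_.nonEmpty) // long-term child, needed much later

rdd_s.saveAsTextFile("/data/s")      // rdd_a is computed and cached here
rdd_t.cache().count()                // force rdd_t now, while rdd_a is cached
rdd_a.unpersist()                    // rdd_a will not be used anymore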

 

 

From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] 
Sent: Friday, June 13, 2014 9:31 AM
To: user
Subject: Re: Question about RDD cache, unpersist, materialization

 

FYI: Here is a related discussion 
http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
  about this.

 


Re: use spark-shell in the source

2014-06-12 Thread Kevin Jung
Thanks for answer.
Yes, I tried to launch an interactive REPL in the middle of my application
:)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/use-spark-shell-in-the-source-tp7453p7539.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
(I’ve clarified statement (1) of my previous mail. See below.)

 

From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Friday, June 13, 2014 10:05 AM
To: user@spark.apache.org
Subject: RE: Question about RDD cache, unpersist, materialization

 

Currently I use rdd.count() for forceful computation, as Nick Pentreath 
suggested.

 

I think it would be nice to have a method that forcefully computes an RDD, 
so that the unnecessary RDDs can be safely unpersist()ed.

 

Let’s consider a case where rdd_a is a parent of both:

(1) a short-term rdd_s that depends only on rdd_a (maybe rdd_s is a 
transformation of rdd_a)

(2) a long-term rdd_t that depends on rdds that may be computed later 
(maybe for some aggregation).

Usually rdd_s is computed early, and rdd_a is computed for it. But rdd_a has to 
remain in memory until rdd_t is eventually computed (or be recomputed when rdd_t 
is computed).

It would be nice if rdd_t could be computed when rdd_s is computed, so that 
rdd_a can be unpersist()ed, since it will not be used anymore.

(Currently I use rdd_t.count() for that)

 

sc.prune(), which was suggested in the related discussion you provided, is not 
helpful for this case, since rdd_a is still ‘referenced’ by the ‘lazy’ rdd_t.

(That is, compute rdd_t together with rdd_s, while rdd_a is still around for rdd_s.)

 

 


Re: spark EC2 bring-up problems

2014-06-12 Thread Toby Douglass
On Thu, Jun 12, 2014 at 9:10 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Hi Toby,

 It is usually the case that even if the EC2 console says the nodes are
 up, they are not really fully initialized. For 16 nodes I have found
 `--wait 800` to be the norm that makes things work.


It seems so!  resume worked fine, which fits.

 In my previous experience I have found this to be the culprit, so if
 you immediately do 'launch --resume' when you see the first SSH error
 it's still very likely to fail. But if you wait a little bit longer
 and do 'launch --resume', it could work.


It did. Thank you :-)
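
For reference, a hedged example of the commands being discussed; the key pair,
identity file, and cluster name are placeholders, and the option order follows
the spark-ec2 script of that era:

./spark-ec2 -k my-keypair -i ~/my-key.pem -s 16 --wait 800 launch my-cluster
# if the first attempt still hits SSH errors, wait a bit longer, then:
./spark-ec2 -k my-keypair -i ~/my-key.pem --resume launch my-cluster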


Re: Spark SQL - input command in web ui/event log

2014-06-12 Thread Michael Armbrust
Yeah, we should probably add that.  Feel free to file a JIRA.

You can get it manually by calling sc.setJobDescription with the query text
before running the query.

Michael


On Thu, Jun 12, 2014 at 5:49 PM, shlee0605 shlee0...@gmail.com wrote:

 In Shark, the input SQL string was shown in the description on the web UI,
 so it was easy to keep track of which stage corresponds to which
 query.
 However, spark-sql does not show any information about the input SQL string.
 It rather shows the lower-level operations in the web UI/event log.
 (screenshots are attached)

 I'm trying to find a way to match a stage or job to its input query string.
 Can the input query string
 be easily added to the event logger or web UI?

 Thanks,
 Seunghyun

 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n7536/shark_ui.png
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n7536/spark-sql_ui.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-input-command-in-web-ui-event-log-tp7536.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
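
A short sketch of the sc.setJobDescription workaround Michael describes; the
query text and table name are made up:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val query = "SELECT page, COUNT(*) FROM logs GROUP BY page"
sc.setJobDescription(query)          // shows up as the description in the web UI
val result = sqlContext.sql(query)
result.collect()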



Re: An attempt to implement dbscan algorithm on top of Spark

2014-06-12 Thread Aliaksei Litouka
Vipul,
Thanks for your feedback. As far as I understand, you mean RDD[(Double,
Double)] (note the parentheses), and each of these Double values is
supposed to hold one coordinate of a point. That limits us to
2-dimensional space, which is not suitable for many tasks. I want the
algorithm to be able to work in multidimensional space. Actually, there is
a class org.alitouka.spark.dbscan.spatial.Point in my code, which
represents a point with an arbitrary number of coordinates.

IOHelper.readDataset is just a convenience method which reads a CSV file
and returns an RDD of Points (more precisely, it returns a value of type
RawDataset, which is just an alias for RDD[Point]). If your data is stored
in a format other than CSV, you will have to write your own code to convert
your data to RawDataset.

I can add support for other data formats in future versions.

As for other distance measures - it is a high priority issue in my list ;)



On Thu, Jun 12, 2014 at 6:02 PM, Vipul Pandey vipan...@gmail.com wrote:

 Great! I was going to implement one of my own - but I may not need to do
 that any more :)
 I haven't had a chance to look deep into your code but I would recommend
 accepting an RDD[Double,Double] as well, instead of just a file.

 val data = IOHelper.readDataset(sc, /path/to/my/data.csv)

 And other distance measures, of course.

 Thanks,
 Vipul




 On Jun 12, 2014, at 2:31 PM, Aliaksei Litouka aliaksei.lito...@gmail.com
 wrote:

 Hi.
 I'm not sure if messages like this are appropriate in this list; I just
 want to share with you an application I am working on. This is my personal
 project which I started to learn more about Spark and Scala, and, if it
 succeeds, to contribute it to the Spark community.

 Maybe someone will find it useful. Or maybe someone will want to join
 development.

 The application is available at https://github.com/alitouka/spark_dbscan

 Any questions, comments, suggestions, as well as criticism are welcome :)

 Best regards,
 Aliaksei Litouka





Re: specifying fields for join()

2014-06-12 Thread SK
This issue is resolved.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/specifying-fields-for-join-tp7528p7544.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: When to use CombineByKey vs reduceByKey?

2014-06-12 Thread Diana Hu
Matei,

Thanks for the answer, this clarifies things very much. Based on my usage I
would use combineByKey, since the output is another custom data structure.

I found out my issues with combineByKey were relieved after doing more
tuning of the level of parallelism. I've found that it really depends on
the size of my dataset, since I ran tests for 1000, 10K, 100K, and 1M data
points. For now the GC issue is under control, once I modified my data
structures to be mutable; the key part I was missing was that all
classes within them need to be serializable.

Thanks!

- Diana


On Wed, Jun 11, 2014 at 6:06 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 combineByKey is designed for when your return type from the aggregation is
 different from the values being aggregated (e.g. you group together
 objects), and it should allow you to modify the leftmost argument of each
 function (mergeCombiners, mergeValue, etc) and return that instead of
 allocating a new object. So it should work with mutable objects — please
 post what problems you had with that. reduceByKey actually also allows this
 if your types are the same.

 Matei


 On Jun 11, 2014, at 3:21 PM, Diana Hu siyin...@gmail.com wrote:

 Hello all,

 I've seen some performance improvements using combineByKey as opposed to
 reduceByKey or a groupByKey+map function. I have a couple of questions; it'd
 be great if anyone can shed some light on this.

 1) When should I use combineByKey vs reduceByKey?

 2) Do the containers need to be immutable for combineByKey? I've created
 custom data structures for the containers, one mutable and one immutable.
 In the tests with the mutable containers, Spark crashed with an error about
 missing references. However, the downside of the immutable containers (which
 work in my tests) is that for large datasets the garbage collector gets
 called many more times, and it tends to run out of heap space as the GC
 can't catch up. I tried some of the tips here
 http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuned
 the JVM params, but this seems to be too much tuning?

 Thanks in advance,
 - Diana
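
To make Matei's point concrete, here is a small sketch of combineByKey with a
mutable combiner that mutates and returns its leftmost argument instead of
allocating new objects; the data and types are illustrative:

import org.apache.spark.SparkContext._ // pair-RDD functions (Spark 1.0)
import scala.collection.mutable.ArrayBuffer

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val combined = pairs.combineByKey(
  (v: Int) => ArrayBuffer(v),                             // createCombiner
  (buf: ArrayBuffer[Int], v: Int) => { buf += v; buf },   // mergeValue: mutate and return
  (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => { b1 ++= b2; b1 } // mergeCombiners
)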





Re: Spilled shuffle files not being cleared

2014-06-12 Thread Michael Chang
Bump


On Mon, Jun 9, 2014 at 3:22 PM, Michael Chang m...@tellapart.com wrote:

 Hi all,

 I'm seeing exceptions that look like the below in Spark 0.9.1.  It looks
 like I'm running out of inodes on my machines (I have around 300k each in a
 12-machine cluster).  I took a quick look and I'm seeing some shuffle spill
 files that are still around even 12 minutes after they are created.  Can
 someone help me understand when these shuffle spill files should be cleaned
 up (is it as soon as they are used?)

 Thanks,
 Michael


 java.io.FileNotFoundException:
 /mnt/var/hadoop/1/yarn/local/usercache/ubuntu/appcache/application_1399886706975_13107/spark-local-20140609210947-19e1/1c/shuffle_41637_3_0
 (No space left on device)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:118)
 at
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
 at
 org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 14/06/09 22:07:36 WARN TaskSetManager: Lost TID 667432 (task 86909.0:7)
 14/06/09 22:07:36 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException



RE: Spilled shuffle files not being cleared

2014-06-12 Thread Shao, Saisai
Hi Michael,

I think you can set spark.cleaner.ttl=xxx to enable the time-based metadata 
cleaner, which will clean old unused shuffle data once it times out.

For Spark 1.0, another way is to clean shuffle data using weak references 
(reference-tracking based; the configuration is spark.cleaner.referenceTracking), 
and it is enabled by default.

Thanks
Saisai
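
A sketch of the configuration Saisai describes; the TTL value of 3600 seconds
is an arbitrary example:

val conf = new SparkConf()
  .set("spark.cleaner.ttl", "3600")               // time-based metadata cleaner (seconds)
  .set("spark.cleaner.referenceTracking", "true") // Spark 1.0 reference-tracking cleaner (on by default)
val sc = new SparkContext(conf)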




Re: wholeTextFiles not working with HDFS

2014-06-12 Thread yinxusen
Hi Sguj,

Could you give me the exception stack?

I tested it on my laptop and found that it gets the wrong FileSystem. It should
be DistributedFileSystem, but it finds RawLocalFileSystem.

If we get the same exception stack, I'll try to fix it.

Here is my exception stack:

java.io.FileNotFoundException: File /sen/reuters-out/reut2-000.sgm-0.txt
does not exist.
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:397)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at
org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:173)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:201)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097)
at org.apache.spark.rdd.RDD.collect(RDD.scala:728)

Besides, what's your hadoop version?
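
As a point of comparison, a sketch of the call with a fully qualified HDFS URI,
so the path resolves against DistributedFileSystem rather than the local
default; the namenode address is a placeholder, and whether this works around
the issue is an assumption, not a confirmed fix:

val files = sc.wholeTextFiles("hdfs://namenode:8020/sen/reuters-out")
files.map { case (path, content) => (path, content.length) }.collect()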




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7548.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Aaron Davidson
The scripts for Spark 1.0 actually specify this property in
/root/spark/conf/spark-defaults.conf

I didn't know that this would override the --executor-memory flag, though,
that's pretty odd.


On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka 
aliaksei.lito...@gmail.com wrote:

 Yes, I am launching a cluster with the spark_ec2 script. I checked
 /root/spark/conf/spark-env.sh on the master node and on slaves and it looks
 like this:

 #!/usr/bin/env bash
 export SPARK_LOCAL_DIRS=/mnt/spark
 # Standalone cluster options
 export SPARK_MASTER_OPTS=
 export SPARK_WORKER_INSTANCES=1
 export SPARK_WORKER_CORES=1
 export HADOOP_HOME=/root/ephemeral-hdfs
 export SPARK_MASTER_IP=ec2-54-89-95-238.compute-1.amazonaws.com
 export MASTER=`cat /root/spark-ec2/cluster-url`
 export
 SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/
 export
 SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf
 # Bind Spark's web UIs to this machine's public EC2 hostname:
 export SPARK_PUBLIC_DNS=`wget -q -O -
 http://169.254.169.254/latest/meta-data/public-hostname`
 http://169.254.169.254/latest/meta-data/public-hostname
 # Set a high ulimit for large shuffles
 ulimit -n 100


 None of these variables seem to be related to memory size. Let me know if
 I am missing something.


 On Thu, Jun 12, 2014 at 7:17 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Are you launching this using our EC2 scripts? Or have you set up a
 cluster by hand?

 Matei

 On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com
 wrote:

 spark-env.sh doesn't seem to contain any settings related to memory size
 :( I will continue searching for a solution and will post it if I find it :)
 Thank you, anyway


 On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 It might be that conf/spark-env.sh on EC2 is configured to set it to
 512, and is overriding the application’s settings. Take a look in there and
 delete that line if possible.

 Matei

 On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka 
 aliaksei.lito...@gmail.com wrote:

  I am testing my application in EC2 cluster of m3.medium machines. By
 default, only 512 MB of memory on each machine is used. I want to increase
 this amount and I'm trying to do it by passing --executor-memory 2G option
 to the spark-submit script, but it doesn't seem to work - each machine uses
 only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I
 increase the amount of memory?
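
As a related sketch, executor memory can also be set programmatically on the
SparkConf, which takes precedence over the cluster-wide default in
spark-defaults.conf:

val conf = new SparkConf()
  .setAppName("memory-example")
  .set("spark.executor.memory", "2g") // overrides the 512m cluster default
val sc = new SparkContext(conf)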







Re: Spark 1.0.0 Standalone AppClient cannot connect Master

2014-06-12 Thread Hao Wang
Hi, Andrew

Got it, Thanks!

Hao

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Fri, Jun 13, 2014 at 12:42 AM, Andrew Or and...@databricks.com wrote:

 Hi Wang Hao,

 This is not removed. We moved it here:
 http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html
 If you're building with SBT, and you don't specify the
 SPARK_HADOOP_VERSION, then it defaults to 1.0.4.

 Andrew


 2014-06-12 6:24 GMT-07:00 Hao Wang wh.s...@gmail.com:

 Hi, all

 Why does the Spark 1.0.0 official doc no longer describe how to build Spark
 against a specific Hadoop version?

 Does it mean that I don't need to specify the Hadoop version when I build
 my Spark 1.0.0 with `sbt/sbt assembly`?


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com
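
For example, the Hadoop version can still be specified at build time through an
environment variable; 2.2.0 here is just an example:

SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly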





multiple passes in mapPartitions

2014-06-12 Thread zhen
I want to take multiple passes through my data in mapPartitions. However, the
iterator only allows you to take one pass through the data. If I transform
the iterator into an array using iter.toArray, it is too slow, since it
copies all the data into a new Scala array. It also takes twice the memory,
which is bad in terms of more GC.

Is there a faster/better way of taking multiple passes without copying all
the data?

Thank you,

Zhen
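
A sketch of the two-pass pattern being described, using the iter.toArray
approach whose cost the question is about; the RDD and the statistics computed
are made up:

val data = sc.parallelize(1.0 to 100.0 by 1.0, 4) // example RDD[Double]
val stats = data.mapPartitions { iter =>
  val buf = iter.toArray                                  // materialize once (extra copy)
  if (buf.isEmpty) Iterator.empty
  else {
    val mean  = buf.sum / buf.length                      // first pass
    val sumSq = buf.map(x => (x - mean) * (x - mean)).sum // second pass
    Iterator((mean, sumSq / buf.length))
  }
}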



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.