Re: spark ui on yarn

2014-07-12 Thread Shuo Xiang
Hi Koert,
  Just curious, did you find any information like CANNOT FIND ADDRESS
after clicking into some stage? I've seen similar problems due to loss of
executors.

Best,



On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote:

 I just tested a long lived application (that we normally run in standalone
 mode) on yarn in client mode.

 it looks to me like cached rdds are missing in the storage tab of the ui.

 accessing the rdd storage information via the spark context shows rdds as
 fully cached but they are missing on the storage page.

 spark 1.0.0



Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-12 Thread Yan Fang
Thank you, Tathagata. That explains it.

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 7:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 A task slot is equivalent to a core. So one core can only run one task
 at a time.

 TD


 On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Tathagata,

 Thank you. Is a task slot equivalent to a core? Or can one core actually
 run multiple tasks at the same time?

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The same executor can be used for both receiving and processing,
 irrespective of the deployment mode (yarn, spark standalone, etc.). It boils
 down to the number of cores / task slots that the executor has. Each receiver
 is like a long-running task, so each of them occupies a slot. If there are
 free slots in the executor then other tasks can be run on them.

 So if you are finding that the other tasks are not being run, check how many
 cores/task slots the executor has and whether there are more task slots
 than the number of input dstreams / receivers you are launching.
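
For illustration, a minimal hypothetical sketch (not from this thread; the ports, receiver count, and batch interval are made up) of the situation TD describes: with local[4] there are 4 task slots, the two receivers pin two of them, and the remaining two slots run the word-count tasks.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverSlotsSketch {
  def main(args: Array[String]): Unit = {
    // 4 task slots in total: 2 are pinned by the receivers below,
    // leaving 2 free for the flatMap/reduceByKey tasks.
    val conf = new SparkConf().setMaster("local[4]").setAppName("ReceiverSlotsSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Two socket receivers -> two long-running receiver tasks.
    val streams = (0 until 2).map(i => ssc.socketTextStream("localhost", 9998 + i))
    val lines = ssc.union(streams)

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}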

 @Praveen  your answers were pretty much spot on, thanks for chipping in!




 On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting, because if I only bring up
 one executor for Spark Streaming, it seems only the receiver is working and
 no other tasks are happening, judging by the log and UI. Maybe it's just
 because the receiving task eats all the resources, not because one executor
 can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers. But I am just getting started with Spark Streaming
 - so please correct me if I am wrong.
 1) Yes.
 2) Receivers will run on executors. It's actually a job that's submitted
 where the # of tasks equals the # of receivers. An executor can actually run
 more than one task at the same time. Hence you could have more receivers
 than executors, but I think it's not recommended.
 3) As said in 2, the executor where the receiver task is running can be
 used for map/reduce tasks. In yarn-cluster mode, the driver program is
 actually run as the application master (it lives in the first container
 that's launched) and this is not an executor - hence it's not used for other
 operations.
 4) The driver runs in a separate container. I think the same executor
 can be used for both the receiver and the processing tasks (I am not very
 sure about this part).


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application, but I have trouble understanding how the executors are used
 and how the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one worker
 machine (note that each input DStream creates a single receiver (running on
 a worker machine) that receives a single stream of data). Does the worker
 machine mean an executor or a physical machine? If I have more receivers
 than executors, will it still work?

 3. Is the executor that holds receiver also used for other
 operations, such as map, reduce, or fully occupied by the receiver?
 Similarly, if I run in yarn-cluster mode, is the executor running driver
 program used by other operations too?

 4. So if I have a driver program (cluster mode) and streaming
 receiver, do I have to have at least 2 executors because the program and
 streaming receiver have to be on different executors?

 Thank you. Sorry for having so many questions, but I do want to
 understand how Spark Streaming distributes work in order to assign
 reasonable resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108









Re: KMeans for large training data

2014-07-12 Thread durin
Thanks, setting the number of partitions to the number of executors helped a
lot and training with 20k entries got a lot faster.
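
For reference, a minimal sketch of that repartitioning step (the path, feature parsing, executor count, k, and iteration count are placeholders, not the actual values from this thread):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val numExecutors = 8  // placeholder: match this to the actual executor count
val data = sc.textFile("hdfs:///path/to/training.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .repartition(numExecutors)
  .cache()

// k = 10 clusters, 20 iterations -- both illustrative values
val model = KMeans.train(data, 10, 20)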

However, when I tried training with 1M entries, after about 45 minutes of
calculations, I get this:



It's stuck at this point. The CPU load for the master is at 100% (so 1 of 8
cores is used), but the WebUI shows no active task, and after 30 more
minutes of no visible change I had to leave for an appointment.
I've never seen an error referring to this library before. Could that be due
to the new partitioning?

Edit: Just before sending, in a new test I realized this error also appears
when the amount of testdata is very low (here 500 items). This time it
includes a Java stacktrace though, instead of just stopping:



So, to sum it up, KMeans.train works somewhere in between 10k and 200k items,
but not outside this range. Can you think of an explanation for this
behavior?


Best regards,
Simon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Announcing Spark 1.0.1

2014-07-12 Thread Brad Miller
Hi All,

Congrats to the entire Spark team on the 1.0.1 release.

In checking out the new features, I noticed that it looks like the
python API docs have been updated, but the title and the header at
the top of the page still say Spark 1.0.0.  Clearly not a big
deal... I just wouldn't want anyone to get confused and miss out.

http://spark.apache.org/docs/1.0.1/api/python/index.html

best,
-Brad

On Fri, Jul 11, 2014 at 8:44 PM, Henry Saputra henry.sapu...@gmail.com wrote:
 Congrats to the Spark community !

 On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote:

 I am happy to announce the availability of Spark 1.0.1! This release
 includes contributions from 70 developers. Spark 1.0.1 includes fixes
 across several areas of Spark, including the core API, PySpark, and
 MLlib. It also includes new features in Spark's (alpha) SQL library,
 including support for JSON data and performance and stability fixes.

 Visit the release notes[1] to read about this release or download[2]
 the release today.

 [1] http://spark.apache.org/releases/spark-release-1-0-1.html
 [2] http://spark.apache.org/downloads.html


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-12 Thread M Singh
Thanks TD.

BTW - if I have an input file of ~250 GB, is there any guideline on whether to use:

* a single input file (250 GB) (in this case, is there any max upper bound), or

* a split into 1000 files of 250 MB each (the hdfs block size is 250 MB), or

* files sized at a multiple of the hdfs block size?
Mans





On Friday, July 11, 2014 4:38 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


The model for file stream is to pick up and process new files written 
atomically (by move) into a directory. So your file is being processed in a 
single batch, and then it's waiting for any new files to be written into that 
directory. 
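
For illustration, a minimal sketch of that file-stream pattern (the directory and batch interval are placeholders); each batch only picks up files that appeared in the directory, atomically via a move/rename, since the previous batch:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))
// Only files moved into this directory after the stream starts become new batches.
val lines = ssc.textFileStream("hdfs:///incoming/")
lines.map(line => (line.split(",")(0), 1L)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()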

TD



On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote:

So, is it expected for the process to generate stages/tasks even after 
processing a file ?


Also, is there a way to figure out the file that is getting processed and when 
that process is complete ?


Thanks




On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


Whenever you need to do a shuffle-based operation like reduceByKey, 
groupByKey, join, etc., the system is essentially redistributing the data 
across the cluster and it needs to know how many parts it should divide the 
data into. That's where the default parallelism is used. 
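
For illustration, a sketch of the two ways to control that partition count (the value 16 and the pairs RDD are placeholders):

// 1) Globally, before the context is created:
val conf = new org.apache.spark.SparkConf().set("spark.default.parallelism", "16")

// 2) Per operation, by passing an explicit partition count to the shuffle:
val counts = pairs.reduceByKey(_ + _, 16)  // 16 reduce-side tasks instead of the default 2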


TD



On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

Hi TD:


The input file is on hdfs.  


The file is approx 2.7 GB and when the process starts, there are 11 tasks 
(since hdfs block size is 256M) for processing and 2 tasks for reduce by key. 
 After the file has been processed, I see new stages with 2 tasks that 
continue to be generated. I understand this value (2) is the default value 
for spark.default.parallelism, but I don't quite understand how the value is 
determined for generating tasks for reduceByKey, how it is used besides 
reduceByKey, and what the optimal value for it should be. 


Thanks.



On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process a text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

 * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there are no tasks/stages seen in the 
 spark UI.

 * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the 
 file has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any 
insights/pointers for understanding this difference in behavior and how to 
avoid generating tasks/stages when there is no data (new file) available.


Thanks

Mans







Akka Client disconnected

2014-07-12 Thread Srikrishna S
I am running logistic regression with SGD on a problem with about 19M
parameters (the kdda dataset from the libsvm library)

I consistently see that the nodes on my computer get disconnected and
soon the whole job goes to a grinding halt.

14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost
executor 2 on pachy4 remote Akka client disassociated

Does this have anything to do with spark.akka.frameSize? I have tried
up to 1024 MB and I still get the same thing.

I don't have any more information in the logs about why the clients
are getting disconnected. Any thoughts?

Regards,
Krishna


Re: Akka Client disconnected

2014-07-12 Thread DB Tsai
Are you using 1.0 or current master? A bug related to this is fixed in
master.
On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote:

 I am run logistic regression with SGD on a problem with about 19M
 parameters (the kdda dataset from the libsvm library)

 I consistently see that the nodes on my computer get disconnected and
 soon the whole job goes to a grinding halt.

 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost
 executor 2 on pachy4 remote Akka client disassociated

 Does this have anything to do with the akka.frame_size? I have tried
 upto 1024 MB and I still get the same thing.

 I don't have any more information in the logs about why the clients
 are getting disconnected. Any thoughts?

 Regards,
 Krishna



Putting block rdd failed when running example svm on large data

2014-07-12 Thread crater
Hi,

I am trying to run the example BinaryClassification
(org.apache.spark.examples.mllib.BinaryClassification) on a 202G file. I am
constantly getting messages like the ones below. Is this normal, or am I
missing something?

14/07/12 09:49:04 WARN BlockManager: Block rdd_4_196 could not be dropped
from memory as it does not exist
14/07/12 09:49:04 WARN BlockManager: Putting block rdd_4_196 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_201 could not be dropped
from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_201 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_202 could not be dropped
from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_202 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_198 could not be dropped
from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_198 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_199 could not be dropped
from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_199 failed
14/07/12 09:49:05 WARN BlockManager: Block rdd_4_204 could not be dropped
from memory as it does not exist
14/07/12 09:49:05 WARN BlockManager: Putting block rdd_4_204 failed
14/07/12 09:49:06 WARN BlockManager: Block rdd_4_203 could not be dropped
from memory as it does not exist
14/07/12 09:49:06 WARN BlockManager: Putting block rdd_4_203 failed
14/07/12 09:49:07 WARN BlockManager: Block rdd_4_205 could not be dropped
from memory as it does not exist
14/07/12 09:49:07 WARN BlockManager: Putting block rdd_4_205 failed

Some info:
8-node cluster with 28G RAM per node; I configured 25G of memory for Spark (so
the data does not seem to fit in memory).




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Confused by groupByKey() and the default partitioner

2014-07-12 Thread Guanhua Yan
Hi:

I have trouble understanding the default partitioner (hash) in Spark.
Suppose that an RDD with two partitions is created as follows:
x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)
Does spark partition x based on the hash of the key (e.g., a, b, c) by
default?
(1) Assuming this is correct, if I further use the groupByKey primitive,
x.groupByKey(), all the records sharing the same key should be located in
the same partition. Then it's not necessary to shuffle the data records
around, as all the grouping operations can be done locally.
(2) If it's not true, how could I specify a partitioner simply based on the
hashing of the key (in Python)?
Thank you,
- Guanhua




Re: spark ui on yarn

2014-07-12 Thread Koert Kuipers
hey shuo,
so far all stage links work fine for me.

i did some more testing, and it seems kind of random what shows up on the
gui and what does not. some partially cached RDDs make it to the GUI, while
some fully cached ones do not. I have not been able to detect a pattern.

is the codebase for the gui different in standalone than in yarn-client
mode?


On Sat, Jul 12, 2014 at 3:34 AM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Hi Koert,
   Just curious did you find any information like CANNOT FIND ADDRESS
 after clicking into some stage? I've seen similar problems due to lost of
 executors.

 Best,



 On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote:

 I just tested a long lived application (that we normally run in
 standalone mode) on yarn in client mode.

 it looks to me like cached rdds are missing in the storage tap of the ui.

 accessing the rdd storage information via the spark context shows rdds as
 fully cached but they are missing on storage page.

 spark 1.0.0





Re: Anaconda Spark AMI

2014-07-12 Thread Benjamin Zaitlen
Hi All,

Thanks to Jey's help, I have a release AMI candidate for
spark-1.0/anaconda-2.0 integration.  It's currently limited to availability
in US-EAST: ami-3ecd0c56

Give it a try if you have some time.  This should *just work* with Spark
1.0:

./spark-ec2 -k my_key -i ~/.ssh/mykey.rsa  -a ami-3ecd0c56

If you have suggestions or run into trouble please email,

--Ben

PS:  I found that writing a noop map function is a decent way to install
pkgs on worker nodes, though most scientific pkgs are pre-installed with
Anaconda:

def subprocess_noop(x):
    import os
    os.system("/opt/anaconda/bin/conda install h5py")
    return 1

install_noop = rdd.map(subprocess_noop)
install_noop.count()


On Thu, Jul 3, 2014 at 2:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote:

 Hi Ben,

 Has the PYSPARK_PYTHON environment variable been set in
 spark/conf/spark-env.sh to the path of the new python binary?

 FYI, there's a /root/copy-dirs script that can be handy when updating
 files on an already-running cluster. You'll want to restart the spark
 cluster for the changes to take effect, as described at
 https://spark.apache.org/docs/latest/ec2-scripts.html

 Hope that helps,
 -Jey

 On Thu, Jul 3, 2014 at 11:54 AM, Benjamin Zaitlen quasi...@gmail.com
 wrote:
  Hi All,
 
  I'm a dev at Continuum and we are developing a fair amount of tooling
 around
  Spark.  A few days ago someone expressed interest in numpy+pyspark and
  Anaconda came up as a reasonable solution.
 
  I spent a number of hours yesterday trying to rework the base Spark AMI
 on
  EC2 but sadly was defeated by a number of errors.
 
  Aggregations seemed to choke -- whereas small takes executed as expected
  (errors are linked to the gist):
 
  sc.appName
  u'PySparkShell'
  sc._conf.getAll()
  [(u'spark.executor.extraLibraryPath',
 u'/root/ephemeral-hdfs/lib/native/'),
  (u'spark.executor.memory', u'6154m'), (u'spark.submit.pyFiles', u''),
  (u'spark.app.name', u'
  PySparkShell'), (u'spark.executor.extraClassPath',
  u'/root/ephemeral-hdfs/conf'), (u'spark.master',
  u'spark://.compute-1.amazonaws.com:7077')]
  file = sc.textFile("hdfs:///user/root/chekhov.txt")
  file.take(2)
  [u"Project Gutenberg's Plays by Chekhov, Second Series, by Anton Chekhov",
  u'']
 
  lines = file.filter(lambda x: len(x) > 0)
  lines.count()
  VARIOUS ERROS DISCUSSED BELOW
 
  My first thought was that I could simply get away with including
 anaconda on
  the base AMI, point the path at /dir/anaconda/bin, and bake a new one.
  Doing so resulted in some strange py4j errors like the following:
 
  Py4JError: An error occurred while calling o17.partitions. Trace:
  py4j.Py4JException: Method partitions([]) does not exist
 
  At some point I also saw:
  SystemError: Objects/cellobject.c:24: bad argument to internal function
 
  which is really strange, possibly the result of a version mismatch?
 
  I had another thought of building spark from master on the AMI, leaving
 the
  spark directory in place, and removing the spark call from the modules
 list
  in spark-ec2 launch script. Unfortunately, this resulted in the following
  errors:
 
  https://gist.github.com/quasiben/da0f4778fbc87d02c088
 
  If a spark dev was willing to make some time in the near future, I'm sure
  she/he and I could sort out these issues and give the Spark community a
  python distro ready to go for numerical computing.  For instance, I'm not
  sure how pyspark calls out to launching a python session on a slave?  Is
  this done as root or as the hadoop user? (i believe i changed
 /etc/bashrc to
  point to my anaconda bin directory so it shouldn't really matter.)  Is
 there
  something special about the py4j zip include in spark dir compared with
 the
  py4j in pypi?
 
  Thoughts?
 
  --Ben
 
 



Scalability issue in Spark with SparkPageRank example

2014-07-12 Thread lokesh.gidra
Hello, 


I ran SparkPageRank example (the one included in the package) to evaluate 
scale-in capability of Spark. I ran experiments on an 8-node, 48-core 
AMD machine with a local[N] master. But for N > 10, the completion time 
of the experiment kept increasing, rather than decreasing. 

When I profiled it using Jprofiler, I observed that it wasn't any lock 
which consumed the CPU time. Instead, the amount of time spent in the 
following functions kept increasing: 

1) java.io.ObjectOutputStream.writeObject0 
2) scala.Tuple2.hashCode 

I confirmed the same with Oprofile as well. The findings are consistent. 

I am attaching the jstack output, which I took twice during the whole 
execution of the N=48 run. I ran the tests with Spark 1.0.0 and Spark 0.9.0.

Can someone please suggest what is wrong?


Regards,
Lokesh Gidra

lessoutput3.lessoutput3
http://apache-spark-user-list.1001560.n3.nabble.com/file/n9521/lessoutput3.lessoutput3
  
lessoutput4.lessoutput4
http://apache-spark-user-list.1001560.n3.nabble.com/file/n9521/lessoutput4.lessoutput4
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scalability-issue-in-Spark-with-SparkPageRank-example-tp9521.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Stopping StreamingContext does not kill receiver

2014-07-12 Thread Nick Chammas
From the interactive shell I’ve created a StreamingContext.

I call ssc.start() and take a look at http://master_url:4040/streaming/ and
see that I have an active Twitter receiver. Then I call
ssc.stop(stopSparkContext
= false, stopGracefully = true) and wait a bit, but the receiver seems to
stay active.

Is this expected? I’m running 1.0.1 on EC2.

Nick
​




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Stopping StreamingContext does not kill receiver

2014-07-12 Thread Tathagata Das
Yes, that's a bug I just discovered. Race condition in the Twitter Receiver,
will fix asap.
Here is the JIRA https://issues.apache.org/jira/browse/SPARK-2464

TD


On Sat, Jul 12, 2014 at 3:21 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 To add a potentially relevant piece of information, around when I stop the
 StreamingContext, I get the following warning:

 14/07/12 22:16:18 WARN ReceiverTracker: All of the receivers have not 
 deregistered, Map(0 - 
 ReceiverInfo(0,TwitterReceiver-0,Actor[akka.tcp://spark@url-here:49776/user/Receiver-0-140520314#-273455949],true,url-here-too,,))

 Nick
 ​


 On Sat, Jul 12, 2014 at 6:03 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 From the interactive shell I’ve created a StreamingContext.

 I call ssc.start() and take a look at http://master_url:4040/streaming/
 and see that I have an active Twitter receiver. Then I call 
 ssc.stop(stopSparkContext
 = false, stopGracefully = true) and wait a bit, but the receiver seems
 to stay active.

 Is this expected? I’m running 1.0.1 on EC2.

 Nick
 ​

 --
 View this message in context: Stopping StreamingContext does not kill
 receiver
 http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.





Re: Stopping StreamingContext does not kill receiver

2014-07-12 Thread Nicholas Chammas
Okie doke. Thanks for filing the JIRA.


On Sat, Jul 12, 2014 at 6:45 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Yes, thats a bug i just discovered. Race condition in the Twitter
 Receiver, will fix asap.
 Here is the JIRA https://issues.apache.org/jira/browse/SPARK-2464

 TD


 On Sat, Jul 12, 2014 at 3:21 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 To add a potentially relevant piece of information, around when I stop
 the StreamingContext, I get the following warning:

 14/07/12 22:16:18 WARN ReceiverTracker: All of the receivers have not 
 deregistered, Map(0 - 
 ReceiverInfo(0,TwitterReceiver-0,Actor[akka.tcp://spark@url-here:49776/user/Receiver-0-140520314#-273455949],true,url-here-too,,))

 Nick
 ​


 On Sat, Jul 12, 2014 at 6:03 PM, Nick Chammas nicholas.cham...@gmail.com
  wrote:

 From the interactive shell I’ve created a StreamingContext.

 I call ssc.start() and take a look at http://master_url:4040/streaming/
 and see that I have an active Twitter receiver. Then I call 
 ssc.stop(stopSparkContext
 = false, stopGracefully = true) and wait a bit, but the receiver seems
 to stay active.

 Is this expected? I’m running 1.0.1 on EC2.

 Nick
 ​

 --
 View this message in context: Stopping StreamingContext does not kill
 receiver
 http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-StreamingContext-does-not-kill-receiver-tp9522.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.






Re: Confused by groupByKey() and the default partitioner

2014-07-12 Thread Aaron Davidson
Yes, groupByKey() does partition by the hash of the key unless you specify
a custom Partitioner.

(1) If you were to use groupByKey() when the data was already partitioned
correctly, the data would indeed not be shuffled. Here is the associated
code, you'll see that it simply checks that the Partitioner the groupBy()
is looking for is equal to the Partitioner of the pre-existing RDD:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89

By the way, I should warn you that groupByKey() is not a recommended
operation if you can avoid it, as it has non-obvious performance issues
when running with serious data.
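
For illustration, a small sketch of pre-partitioning so that the later groupByKey reuses the existing partitioner and skips the shuffle (the data and partition count are made up; in PySpark, rdd.partitionBy(numPartitions) plays the same role and hashes keys by default):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 10), ("c", 7)))
val prePartitioned = pairs.partitionBy(new HashPartitioner(2)).cache()

// groupByKey() picks up HashPartitioner(2) from prePartitioned, so no shuffle here.
val grouped = prePartitioned.groupByKey()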


On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote:

 Hi:

 I have trouble understanding the default partitioner (hash) in Spark.
 Suppose that an RDD with two partitions is created as follows:

  x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

 Does spark partition x based on the hash of the key (e.g., a, b, c) by 
 default?

 (1) Assuming this is correct, if I further use the groupByKey primitive, 
 x.groupByKey(), all the records sharing the same key should be located in the 
 same partition. Then it's not necessary to shuffle the data records around, 
 as all the grouping operations can be done locally.

 (2) If it's not true, how could I specify a partitioner simply based on the 
 hashing of the key (in Python)?

 Thank you,

 - Guanhua




Re: Akka Client disconnected

2014-07-12 Thread Srikrishna S
I am using the master that I compiled 2 days ago. Can you point me to the JIRA?

On Sat, Jul 12, 2014 at 9:13 AM, DB Tsai dbt...@dbtsai.com wrote:
 Are you using 1.0 or current master? A bug related to this is fixed in
 master.

 On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote:

 I am run logistic regression with SGD on a problem with about 19M
 parameters (the kdda dataset from the libsvm library)

 I consistently see that the nodes on my computer get disconnected and
 soon the whole job goes to a grinding halt.

 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost
 executor 2 on pachy4 remote Akka client disassociated

 Does this have anything to do with the akka.frame_size? I have tried
 upto 1024 MB and I still get the same thing.

 I don't have any more information in the logs about why the clients
 are getting disconnected. Any thoughts?

 Regards,
 Krishna


Re: KMeans for large training data

2014-07-12 Thread Aaron Davidson
The "netlib.BLAS: Failed to load implementation" warning only means that
the BLAS implementation may be slower than using a native one. The reason
why it only shows up at the end is that the library is only used for the
finalization step of the KMeans algorithm, so your job should've been
wrapping up at this point. I am not familiar with the algorithm beyond
that, so I'm not sure if for some reason we're trying to collect too much
data back to the driver here.

SPARK_DRIVER_MEMORY can increase the driver memory, by the way (or by using
the --driver-memory option when using spark-submit).


On Sat, Jul 12, 2014 at 2:38 AM, durin m...@simon-schaefer.net wrote:

 Your latest response doesn't show up here yet, I only got the mail. I'll
 still answer here in the hope that it appears later:

 Which memory setting do you mean? I can go up with spark.executor.memory a
 bit, it's currently set to 12G. But that's already way more than the whole
 SchemaRDD of Vectors that I currently use for training, which shouldn't be
 more than a few hundred M.
 I suppose you rather mean something comparable to SHARK_MASTER_MEM in
 Shark.
 I can't find the equivalent for Spark in the documentations, though.

 And if it helps, I can summarize the whole code currently that I currently
 use. It's nothing really fancy at the moment, I'm just trying to classify
 Strings that each contain a few words (words are handled each as atomic
 items).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9509.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Convert from RDD[Object] to RDD[Array[Object]]

2014-07-12 Thread Aaron Davidson
If you don't really care about the batchedDegree, but rather just want to
do operations over some set of elements rather than one at a time, then
just use mapPartitions().

Otherwise, if you really do want certain-sized batches and you are able to
relax the constraints slightly, you can construct these batches within each
partition. For instance:

val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
  new Iterator[Array[Int]] {
def hasNext: Boolean = iter.hasNext
def next(): Array[Int] = {
  iter.take(batchedDegree).toArray
}
  }
}

This function is efficient in that it does not load the entire partition
into memory, just enough to construct each batch. However, there will be
one smaller batch at the end of each partition (rather than just one over
the entire dataset).



On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote:

 Hi there,

 I have a bunch of data in an RDD, which I previously processed one element
 at a time. For example, there was an RDD denoted by data: RDD[Object], and I
 processed it using data.map(...).  However, I got a new requirement to
 process the data in a batched way. It means that I need to convert the RDD
 from RDD[Object] to RDD[Array[Object]] and then process it, which is to fill
 out this function: def convert2array(inputs: RDD[Object], batchedDegree:
 Int): RDD[Array[Object]] = {...}.

 I hope that after the conversion, each element of the new RDD is an array
 of the previous RDD's elements. The parameter batchedDegree specifies how
 many elements are batched together. For example, if I have 210 elements in
 the previous RDD, the result of the conversion function should be an RDD
 with 3 elements. Each element is an array; the first two arrays contain
 elements 1~100 and 101~200, and the third contains elements 201~210.

 I was wondering if anybody could help me complete this scala function in
 an efficient way. Thanks a lot.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: pyspark sc.parallelize running OOM with smallish data

2014-07-12 Thread Aaron Davidson
I think this is probably dying on the driver itself, as you are probably
materializing the whole dataset inside your python driver. How large is
spark_data_array compared to your driver memory?


On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I put the same dataset into scala (using spark-shell) and it acts weird. I
 cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
 in the status bar, shows details about the worker nodes but there is no
 progress.
 sc.parallelize does finish (takes too long for the data size) in scala.


 On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
 in the cluster and gave 48g to executors. also tried kyro serialization.

 traceback (most recent call last):

   File /mohit/./m.py, line 58, in module

 spark_data = sc.parallelize(spark_data_array)

   File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

 : java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)





Re: Akka Client disconnected

2014-07-12 Thread DB Tsai
https://issues.apache.org/jira/browse/SPARK-2156

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sat, Jul 12, 2014 at 5:23 PM, Srikrishna S srikrishna...@gmail.com wrote:
 I am using the master that I compiled 2 days ago. Can you point me to the 
 JIRA?

 On Sat, Jul 12, 2014 at 9:13 AM, DB Tsai dbt...@dbtsai.com wrote:
 Are you using 1.0 or current master? A bug related to this is fixed in
 master.

 On Jul 12, 2014 8:50 AM, Srikrishna S srikrishna...@gmail.com wrote:

 I am run logistic regression with SGD on a problem with about 19M
 parameters (the kdda dataset from the libsvm library)

 I consistently see that the nodes on my computer get disconnected and
 soon the whole job goes to a grinding halt.

 14/07/12 03:05:16 ERROR cluster.YarnClientClusterScheduler: Lost
 executor 2 on pachy4 remote Akka client disassociated

 Does this have anything to do with the akka.frame_size? I have tried
 upto 1024 MB and I still get the same thing.

 I don't have any more information in the logs about why the clients
 are getting disconnected. Any thoughts?

 Regards,
 Krishna


Re: Putting block rdd failed when running example svm on large data

2014-07-12 Thread crater
Hi Xiangrui, 

Thanks for the information. Also, is it possible to figure out the execution
time per iteration for SVM?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Putting block rdd failed when running example svm on large data

2014-07-12 Thread Aaron Davidson
Also check the web ui for that. Each iteration will have one or more stages
associated with it in the driver web ui.


On Sat, Jul 12, 2014 at 6:47 PM, crater cq...@ucmerced.edu wrote:

 Hi Xiangrui,

 Thanks for the information. Also, it is possible to figure out the
 execution
 time per iteration for SVM?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Supported SQL syntax in Spark SQL

2014-07-12 Thread Nick Chammas
Is there a place where we can find an up-to-date list of supported SQL
syntax in Spark SQL?

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-tp9538.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Large Task Size?

2014-07-12 Thread Kyle Ellrott
I'm working on a patch to MLlib that allows multiplexing several
different model optimizations using the same RDD ( SPARK-2372:
https://issues.apache.org/jira/browse/SPARK-2372 )

In testing larger datasets, I've started to see some memory errors (
java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize
errors ).
My main clue is that Spark will start logging warning on smaller systems
like:

14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task
of very large size (10119 KB). The maximum recommended task size is 100 KB.

Looking up stage '2862' leads to a 'sample at
GroupedGradientDescent.scala:156' call. That code can be seen at
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

I've looked over the code, I'm broadcasting the larger variables, and
between the sampler and the combineByKey, I wouldn't think there is much data
being moved over the network, much less a 10MB chunk.

Any ideas of what this might be a symptom of?

Kyle


Re: Large Task Size?

2014-07-12 Thread Aaron Davidson
I also did a quick glance through the code and couldn't find anything
worrying that should be included in the task closures. The only possibly
unsanitary part is the Updater you pass in -- what is your Updater and is
it possible it's dragging in a significant amount of extra state?
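
For illustration, a generic sketch (names are made up, not from Kyle's patch) of how a large driver-side value captured in a closure inflates the task size, versus broadcasting it; rdd is assumed to be an RDD[Double]:

val bigTable: Array[Double] = Array.fill(1000000)(1.0)  // ~8 MB living on the driver

// Captured directly: bigTable is serialized into every task closure.
val heavyTasks = rdd.map(x => x * bigTable(math.abs(x.toInt) % bigTable.length))

// Broadcast once: tasks only carry a small handle to the broadcast block.
val bigTableBc = sc.broadcast(bigTable)
val lightTasks = rdd.map(x => x * bigTableBc.value(math.abs(x.toInt) % bigTableBc.value.length))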


On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I'm working of a patch to MLLib that allows for multiplexing several
 different model optimization using the same RDD ( SPARK-2372:
 https://issues.apache.org/jira/browse/SPARK-2372 )

 In testing larger datasets, I've started to see some memory errors (
 java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize
 errors ).
 My main clue is that Spark will start logging warning on smaller systems
 like:

 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a
 task of very large size (10119 KB). The maximum recommended task size is
 100 KB.

 Looking up start '2862' in the case leads to a 'sample at
 GroupedGradientDescent.scala:156' call. That code can be seen at

 https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

 I've looked over the code, I'm broadcasting the larger variables, and
 between the sampler and the combineByKey, I wouldn't think there much data
 being moved over the network, much less a 10MB chunk.

 Any ideas of what this might be a symptom of?

 Kyle




Re: not getting output from socket connection

2014-07-12 Thread Walrus theCat
Thanks!

I thought it would get passed through netcat, but given your email, I was
able to follow this tutorial and get it to work:

http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html




On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote:

 netcat is listening for a connection on port . It is echoing what
 you type to its console to anything that connects to  and reads.
 That is what Spark streaming does.

 If you yourself connect to  and write, nothing happens except that
 netcat echoes it. This does not cause Spark to somehow get that data.
 nc is only echoing input from the console.

 On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com
 wrote:
  Hi,
 
  I have a java application that is outputting a string every second.  I'm
  running the wordcount example that comes with Spark 1.0, and running nc
 -lk
  . When I type words into the terminal running netcat, I get counts.
  However, when I write the String onto a socket on port , I don't get
  counts.  I can see the strings showing up in the netcat terminal, but no
  counts from Spark.  If I paste in the string, I get counts.
 
  Any ideas?
 
  Thanks



Re: Convert from RDD[Object] to RDD[Array[Object]]

2014-07-12 Thread Mark Hamstra
And if you can relax your constraints even further to only require
RDD[List[Int]], then it's even simpler:

rdd.mapPartitions(_.grouped(batchedDegree))
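
For example, a quick illustrative check of the grouped() approach (values made up):

// sc.parallelize(1 to 10, 2) has partitions (1..5) and (6..10), so grouping by 3 gives:
sc.parallelize(1 to 10, 2).mapPartitions(_.grouped(3)).collect()
// -> Array(List(1, 2, 3), List(4, 5), List(6, 7, 8), List(9, 10))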


On Sat, Jul 12, 2014 at 6:26 PM, Aaron Davidson ilike...@gmail.com wrote:

 If you don't really care about the batchedDegree, but rather just want to
 do operations over some set of elements rather than one at a time, then
 just use mapPartitions().

 Otherwise, if you really do want certain sized batches and you are able to
 relax the constraints slightly, is to construct these batches within each
 partition. For instance:

  val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
   new Iterator[Array[Int]] {
 def hasNext: Boolean = iter.hasNext
 def next(): Array[Int] = {
   iter.take(batchedDegree).toArray
 }
   }
 }

 This function is efficient in that it does not load the entire partition
 into memory, just enough to construct each batch. However, there will be
 one smaller batch at the end of each partition (rather than just one over
 the entire dataset).



 On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote:

 Hi there,

 I have a bunch of data in a RDD, which I processed it one by one
 previously.
 For example, there was a RDD denoted by data: RDD[Object] and then I
 processed it using data.map(...).  However, I got a new requirement to
 process the data in a patched way. It means that I need to convert the RDD
 from RDD[Object] to RDD[Array[Object]] and then process it, which is to
 fill
 out this function: def convert2array(inputs: RDD[Object], batchedDegree:
 Int): RDD[Array[Object]] = {...}.

 I hope that after the conversion, each element of the new RDD is an array
 of
 the previous RDD elements. The parameter batchedDegree specifies how
 many
 elements are batched together. For example, if I have 210 elements in the
 previous RDD, the result of the conversion functions should be a RDD with
 3
 elements. Each element is an array, and the first two arrays contains
 1~100
 and 101~200 elements. The third element contains 201~210 elements.

 I was wondering if anybody could help me complete this scala function with
 an efficient way. Thanks a lot.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: spark ui on yarn

2014-07-12 Thread Matei Zaharia
The UI code is the same in both, but one possibility is that your executors 
were given less memory on YARN. Can you check that? Or otherwise, how do you 
know that some RDDs were cached?
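
One way (a sketch) to check both from the driver -- the memory each executor was actually granted, and the cached-RDD view that the Storage tab should mirror:

sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
  println(s"$executor: max=$maxMem bytes, remaining=$remaining bytes")
}
sc.getRDDStorageInfo.foreach(println)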

Matei

On Jul 12, 2014, at 4:12 PM, Koert Kuipers ko...@tresata.com wrote:

 hey shuo,
 so far all stage links work fine for me.
 
 i did some more testing, and it seems kind of random what shows up on the gui 
 and what does not. some partially cached RDDs make it to the GUI, while some 
 fully cached ones do not. I have not been able to detect a pattern.
 
 is the codebase for the gui different in standalone than in yarn-client mode? 
 
 
 On Sat, Jul 12, 2014 at 3:34 AM, Shuo Xiang shuoxiang...@gmail.com wrote:
 Hi Koert,
   Just curious did you find any information like CANNOT FIND ADDRESS after 
 clicking into some stage? I've seen similar problems due to lost of executors.
 
 Best,
 
 
 
 On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote:
 I just tested a long lived application (that we normally run in standalone 
 mode) on yarn in client mode.
 
 it looks to me like cached rdds are missing in the storage tap of the ui.
 
 accessing the rdd storage information via the spark context shows rdds as 
 fully cached but they are missing on storage page.
 
 spark 1.0.0