java.lang.NoClassDefFoundError: org/apache/spark/deploy/worker/Worker

2014-05-17 Thread Hao Wang
Hi, all

Spark version: bae07e3 [behind 1] fix different versions of commons-lang
dependency and apache/spark#746 addendum

I have six worker nodes, and four of them hit this NoClassDefFoundError when
I use the start-slaves.sh script on my driver node. However, running ./bin/spark-class
org.apache.spark.deploy.worker.Worker spark://MASTER_IP:PORT directly on the worker
nodes works well.

I compiled the Spark directory on the driver node and distributed it to all the
worker nodes. Paths on the different nodes are identical.
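
A quick way to confirm whether the assembly jar on a failing node actually
contains the class is to open the jar and look for the entry. A minimal sketch,
using the jar path from the worker launch command in the log below (adjust it
per node):

import java.util.jar.JarFile

object CheckWorkerClass {
  def main(args: Array[String]): Unit = {
    // Path taken from the worker launch command in the log; adjust per node.
    val jarPath = "/home/wanghao/spark/assembly/target/scala-2.10/" +
      "spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar"
    val entry = "org/apache/spark/deploy/worker/Worker.class"
    val jar = new JarFile(jarPath)
    try {
      val found = jar.getJarEntry(entry) != null
      println(entry + (if (found) " found in " else " MISSING from ") + jarPath)
    } finally {
      jar.close()
    }
  }
}

If the entry is missing on the four failing nodes but present on the two healthy
ones, the distribution step is the likely culprit.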

Here is the log from one of the four failing worker nodes.

Spark Command: java -cp
::/home/wanghao/spark/conf:/home/wanghao/spark/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://192.168.1.12:7077
--webui-port 8081


Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/deploy/worker/Worker
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.deploy.worker.Worker
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.worker.Worker.
Program will exit.

Here is spark-env.sh

export SPARK_WORKER_MEMORY=1g
export SPARK_MASTER_IP=192.168.1.12
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=2

hosts file:

127.0.0.1   localhost
192.168.1.12 sing12

# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

192.168.1.11 sing11
192.168.1.59 sing59

###
# failed machines
###

192.168.1.122 host122
192.168.1.123 host123
192.168.1.124 host124
192.168.1.125 host125


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com


Re: breeze DGEMM slow in spark

2014-05-17 Thread Xiangrui Meng
You need to include breeze-natives or netlib:all to load the native
libraries. Check the log messages to ensure native libraries are used,
especially on the worker nodes. The easiest way to use OpenBLAS is
copying the shared library to /usr/lib/libblas.so.3 and
/usr/lib/liblapack.so.3. -Xiangrui
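
A small check, sketched here assuming netlib-java is on the classpath (which it
is once breeze-natives or netlib:all is included), prints which BLAS
implementation was actually loaded; running the same line inside a task shows
what the executors picked up:

import com.github.fommil.netlib.BLAS

// Prints e.g. NativeSystemBLAS when the system OpenBLAS is picked up,
// or F2jBLAS when it falls back to the pure-Java implementation.
println(BLAS.getInstance().getClass.getName)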

On Sat, May 17, 2014 at 8:02 PM, wxhsdp  wrote:
> I think maybe it's related to m1.large, because I also tested on my laptop,
> and the two cases cost nearly the same amount of time.
>
> my laptop:
> model name  : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz
> cpu MHz : 2893.549
>
> os:
> Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013
> x86_64 x86_64 x86_64 GNU/Linux
>
>
>
>


Re: breeze DGEMM slow in spark

2014-05-17 Thread wxhsdp
I think maybe it's related to m1.large, because I also tested on my laptop,
and the two cases cost nearly the same amount of time.

my laptop:
model name  : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz
cpu MHz : 2893.549

os:
Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013
x86_64 x86_64 x86_64 GNU/Linux






Text file and shuffle

2014-05-17 Thread Puneet Lakhina
Hi,

I'm new to Spark and I wanted to understand a few things conceptually so that I
can optimize my Spark job. I have a large text file (~14 GB, 200k lines). This
file is available on each worker node of my Spark cluster. The job I run calls
sc.textFile(...).flatMap(...). The function that I pass into flatMap splits
each line from the file into a key and a value. I also have another text file
which is smaller in size (~1.5 GB) but has a lot more lines, because it has more
than one value per key, spread across multiple lines. I call the same textFile
and flatMap functions on the other file and then call groupByKey to have all
values for a key available as a list.

Having done this, I then cogroup these two RDDs. I have the following questions:

1. Is this sequence of steps the best way to achieve what I want, i.e. a join
across the two data sets?

2. I have an 8-node cluster (25 GB memory each). The large file's flatMap spawns
about 400-odd tasks, whereas the small file's flatMap only spawns about 30-odd
tasks. The large file's flatMap takes about 2-3 minutes, and during this time it
seems to do about 3 GB of shuffle write. I want to understand whether this
shuffle write is something I can avoid. From what I have read, the shuffle write
is a disk write. Is that correct? Also, is the reason for the shuffle write the
fact that the partitioner for flatMap ends up having to redistribute the data
across the cluster?

Please let me know if I haven't provided enough information. I'm new to Spark,
so if you see anything fundamental that I don't understand, please feel free to
just point me to a link that provides more detailed information.

Thanks,
Puneet
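
For question 1, a plain join is usually the most direct way to express this,
rather than groupByKey followed by cogroup. A minimal sketch, under the
assumption (hypothetical) that the key is the first comma-separated field in
both files:

import org.apache.spark.SparkContext

def joinFiles(sc: SparkContext): Unit = {
  def keyed(path: String) = sc.textFile(path).map { line =>
    val i = line.indexOf(',')
    (line.substring(0, i), line.substring(i + 1))   // (key, rest of line)
  }

  val large = keyed("/data/large.txt")   // illustrative paths
  val small = keyed("/data/small.txt")

  // join (or cogroup) shuffles both sides by key; a groupByKey before the
  // cogroup only adds an extra pass over the smaller data set.
  val joined = large.join(small)
  println(joined.count())
}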

Unsubscribe

2014-05-17 Thread A.Khanolkar


Unsubscribe

2014-05-17 Thread A.Khanolkar


Re: Historical Data as Stream

2014-05-17 Thread Soumya Simanta
@Laeeq - please see this example.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala#L47-L49
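
That example boils down to watching a directory and treating newly arriving
files as batches. A minimal sketch (directory path, batch interval, and the
one-value-per-line assumption are all illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileAsStream")
val ssc = new StreamingContext(conf, Seconds(10))

// Any file moved into this directory after the stream starts becomes a batch.
val lines = ssc.textFileStream("hdfs:///user/laeeq/eeg-incoming")
lines.map(_.toDouble).foreachRDD { rdd =>
  if (rdd.count() > 0) println("batch mean: " + rdd.mean())
}

ssc.start()
ssc.awaitTermination()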



On Sat, May 17, 2014 at 2:06 PM, Laeeq Ahmed  wrote:

> @Soumya Simanta
>
> Right now it's just a proof of concept. Later I will have a real stream.
> It's EEG files of the brain. Later it can be used for real-time analysis of
> EEG streams.
>
> @Mayur
>
> The size is huge, yes. So it's better to do it in a distributed manner, and as
> I said above I want to read it as a stream because later I will have stream
> data. This is a proof of concept.
>
> Regards,
> Laeeq
>
>   On Saturday, May 17, 2014 7:03 PM, Mayur Rustagi <
> mayur.rust...@gmail.com> wrote:
> The real question is why you are looking to consume a file as a stream:
> 1. Too big to load as an RDD
> 2. Operate in a sequential manner.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Sat, May 17, 2014 at 5:12 AM, Soumya Simanta 
> wrote:
>
> A file is just a stream with a fixed length. Usually streams don't end, but in
> this case it would.
>
> On the other hand, if you read your file as a stream you may not be able to use
> the entire data in the file for your analysis. Spark (given enough memory)
> can process large amounts of data quickly.
>
> On May 15, 2014, at 9:52 AM, Laeeq Ahmed  wrote:
>
> Hi,
>
> I have data in a file. Can I read it as a stream in Spark? I know it seems
> odd to read a file as a stream, but it has practical applications in real life
> if I can read it as a stream. Are there any other tools which can feed this
> file as a stream to Spark, or do I have to make batches manually, which is not
> what I want? It's a column of a million values.
>
> Regards,
> Laeeq
>
>
>
>
>
>


Re: Configuring Spark for reduceByKey on on massive data sets

2014-05-17 Thread Matei Zaharia
Make sure you set up enough reduce partitions so you don’t overload them. 
Another thing that may help is checking whether you’ve run out of local disk 
space on the machines, and turning on spark.shuffle.consolidateFiles to produce 
fewer files. Finally, there’s been a recent fix in both branch 0.9 and master 
that reduces the amount of memory used when there are small files (due to extra 
memory that was being taken by mmap()): 
https://issues.apache.org/jira/browse/SPARK-1145. You can find this in either 
the 1.0 release candidates on the dev list, or branch-0.9 in git.
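
In configuration terms, those suggestions look roughly like this (a sketch; the
partition count is illustrative and should be tuned to the data size):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("bigReduce")
  // Produce fewer shuffle files per reducer, as suggested above.
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)

// Give the reduce side plenty of partitions so each one stays small.
val counts = sc.textFile("hdfs:///data/events")          // illustrative path
  .map(line => (line.split(',')(0), 1L))
  .reduceByKey(_ + _, 2000)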

Matei

On May 17, 2014, at 5:45 PM, Madhu  wrote:

> Daniel,
> 
> How many partitions do you have?
> Are they more or less uniformly distributed?
> We have similar data volume currently running well on Hadoop MapReduce with
> roughly 30 nodes. 
> I was planning to test it with Spark. 
> I'm very interested in your findings. 
> 
> 
> 
> -
> Madhu
> https://www.linkedin.com/in/msiddalingaiah



Re: Configuring Spark for reduceByKey on on massive data sets

2014-05-17 Thread Madhu
Daniel,

How many partitions do you have?
Are they more or less uniformly distributed?
We have similar data volume currently running well on Hadoop MapReduce with
roughly 30 nodes. 
I was planning to test it with Spark. 
I'm very interested in your findings. 



-
Madhu
https://www.linkedin.com/in/msiddalingaiah


Benchmarking Graphx

2014-05-17 Thread Hari
Hi, I want to do some benchmarking tests (run time and memory) for one of the
GraphX examples, let's say PageRank, on my single-processor PC to start with.
a) Is there a way to get the total time taken for the execution from start
to finish?
b) The log4j properties need to be modified to turn off logging, but it's not
clear how.
c) How can this be extended to a cluster?
d) Also, how do I quantify the memory overhead if I add more functionality to
the execution?
e) Any scripts? Reports generated?

I am new to GraphX and clusters; any help would be appreciated.
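
For (a) and (b), a minimal sketch: wall-clock timing around the job, and raising
the log4j level programmatically (the edge-list path is illustrative; editing
conf/log4j.properties works as well):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

object PageRankBench {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)   // quieter output

    val conf = new SparkConf().setAppName("PageRankBench").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val graph = GraphLoader.edgeListFile(sc, "data/followers.txt")

    val start = System.currentTimeMillis()
    val ranks = graph.pageRank(0.0001).vertices
    ranks.count()                                // force evaluation
    println("PageRank took " + (System.currentTimeMillis() - start) + " ms")

    sc.stop()
  }
}

For (c), the same program can run against a cluster by changing the master URL;
the web UI is the usual place to look for per-stage times and memory figures.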





Configuring Spark for reduceByKey on on massive data sets

2014-05-17 Thread Daniel Mahler
I have had a lot of success with Spark on large datasets,
both in terms of performance and flexibility.
However, I hit a wall with reduceByKey when the RDD contains billions of
items.
I am reducing with simple functions like addition for building histograms,
so the reduction process should be constant memory.
I am using tens of AWS EC2 machines with 60 GB memory and 30 processors each.

After a while the whole process just hangs.
I have not been able to isolate the root problem from the logs,
but I suspect that the problem is in the shuffling.
Simple mapping and filtering transformations work fine,
and the reductions work fine if I reduce the data down to 10^8 items.

What do I need to do to make reduceByKey work for more than 10^9 items?

thanks
Daniel
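
For reference, a histogram-style reduce of this kind, with an explicit (large)
partition count on the reduce side, looks roughly like the following sketch
(the bucketing function and partition count are illustrative):

import org.apache.spark.rdd.RDD

// Per-bucket counts over a very large RDD, with an explicit partition count
// so no single reducer has to hold too much state.
def histogram(values: RDD[Double],
              bucketWidth: Double,
              reducePartitions: Int = 4096): RDD[(Long, Long)] =
  values
    .map(v => (math.floor(v / bucketWidth).toLong, 1L))
    .reduceByKey(_ + _, reducePartitions)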


Spark and Solr indexing

2014-05-17 Thread Flavio Pompermaier
Hi to all,

I've read about how to create an Elasticsearch index using Spark at
http://loads.pickle.me.uk/2013/11/12/spark-and-elasticsearch.html.
I have 2 questions:
1 - How is Elasticsearch able to autodetect that the HDFS index files have
changed?
2 - Has anybody done the same for Solr, or could someone give me some
hints about that?

Best,
Flavio


Re: Hadoop Writable and Spark serialization

2014-05-17 Thread Madhu
Have you tried implementing Serializable?

This is similar to what I did:

public class MySequenceFileClass implements WritableComparable, Serializable

I read it as a sequence file and tried takeSample; it works for me.

I found that if I didn't implement Serializable, I got a serialization
exception.

I didn't have to do any registration of the class.
Of course, all referenced classes must also implement Serializable.
Is that a problem in your application?
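
A minimal sketch of that pattern, i.e. a sequence-file key class that is both a
Hadoop Writable and Java-serializable (the class name and field are
hypothetical):

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.WritableComparable

// WritableComparable for the sequence file, Serializable so Spark can ship it
// around (e.g. for takeSample). Hadoop needs the no-arg constructor.
class MyRecord(var id: Long) extends WritableComparable[MyRecord] with Serializable {
  def this() = this(0L)
  override def write(out: DataOutput): Unit = out.writeLong(id)
  override def readFields(in: DataInput): Unit = { id = in.readLong() }
  override def compareTo(other: MyRecord): Int = java.lang.Long.compare(id, other.id)
}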



-
Madhu
https://www.linkedin.com/in/msiddalingaiah


Re: Historical Data as Stream

2014-05-17 Thread Laeeq Ahmed
@Soumya Simanta

Right now it's just a proof of concept. Later I will have a real stream. It's EEG
files of the brain. Later it can be used for real-time analysis of EEG streams.

@Mayur

The size is huge, yes. So it's better to do it in a distributed manner, and as I
said above I want to read it as a stream because later I will have stream data.
This is a proof of concept.

Regards,
Laeeq 


On Saturday, May 17, 2014 7:03 PM, Mayur Rustagi  
wrote:
 
The real question is why you are looking to consume a file as a stream:
1. Too big to load as an RDD
2. Operate in a sequential manner.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi



On Sat, May 17, 2014 at 5:12 AM, Soumya Simanta  
wrote:

> A file is just a stream with a fixed length. Usually streams don't end, but in
> this case it would.
>
> On the other hand, if you read your file as a stream you may not be able to use
> the entire data in the file for your analysis. Spark (given enough memory) can
> process large amounts of data quickly.
>
> On May 15, 2014, at 9:52 AM, Laeeq Ahmed  wrote:
>
>> Hi,
>>
>> I have data in a file. Can I read it as a stream in Spark? I know it seems odd
>> to read a file as a stream, but it has practical applications in real life if I
>> can read it as a stream. Are there any other tools which can feed this file as
>> a stream to Spark, or do I have to make batches manually, which is not what I
>> want? It's a column of a million values.
>>
>> Regards,
>> Laeeq

Re: Historical Data as Stream

2014-05-17 Thread Mayur Rustagi
The real question is why you are looking to consume a file as a stream:
1. Too big to load as an RDD
2. Operate in a sequential manner.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, May 17, 2014 at 5:12 AM, Soumya Simanta wrote:

> A file is just a stream with a fixed length. Usually streams don't end, but in
> this case it would.
>
> On the other hand, if you read your file as a stream you may not be able to use
> the entire data in the file for your analysis. Spark (given enough memory)
> can process large amounts of data quickly.
>
> On May 15, 2014, at 9:52 AM, Laeeq Ahmed  wrote:
>
> Hi,
>
> I have data in a file. Can I read it as a stream in Spark? I know it seems
> odd to read a file as a stream, but it has practical applications in real life
> if I can read it as a stream. Are there any other tools which can feed this
> file as a stream to Spark, or do I have to make batches manually, which is not
> what I want? It's a column of a million values.
>
> Regards,
> Laeeq
>
>
>


Re: Benchmarking Spark with YCSB

2014-05-17 Thread Mayur Rustagi
On that note, TeraSort would be good too :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, May 17, 2014 at 4:13 PM, bhusted  wrote:

> Thanks Jay. I honestly think I just had a senior moment or something. I was
> getting HiBench and YCSB confused. Has anyone attempted to port HiBench to
> Spark? HiBench performs a lot of map/reduce and it would be a very
> interesting comparison for us.
>
>
>
>


Re: Using mongo with PySpark

2014-05-17 Thread Mayur Rustagi
Ideally you have to pass the MongoClient object along with your data into the
mapper (Python should try to serialize your MongoClient, but explicit is
better). If the client is serializable then all should end well; if not, you are
better off using mapPartitions, initializing the database driver once per
partition and loading the data of that partition. There is a similar discussion
on the list from the past.
Regards
Mayur
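
The pattern Mayur describes, in Scala terms (the client type here is a
hypothetical stand-in for a non-serializable database client; in PySpark the
equivalent is mapPartitions with the MongoClient created inside the partition
function):

import org.apache.spark.rdd.RDD

// Hypothetical non-serializable client, standing in for e.g. a Mongo client.
trait DbClient {
  def insert(doc: Map[String, String]): Unit
  def close(): Unit
}
def newClient(): DbClient = ???   // placeholder: open the connection here

// One client per partition, instead of one per record or one shared client
// that would have to be serialized into every task.
def saveToDb(docs: RDD[Map[String, String]]): Unit =
  docs.foreachPartition { iter =>
    val client = newClient()
    try iter.foreach(client.insert)
    finally client.close()
  }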

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Where's your driver code (the code interacting with the RDDs)? Are you
> getting serialization errors?
>
> On Saturday, May 17, 2014, Samarth Mailinglist wrote:
>
> Hi all,
>>
>> I am trying to store the results of a reduce into mongo.
>> I want to share the variable "collection" in the mappers.
>>
>>
>> Here's what I have so far (I'm using pymongo)
>>
>> db = MongoClient()['spark_test_db']
>> collec = db['programs']
>>
>> def mapper(val):
>>     asc = val.encode('ascii', 'ignore')
>>     json = convertToJSON(asc, indexMap)
>>     collec.insert(json)  # this is not working
>>
>> def convertToJSON(string, indexMap):
>>     values = string.strip().split(",")
>>     json = {}
>>     for i in range(len(values)):
>>         json[indexMap[i]] = values[i]
>>     return json
>>
>> How do I do this?
>>
>


Re: Worker re-spawn and dynamic node joining

2014-05-17 Thread Mayur Rustagi
A better way would be to use Mesos (and quite possibly YARN in 1.0.0).
That will allow you to add nodes on the fly and leverage them for Spark.
Frankly, standalone mode is not meant to handle those issues. That said, we
use our deployment tool, as stopping the cluster to add nodes is not
really an issue for us at the moment.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Sat, May 17, 2014 at 9:05 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Thanks for the info about adding/removing nodes dynamically. That's
> valuable.
>
> On Friday, May 16, 2014, Akhil Das wrote:
>
>  Hi Han :)
>>
>> 1. Is there a way to automatically re-spawn Spark workers? We have
>> situations where an executor OOM causes the worker process to be DEAD and it
>> does not come back automatically.
>>
>> => Yes. You can either add an OOM killer exception on all of your Spark
>> processes, or you can have a cronjob which keeps monitoring your worker
>> processes and brings them back if they go down.
>>
>>   2. How to dynamically add (or remove) some worker machines to (from)
>> the cluster? We'd like to leverage the auto-scaling group in EC2 for
>> example.
>>
>> => You can add/remove worker nodes on the fly by spawning a new machine,
>> adding that machine's IP address on the master node, and then rsyncing
>> the Spark directory to all worker machines, including the one you added.
>> Then you can simply use the *start-all.sh* script on the master node
>> to bring the new worker into action. Removing a worker machine
>> can be done in the same way: remove the worker's IP
>> address from the master's *slaves* file and then restart your
>> slaves, and that will get your worker removed.
>>
>>
>> FYI, we have a deployment tool (a web-based UI) that we use for internal
>> purposes; it is built on top of the spark-ec2 script (with some changes)
>> and it has a module for adding/removing worker nodes on the fly. It looks
>> like the attached screenshot. If you want, I can give you some access.
>>
>> Thanks
>> Best Regards
>>
>>
>> On Wed, May 14, 2014 at 9:52 PM, Han JU  wrote:
>>
>>> Hi all,
>>>
>>> Just 2 questions:
>>>
>>>   1. Is there a way to automatically re-spawn Spark workers? We have
>>> situations where an executor OOM causes the worker process to be DEAD and it
>>> does not come back automatically.
>>>
>>>   2. How to dynamically add (or remove) some worker machines to (from)
>>> the cluster? We'd like to leverage the auto-scaling group in EC2 for
>>> example.
>>>
>>> We're using spark-standalone.
>>>
>>> Thanks a lot.
>>>
>>> --
>>> *JU Han*
>>>
>>> Data Engineer @ Botify.com
>>>
>>> +33 061960
>>>
>>
>>


Re: Worker re-spawn and dynamic node joining

2014-05-17 Thread Nicholas Chammas
Thanks for the info about adding/removing nodes dynamically. That's
valuable.

On Friday, May 16, 2014, Akhil Das wrote:

> Hi Han :)
>
> 1. Is there a way to automatically re-spawn Spark workers? We have
> situations where an executor OOM causes the worker process to be DEAD and it
> does not come back automatically.
>
> => Yes. You can either add an OOM killer exception on all of your Spark
> processes, or you can have a cronjob which keeps monitoring your worker
> processes and brings them back if they go down.
>
>   2. How to dynamically add (or remove) some worker machines to (from) the
> cluster? We'd like to leverage the auto-scaling group in EC2 for example.
>
> => You can add/remove worker nodes on the fly by spawning a new machine,
> adding that machine's IP address on the master node, and then rsyncing
> the Spark directory to all worker machines, including the one you added.
> Then you can simply use the *start-all.sh* script on the master node
> to bring the new worker into action. Removing a worker machine
> can be done in the same way: remove the worker's IP
> address from the master's *slaves* file and then restart your
> slaves, and that will get your worker removed.
>
>
> FYI, we have a deployment tool (a web-based UI) that we use for internal
> purposes; it is built on top of the spark-ec2 script (with some changes)
> and it has a module for adding/removing worker nodes on the fly. It looks
> like the attached screenshot. If you want, I can give you some access.
>
> Thanks
> Best Regards
>
>
> On Wed, May 14, 2014 at 9:52 PM, Han JU wrote:
>
>> Hi all,
>>
>> Just 2 questions:
>>
>>   1. Is there a way to automatically re-spawn Spark workers? We have
>> situations where an executor OOM causes the worker process to be DEAD and it
>> does not come back automatically.
>>
>>   2. How to dynamically add (or remove) some worker machines to (from)
>> the cluster? We'd like to leverage the auto-scaling group in EC2 for
>> example.
>>
>> We're using spark-standalone.
>>
>> Thanks a lot.
>>
>> --
>> *JU Han*
>>
>> Data Engineer @ Botify.com
>>
>> +33 061960
>>
>
>


Re: Using mongo with PySpark

2014-05-17 Thread Nicholas Chammas
Where's your driver code (the code interacting with the RDDs)? Are you
getting serialization errors?

On Saturday, May 17, 2014, Samarth Mailinglist wrote:

> Hi all,
>
> I am trying to store the results of a reduce into mongo.
> I want to share the variable "collection" in the mappers.
>
>
> Here's what I have so far (I'm using pymongo)
>
> db = MongoClient()['spark_test_db']
> collec = db['programs']
>
> def mapper(val):
>     asc = val.encode('ascii', 'ignore')
>     json = convertToJSON(asc, indexMap)
>     collec.insert(json)  # this is not working
>
> def convertToJSON(string, indexMap):
>     values = string.strip().split(",")
>     json = {}
>     for i in range(len(values)):
>         json[indexMap[i]] = values[i]
>     return json
>
> How do I do this?
>


Re: Benchmarking Spark with YCSB

2014-05-17 Thread bhusted
Thanks Jay. I honestly think I just had a senior moment or something. I was
getting HiBench and YCSB confused. Has anyone attempted to port HiBench to
Spark? HiBench performs a lot of map/reduce and it would be a very
interesting comparison for us.





Apache Spark Throws java.lang.IllegalStateException: unread block data

2014-05-17 Thread sam
What we are doing is:

1. Installing Spark 0.9.1 according to the documentation on the website,
along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs.
2. Building a fat jar with a Spark app with sbt then trying to run it on the
cluster

I've also included code snippets, and sbt deps at the bottom.

When I've Googled this, there seem to be two somewhat vague responses:
a) Mismatched Spark versions between the nodes and the user code
b) Need to add more jars to the SparkConf

Now I know that (b) is not the problem having successfully run the same code
on other clusters while only including one jar (it's a fat jar).

But I have no idea how to check for (a) - it appears Spark doesn't have any
version checks or anything - it would be nice if it checked versions and
threw a "mismatching version exception: you have user code using version X
and node Y has version Z".

I would be very grateful for advice on this. I've submitted a bug report,
because there has to be something wrong with the Spark documentation: I've
seen two independent sysadmins hit the exact same problem with different
versions of CDH on different clusters.
https://issues.apache.org/jira/browse/SPARK-1867

The exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted:
Task 0.0:1 failed 32 times (most recent failure: Exception failure:
java.lang.IllegalStateException: unread block data)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to
java.lang.IllegalStateException: unread block data [duplicate 59]

My code snippet:

val conf = new SparkConf()
   .setMaster(clusterMaster)
   .setAppName(appName)
   .setSparkHome(sparkHome)
   .setJars(SparkContext.jarOfClass(this.getClass))

println("count = " + new
SparkContext(conf).textFile(someHdfsPath).count())

My SBT dependencies:

// relevant
"org.apache.spark" % "spark-core_2.10" % "0.9.1",
"org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",

// standard, probably unrelated
"com.github.seratch" %% "awscala" % "[0.2,)",
"org.scalacheck" %% "scalacheck" % "1.10.1" % "test",
"org.specs2" %% "specs2" % "1.14" % "test",
"org.scala-lang" % "scala-reflect" % "2.10.3",
"org.scalaz" %% "scalaz-core" % "7.0.5",
"net.minidev" % "json-smart" % "1.2"





Using mongo with PySpark

2014-05-17 Thread Samarth Mailinglist
Hi all,

I am trying to store the results of a reduce into mongo.
I want to share the variable "collection" in the mappers.


Here's what I have so far (I'm using pymongo)

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

How do I do this?


breeze DGEMM slow in spark

2014-05-17 Thread wxhsdp
Dear all,
  I'm testing double-precision matrix multiplication in Spark on EC2
m1.large machines.
  I use the Breeze linalg library, which internally calls a native
library (OpenBLAS, Nehalem, single-threaded).

m1.large:
model name  : Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
cpu MHz : 1795.672
model name  : Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
cpu MHz : 1795.672

os:
Linux ip-172-31-24-33 3.4.37-40.44.amzn1.x86_64 #1 SMP Thu Mar 21 01:17:08
UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

  Here's my test code:

  import scala.util.Random
  import breeze.linalg.DenseMatrix

  def main(args: Array[String]) {
    val n = args(0).toInt
    val loop = args(1).toInt

    val ranGen = new Random

    // loop square matrices of size n x n, each stored as a flat array
    val arr = Array.ofDim[Double](loop, n * n)

    for (i <- 0 until loop)
      for (j <- 0 until n * n) {
        arr(i)(j) = ranGen.nextDouble()
      }

    val time0 = System.currentTimeMillis()
    println("init time = " + time0)

    val c = new DenseMatrix[Double](n, n)

    val time1 = System.currentTimeMillis()
    println("start time = " + time1)

    for (i <- 0 until loop) {
      val a = new DenseMatrix[Double](n, n, arr(i))
      val b = new DenseMatrix[Double](n, n, arr(i))

      c :+= (a * b)
    }

    val time2 = System.currentTimeMillis()
    println("stop time = " + time2)
    println("init time = " + (time1 - time0))
    println("used time = " + (time2 - time1))
  }

  Two n=3584 matrix multiplications take about 14 s with the above test code.
But when I put the matrix-multiplication part in Spark's mapPartitions function:

  val b = a.mapPartitions { itr =>
    val arr = itr.toArray

    // timestamp here
    val ma = new DenseMatrix[Double](n, n, arr)
    val mb = new DenseMatrix[Double](n, n, arr)

    val c = ma * mb

    // timestamp here
    c.toIterator
  }

  Two n=3584 matrix multiplications take about 50 s!
  There's a shuffle operation before the matrix multiplication in Spark; during
the shuffle phase the aggregated data is kept in memory on the reduce side, and
there is no spill to disk. So both cases above are in-memory matrix
multiplication, both have enough memory, and GC time is really small.

  So why is case 2 about 3.5x slower than case 1? Has anyone met this before,
and what is your DGEMM performance in Spark? Thanks for any advice.


