Re: Replicating RDD elements

2014-03-28 Thread David Thomas
That helps! Thank you.


On Fri, Mar 28, 2014 at 12:36 AM, Sonal Goyal  wrote:

> Hi David,
>
> I am sorry but your question is not clear to me. Are you talking about
> taking some value and sharing it across your cluster so that it is present
> on all the nodes? You can look at Spark's broadcasting in that case. On the
> other hand, if you want to take one item and create an RDD of 100 or some
> other number of items, you could do a flatMap. Does that help?
>
> Best Regards,
> Sonal
> Nube Technologies 
>
>  
>
>
>
>
> On Fri, Mar 28, 2014 at 9:24 AM, David Thomas  wrote:
>
>> How can we replicate RDD elements? Say I have 1 element and 100 nodes in
>> the cluster. I need to replicate this one item on all the nodes i.e.
>> effectively create an RDD of 100 elements.
>>
>
>
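
For reference, a rough sketch of the flatMap approach Sonal describes above (the
element value and the count of 100 are placeholders, not from the original
question):

// start from an RDD holding the single item
val one = sc.parallelize(Seq("the-item"))

// replicate it into an RDD of 100 copies, then spread it over 100 partitions
val replicated = one.flatMap(x => Seq.fill(100)(x)).repartition(100)
// replicated.count() == 100

// alternatively, to share the value itself with every node, broadcast it:
val shared = sc.broadcast("the-item")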


Re: Announcing Spark SQL

2014-03-28 Thread Rohit Rai
Thanks Patrick,

I was thinking about that... Upon analysis I realized (to date) it would be
something similar to the way HiveContext uses its custom Catalog.
I will review it again along the lines of implementing SchemaRDD with
Cassandra. Thanks for the pointer.

Upon discussion with a couple of our clients, it seems the reason they would
prefer using Hive is that they have already invested a lot in it, mostly in
UDFs and HiveQL.
1. Are there any plans to develop the SQL parser to handle more complex
queries like HiveQL? Can we just plug in a custom parser instead of bringing
in all the Hive dependencies?
2. Is there any way we can support UDFs in Catalyst without using Hive? It
would be fine if we don't support Hive UDFs as-is and a minor porting effort
is needed.


Regards,
Rohit


*Founder & CEO, **Tuplejump, Inc.*

www.tuplejump.com
*The Data Engineering Platform*


On Fri, Mar 28, 2014 at 12:48 AM, Patrick Wendell wrote:

> Hey Rohit,
>
> I think external tables based on Cassandra or other datastores will work
> out-of-the box if you build Catalyst with Hive support.
>
> Michael may have feelings about this but I'd guess the longer term design
> for having schema support for Cassandra/HBase etc likely wouldn't rely on
> hive external tables because it's an unnecessary layer of indirection.
>
> Spark should be able to directly load an SchemaRDD from Cassandra by just
> letting the user give relevant information about the Cassandra schema. And
> it should let you write-back to Cassandra by giving a mapping of fields to
> the respective cassandra columns. I think all of this would be fairly easy
> to implement on SchemaRDD and likely will make it into Spark 1.1
>
> - Patrick
>
>
> On Wed, Mar 26, 2014 at 10:59 PM, Rohit Rai  wrote:
>
>> Great work guys! Have been looking forward to this . . .
>>
>> In the blog it mentions support for reading from Hbase/Avro... What will
>> be the recommended approach for this? Will it be writing custom wrappers
>> for SQLContext like in HiveContext or using Hive's "EXTERNAL TABLE" support?
>>
>> I ask this because a few days back (based on your pull request in github)
>> I started analyzing what it would take to support Spark SQL on Cassandra.
>> One obvious approach would be to use Hive external table support with our
>> cassandra-hive handler. But the second approach sounds tempting as it will give
>> more fidelity.
>>
>> Regards,
>> Rohit
>>
>> *Founder & CEO, **Tuplejump, Inc.*
>> 
>> www.tuplejump.com
>> *The Data Engineering Platform*
>>
>>
>> On Thu, Mar 27, 2014 at 9:12 AM, Michael Armbrust > > wrote:
>>
>>> Any plans to make the SQL typesafe using something like Slick (
 http://slick.typesafe.com/)

>>>
>>> I would really like to do something like that, and maybe we will in a
>>> couple of months. However, in the near term, I think the top priorities are
>>> going to be performance and stability.
>>>
>>> Michael
>>>
>>
>>
>


Re: Mutable tagging RDD rows ?

2014-03-28 Thread Christopher Nguyen
Sung Hwan, yes, I'm saying exactly what you interpreted, including that if
you tried it, it would (mostly) work, and my uncertainty with respect to
guarantees on the semantics. Definitely there would be no fault tolerance
if the mutations depend on state that is not captured in the RDD lineage.

DDF is to RDD as RDD is to HDFS. Not a perfect analogy, but the point
is that it's an abstraction above, with all attendant implications, pluses
and minuses. With DDFs you get to think of everything as tables with
schemas, while the underlying complexity of mutability and data
representation is hidden away. You also get rich idioms to operate on those
tables like filtering, projection, subsetting, handling of missing data
(NA's), dummy-column generation, data mining statistics and machine
learning, etc. In some aspects it replaces a lot of boilerplate analytics
that you don't want to reinvent over and over again, e.g., FiveNum or
XTabs. So instead of 100 lines of code, it's 4. In other aspects it allows
you to easily apply "arbitrary" machine learning algorithms without having
to think too hard about getting the data types just right. Etc.

But you would also find yourself wanting access to the underlying RDDs for
their full semantics & flexibility.
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Mar 28, 2014 at 8:46 PM, Sung Hwan Chung
wrote:

> Thanks Chris,
>
> I'm not exactly sure what you mean with MutablePair, but are you saying
> that we could create RDD[MutablePair] and modify individual rows?
>
> If so, will that play nicely with RDD's lineage and fault tolerance?
>
> As for the alternatives, I don't think 1 is something we want to do, since
> that would require another complex system we'll have to implement. Is DDF
> going to be an alternative to RDD?
>
> Thanks again!
>
>
>
> On Fri, Mar 28, 2014 at 7:02 PM, Christopher Nguyen wrote:
>
>> Sung Hwan, strictly speaking, RDDs are immutable, so the canonical way to
>> get what you want is to transform to another RDD. But you might look at
>> MutablePair (
>> https://github.com/apache/spark/blob/60abc252545ec7a5d59957a32e764cd18f6c16b4/core/src/main/scala/org/apache/spark/util/MutablePair.scala)
>> to see if the semantics meet your needs.
>>
>> Alternatively you can consider:
>>
>>1. Build & provide a fast lookup service that stores and returns the
>>mutable information keyed by the RDD row IDs, or
>>2. Use DDF (Distributed DataFrame) which we'll make available in the
>>near future, which will give you fully mutable-row table semantics.
>>
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao 
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Mar 28, 2014 at 5:16 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Hey guys,
>>>
>>> I need to tag individual RDD lines with some values. This tag value
>>> would change at every iteration. Is this possible with RDD (I suppose this
>>> is sort of like mutable RDD, but it's more) ?
>>>
>>> If not, what would be the best way to do something like this? Basically,
>>> we need to keep mutable information per data row (this would be something
>>> much smaller than actual data row, however).
>>>
>>> Thanks
>>>
>>
>>
>


Re: Mutable tagging RDD rows ?

2014-03-28 Thread Sung Hwan Chung
Thanks Chris,

I'm not exactly sure what you mean with MutablePair, but are you saying
that we could create RDD[MutablePair] and modify individual rows?

If so, will that play nicely with RDD's lineage and fault tolerance?

As for the alternatives, I don't think 1 is something we want to do, since
that would require another complex system we'll have to implement. Is DDF
going to be an alternative to RDD?

Thanks again!



On Fri, Mar 28, 2014 at 7:02 PM, Christopher Nguyen  wrote:

> Sung Hwan, strictly speaking, RDDs are immutable, so the canonical way to
> get what you want is to transform to another RDD. But you might look at
> MutablePair (
> https://github.com/apache/spark/blob/60abc252545ec7a5d59957a32e764cd18f6c16b4/core/src/main/scala/org/apache/spark/util/MutablePair.scala)
> to see if the semantics meet your needs.
>
> Alternatively you can consider:
>
>1. Build & provide a fast lookup service that stores and returns the
>mutable information keyed by the RDD row IDs, or
>2. Use DDF (Distributed DataFrame) which we'll make available in the
>near future, which will give you fully mutable-row table semantics.
>
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao 
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Mar 28, 2014 at 5:16 PM, Sung Hwan Chung  > wrote:
>
>> Hey guys,
>>
>> I need to tag individual RDD lines with some values. This tag value would
>> change at every iteration. Is this possible with RDD (I suppose this is
>> sort of like mutable RDD, but it's more) ?
>>
>> If not, what would be the best way to do something like this? Basically,
>> we need to keep mutable information per data row (this would be something
>> much smaller than actual data row, however).
>>
>> Thanks
>>
>
>


Re: function state lost when next RDD is processed

2014-03-28 Thread Mayur Rustagi
Are you referring to Spark Streaming?

Can you save the sum as an RDD & keep joining the two RDDs together?

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 28, 2014 at 10:47 AM, Adrian Mocanu
wrote:

>  Thanks!
>
>
>
> Ya that’s what I’m doing so far, but I wanted to see if it’s possible to
> keep the tuples inside Spark for fault tolerance purposes.
>
>
>
> -A
>
> *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
> *Sent:* March-28-14 10:45 AM
> *To:* user@spark.apache.org
> *Subject:* Re: function state lost when next RDD is processed
>
>
>
> As long as the amount of state being passed is relatively small, it's
> probably easiest to send it back to the driver and to introduce it into RDD
> transformations as the zero value of a fold.
>
>
>
> On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu 
> wrote:
>
>  I’d like to resurrect this thread since I don’t have an answer yet.
>
>
>
> *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com]
> *Sent:* March-27-14 10:04 AM
> *To:* u...@spark.incubator.apache.org
> *Subject:* function state lost when next RDD is processed
>
>
>
> Is there a way to pass a custom function to spark to run it on the entire
> stream? For example, say I have a function which sums up values in each RDD
> and then across RDDs.
>
>
>
> I’ve tried with map, transform, reduce. They all apply my sum function on
> 1 RDD. When the next RDD comes the function starts from 0 so the sum of the
> previous RDD is lost.
>
>
>
> Does Spark support a way of passing a custom function so that its state is
> preserved across RDDs and not only within RDD?
>
>
>
> Thanks
>
> -Adrian
>
>
>
>
>


Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-28 Thread Sonal Goyal
What does your saveRDD contain? If you are using custom objects, they
should be serializable.

Best Regards,
Sonal
Nube Technologies 






On Sat, Mar 29, 2014 at 12:02 AM, pradeeps8 wrote:

> Hi Aureliano,
>
> I followed this thread to create a custom saveAsObjectFile.
> The following is the code.
> new org.apache.spark.rdd.SequenceFileRDDFunctions[NullWritable,
> BytesWritable](saveRDD.mapPartitions(iter =>
> iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new
> BytesWritable(serialize(x))))).saveAsSequenceFile("objFiles")
>
> But, I get the following error when executed.
> /
> org.apache.spark.SparkException: Job aborted: Task not serializable:
> java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> /
>
> Any idea about this error?
> or
> Is there anything wrong in the line of code?
>
> Thanks,
> Pradeep
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SequenceFileRDDFunctions-cannot-be-used-output-of-spark-package-tp250p3442.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Strange behavior of RDD.cartesian

2014-03-28 Thread Matei Zaharia
Weird, how exactly are you pulling out the sample? Do you have a small program 
that reproduces this?

Matei

On Mar 28, 2014, at 3:09 AM, Jaonary Rabarisoa  wrote:

> I forgot to mention that I don't really use all of my data. Instead I use a 
> sample extracted with randomSample. 
> 
> 
> On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa  wrote:
> Hi all,
> 
> I notice that RDD.cartesian has a strange behavior with cached and uncached 
> data. More precisely, I have a set of data that I load with objectFile
> 
> val data: RDD[(Int,String,Array[Double])] = sc.objectFile("data")
> 
> Then I split it in two sets depending on some criteria
> 
> 
> val part1 = data.filter(_._2 matches "view1")
> val part2 = data.filter(_._2 matches "view2")
> 
> 
> Finally, I compute the cartesian product of part1 and part2
> 
> val pair = part1.cartesian(part2)
> 
> 
> If everything goes well I should have 
> 
> pair.count == part1.count * part2.count
> 
> But this is not the case if I don't cache part1 and part2.
> 
> What am I missing? Is caching data mandatory in Spark?
> 
> Cheers,
> 
> Jaonary
> 
> 
> 
> 



Re: Mutable tagging RDD rows ?

2014-03-28 Thread Christopher Nguyen
Sung Hwan, strictly speaking, RDDs are immutable, so the canonical way to
get what you want is to transform to another RDD. But you might look at
MutablePair (
https://github.com/apache/spark/blob/60abc252545ec7a5d59957a32e764cd18f6c16b4/core/src/main/scala/org/apache/spark/util/MutablePair.scala)
to see if the semantics meet your needs.
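
To make that concrete, a minimal sketch of the transform-to-a-new-RDD route
(the tag type, the update rule, and the names `rows` and `numIterations` are
made up for illustration):

import org.apache.spark.rdd.RDD

// pair each row with its current tag; the "mutation" is producing a new RDD
var tagged: RDD[(String, Int)] = rows.map(r => (r, 0))

for (i <- 1 to numIterations) {
  // each iteration builds a new RDD with updated tags, so lineage and
  // fault tolerance are preserved (cache/checkpoint periodically if the
  // lineage gets long)
  tagged = tagged.map { case (row, tag) => (row, tag + 1) }
}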

Alternatively you can consider:

   1. Build & provide a fast lookup service that stores and returns the
   mutable information keyed by the RDD row IDs, or
   2. Use DDF (Distributed DataFrame) which we'll make available in the
   near future, which will give you fully mutable-row table semantics.


--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Mar 28, 2014 at 5:16 PM, Sung Hwan Chung
wrote:

> Hey guys,
>
> I need to tag individual RDD lines with some values. This tag value would
> change at every iteration. Is this possible with RDD (I suppose this is
> sort of like mutable RDD, but it's more) ?
>
> If not, what would be the best way to do something like this? Basically,
> we need to keep mutable information per data row (this would be something
> much smaller than actual data row, however).
>
> Thanks
>


Mutable tagging RDD rows ?

2014-03-28 Thread Sung Hwan Chung
Hey guys,

I need to tag individual RDD lines with some values. This tag value would
change at every iteration. Is this possible with RDD (I suppose this is
sort of like mutable RDD, but it's more) ?

If not, what would be the best way to do something like this? Basically, we
need to keep mutable information per data row (this would be something much
smaller than actual data row, however).

Thanks


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread yh18190
Hi Adrian,

Of course you can sortByKey, but after that, when you perform mapPartitions it
doesn't guarantee that the 1st partition has all those elements in the order of
the original sequence. I think we need a partitioner that partitions the
sequence while maintaining order.

Could anyone help me in defining a custom partitioner for this scenario, so
that the first 12 elements go to the 1st partition and the next 12 elements go
to the second partition?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
Not sure how to change your code because you'd need to generate the keys where 
you get the data. Sorry about that.
I can tell you where to put the code to remap and sort though.

import org.apache.spark.rdd.OrderedRDDFunctions
val res2 = reduced_hccg.map(_._2)
  .map(x => (newkey, x)).sortByKey(true)
  // and if you want to remap them to remove the key used for sorting: .map(x => x._2)

res2.foreach(println)
val result= res2.mapPartitions(p=>{
   val l=p.toList
   
   val approx=new ListBuffer[(Int)]
   val detail=new ListBuffer[Double]
   for(i<-0 until l.length-1 by 2)
   {
println(l(i),l(i+1))
approx+=(l(i),l(i+1))
   
 
   }
   approx.toList.iterator
   detail.toList.iterator
 })
result.foreach(println)

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 5:17 PM
To: u...@spark.incubator.apache.org
Subject: RE: Splitting RDD and Grouping together to perform computation

Hi Adrian,

Thanks for the suggestion. Could you please modify the part of my code where I 
need to do so? I apologise for the inconvenience; because I am new to Spark I 
couldn't apply it appropriately. I would be thankful to you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3452.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread yh18190
Hi Adrian,

Thanks for the suggestion. Could you please modify the part of my code where I
need to do so? I apologise for the inconvenience; because I am new to Spark I
couldn't apply it appropriately. I would be thankful to you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3452.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Syed A. Hashmi
From the gist of it, it seems like you need to override the default
partitioner to control how your data is distributed among partitions. Take a
look at the different Partitioners available (Default, Range, Hash); if none
of these gets you the desired result, you might want to provide your own.
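
For what it's worth, a sketch of such a custom partitioner for the 24-element
example discussed in this thread. It assumes each element has already been
keyed by its 0-based position in the original sequence (generated where the
data is read, as Adrian notes below):

import org.apache.spark.Partitioner

// keys 0..11 go to partition 0, 12..23 to partition 1, and so on
class FixedRangePartitioner(val elementsPerPartition: Int, val parts: Int)
  extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    math.min(key.asInstanceOf[Int] / elementsPerPartition, parts - 1)
  override def equals(other: Any): Boolean = other match {
    case p: FixedRangePartitioner =>
      p.elementsPerPartition == elementsPerPartition && p.parts == parts
    case _ => false
  }
  override def hashCode: Int = 31 * elementsPerPartition + parts
}

// usage, assuming an RDD[(Int, Double)] `indexed` keyed by original position:
// val grouped = indexed.partitionBy(new FixedRangePartitioner(12, 2)).values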


On Fri, Mar 28, 2014 at 2:08 PM, Adrian Mocanu wrote:

> I say you need to remap so you have a key for each tuple that you can sort
> on.
> Then call rdd.sortByKey(true) like this mystream.transform(rdd =>
> rdd.sortByKey(true))
> For this fn to be available you need to import
> org.apache.spark.rdd.OrderedRDDFunctions
>
> -Original Message-
> From: yh18190 [mailto:yh18...@gmail.com]
> Sent: March-28-14 5:02 PM
> To: u...@spark.incubator.apache.org
> Subject: RE: Splitting RDD and Grouping together to perform computation
>
>
> Hi,
> Here is my code for the given scenario. Could you please let me know where to
> sort? I mean, on what basis do we have to sort, so that the elements maintain
> the order of the original sequence within each partition?
>
> val res2=reduced_hccg.map(_._2)// which gives RDD of numbers
> res2.foreach(println)
> val result= res2.mapPartitions(p=>{
>val l=p.toList
>
>val approx=new ListBuffer[(Int)]
>val detail=new ListBuffer[Double]
>for(i<-0 until l.length-1 by 2)
>{
> println(l(i),l(i+1))
> approx+=(l(i),l(i+1))
>
>
>}
>approx.toList.iterator
>detail.toList.iterator
>  })
> result.foreach(println)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3450.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
I say you need to remap so you have a key for each tuple that you can sort on.
Then call rdd.sortByKey(true) like this mystream.transform(rdd => 
rdd.sortByKey(true))
For this fn to be available you need to import 
org.apache.spark.rdd.OrderedRDDFunctions

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 5:02 PM
To: u...@spark.incubator.apache.org
Subject: RE: Splitting RDD and Grouping together to perform computation


Hi,
Here is my code for the given scenario. Could you please let me know where to sort?
I mean, on what basis do we have to sort, so that the elements maintain the order
of the original sequence within each partition?

val res2=reduced_hccg.map(_._2)// which gives RDD of numbers
res2.foreach(println)
val result= res2.mapPartitions(p=>{
   val l=p.toList
   
   val approx=new ListBuffer[(Int)]
   val detail=new ListBuffer[Double]
   for(i<-0 until l.length-1 by 2)
   {
println(l(i),l(i+1))
approx+=(l(i),l(i+1))
   
 
   }
   approx.toList.iterator
   detail.toList.iterator
 })
result.foreach(println)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread yh18190

Hi,
Here is my code for the given scenario. Could you please let me know where to
sort? I mean, on what basis do we have to sort, so that the elements maintain
the order of the original sequence within each partition?

val res2=reduced_hccg.map(_._2)// which gives RDD of numbers
res2.foreach(println)
val result= res2.mapPartitions(p=>{
   val l=p.toList
   
   val approx=new ListBuffer[(Int)]
   val detail=new ListBuffer[Double]
   for(i<-0 until l.length-1 by 2)
   {
println(l(i),l(i+1))
approx+=(l(i),l(i+1))
   
 
   }
   approx.toList.iterator
   detail.toList.iterator
 })
result.foreach(println)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
I think you should sort each RDD

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 4:44 PM
To: u...@spark.incubator.apache.org
Subject: Re: Splitting RDD and Grouping together to perform computation

Hi,
Thanks Nanzhu. I tried to implement your suggestion on the following scenario: I have 
an RDD of, say, 24 elements. When I partitioned it into two groups of 12 elements 
each, there is a loss of the order of elements within the partitions; elements are 
partitioned randomly. I need to preserve the order such that the first 12 elements 
end up in the 1st partition and the second 12 elements in the 2nd partition.
Guys, please help me with how to maintain the order of the original sequence even 
after partitioning. Any solution?
Before Partition:RDD
64
29186
16059
9143
6439
6155
9187
18416
25565
30420
33952
38302
43712
47092
48803
52687
56286
57471
63429
70715
75995
81878
80974
71288
48556
After Partition: in group 1 with 12 elements: 64, 29186,
18416
30420
33952
38302
43712
47092
56286
81878
80974
71288
48556



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-28 Thread Tathagata Das
The cleaner ttl was introduced as a "brute force" method to clean all old
data and metadata in the system, so that the system can run 24/7. The
cleaner ttl should be set to a large value, so that RDDs older than that
are not used. There are some cases, though, where you may want to use an RDD
again and again for an infinite duration, in which case no value of TTL is
good enough. There is a way around it: if you are going to use an RDD
beyond the ttl value, then you will have to recreate the RDD after some
interval.

However, the ideal solution is actually to identify the stuff (RDDs,
broadcasts, etc.) that is ready to be GCed and then clear the associated
data. This is currently being implemented as part of this PR:
https://github.com/apache/spark/pull/126
This will make setting the cleaner TTL unnecessary and reduce errors
related to cleaning up of needed RDDs.
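
For reference, a minimal sketch of setting a large cleaner TTL when building a
streaming context (the app name and the TTL value are arbitrary, and this
assumes the 0.9-style SparkConf constructor):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// spark.cleaner.ttl is in seconds and should comfortably exceed the longest
// time any RDD is reused
val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.cleaner.ttl", "86400")   // one day, an arbitrary "large" value

val ssc = new StreamingContext(conf, Seconds(5))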






On Thu, Mar 27, 2014 at 3:47 PM, Evgeny Shishkin wrote:

>
> On 28 Mar 2014, at 01:44, Tathagata Das 
> wrote:
>
> The more I think about it, the problem is not about /tmp; it's more about
> the workers not having enough memory. Blocks of received data could be
> falling out of memory before they get processed.
> BTW, what is the storage level that you are using for your input stream?
> If you are using MEMORY_ONLY, then try MEMORY_AND_DISK. That is safer
> because it ensures that if received data falls out of memory it will be at
> least saved to disk.
>
> TD
>
>
> And I saw such errors because of cleaner.ttl,
> which erases everything, even needed RDDs.
>
>
>
>
> On Thu, Mar 27, 2014 at 2:29 PM, Scott Clasen wrote:
>
>> Heh, sorry, that wasn't a clear question. I know 'how' to set it but don't
>> know what value to use in a mesos cluster; since the processes are running
>> in lxc containers they won't be sharing a filesystem (or machine for that
>> matter).
>>
>> I can't use an s3n:// URL for the local dir, can I?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>


Re: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread yh18190
Hi,
Thanks Nanzhu. I tried to implement your suggestion on the following scenario: I
have an RDD of, say, 24 elements. When I partitioned it into two groups of 12
elements each, there is a loss of the order of elements within the partitions;
elements are partitioned randomly. I need to preserve the order such that the
first 12 elements end up in the 1st partition and the second 12 elements in the
2nd partition.
Guys, please help me with how to maintain the order of the original sequence
even after partitioning. Any solution?
Before Partition:RDD
64
29186
16059
9143
6439
6155
9187
18416
25565
30420
33952
38302
43712
47092
48803
52687
56286
57471
63429
70715
75995
81878
80974
71288
48556
After Partition: in group 1 with 12 elements
64,
29186,
18416
30420
33952
38302
43712
47092
56286
81878
80974
71288
48556



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Do all classes involving RDD operation need to be registered?

2014-03-28 Thread anny9699
Thanks a lot Ognen!

It's not a fancy class that I wrote, and now I realize it neither extends
Serializable nor is registered with Kryo, and that's why it is not working.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439p3446.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: function state lost when next RDD is processed

2014-03-28 Thread Adrian Mocanu
Thanks!

Ya that's what I'm doing so far, but I wanted to see if it's possible to keep 
the tuples inside Spark for fault tolerance purposes.

-A
From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: March-28-14 10:45 AM
To: user@spark.apache.org
Subject: Re: function state lost when next RDD is processed

As long as the amount of state being passed is relatively small, it's probably 
easiest to send it back to the driver and to introduce it into RDD 
transformations as the zero value of a fold.

On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu 
mailto:amoc...@verticalscope.com>> wrote:
I'd like to resurrect this thread since I don't have an answer yet.

From: Adrian Mocanu 
[mailto:amoc...@verticalscope.com]
Sent: March-27-14 10:04 AM
To: u...@spark.incubator.apache.org
Subject: function state lost when next RDD is processed

Is there a way to pass a custom function to spark to run it on the entire 
stream? For example, say I have a function which sums up values in each RDD and 
then across RDDs.

I've tried with map, transform, reduce. They all apply my sum function on 1 
RDD. When the next RDD comes the function starts from 0 so the sum of the 
previous RDD is lost.

Does Spark support a way of passing a custom function so that its state is 
preserved across RDDs and not only within RDD?

Thanks
-Adrian




2 weeks until the deadline - Spark Summit call for submissions.

2014-03-28 Thread Scott walent
The second Spark Summit, an event to bring the Apache Spark community
together, will be in San Francisco on June 30, 2014. The call for
submissions is currently open, but will close on April 11.   The summit is
looking for talks that will cover topics including applications built on
Spark, deployment, scheduling, performance, and related projects.

At the Summit you can look forward to hearing from committers, developers,
CEOs, and companies who are solving real-world big data challenges with
Spark.

All submissions will be reviewed by a Program Committee that is made up of
the creators, top committers, and individuals who have heavily contributed
to the Spark project. No speaker slots have been sold to sponsors, in an
effort to keep the Summit a community-driven event.

To submit your abstracts please visit: spark-summit.org/submit

Looking forward to seeing you there!

Best,
Scott & The Spark Summit Organizers


Re: Do all classes involving RDD operation need to be registered?

2014-03-28 Thread Ognen Duzlevski
There is also this quote from the Tuning guide 
(http://spark.incubator.apache.org/docs/latest/tuning.html):
" Finally, if you don't register your classes, Kryo will still work, but 
it will have to store the full class name with each object, which is 
wasteful."


It implies that you don't really have to register your classes with 
Kryo. However, what kind of waste are we talking about? :)
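
For reference, the registration itself is small; a sketch along the lines of
that tuning guide (the class and package names here are made up):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // register the classes your RDD operations actually ship around
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

// before creating the SparkContext:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")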

Ognen

On 3/28/14, 12:10 PM, Debasish Das wrote:


Classes are serialized and sent to all the workers as Akka messages.

For singletons and case classes, I am not sure whether they are 
Java-serialized or Kryo-serialized by default.

But your own classes, if serialized with Kryo, will definitely be much more 
efficient. There is a comparison that Matei did of all the 
serialization options, and Kryo was the fastest at that time.


Hi,

I am sorry if this has been asked before. I found that if I wrapped up 
some
methods in a class with parameters, spark will throw "Task 
Nonserializable"

exception; however if wrapped up in an object or case class without
parameters, it will work fine. Is it true that all classes involving RDD
operation should be registered so that SparkContext could recognize them?

Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-28 Thread pradeeps8
Hi Aureliano,

I followed this thread to create a custom saveAsObjectFile. 
The following is the code.
new org.apache.spark.rdd.SequenceFileRDDFunctions[NullWritable,
BytesWritable](saveRDD.mapPartitions(iter =>
iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new
BytesWritable(serialize(x))))).saveAsSequenceFile("objFiles")

But, I get the following error when executed.
/
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf 
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
 
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
 
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) 
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
/

Any idea about this error?
or 
Is there anything wrong in the line of code?

Thanks,
Pradeep




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SequenceFileRDDFunctions-cannot-be-used-output-of-spark-package-tp250p3442.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Not getting it

2014-03-28 Thread lannyripple
Ok.  Based on Sonal's message I dived more into memory and partitioning and
got it to work.

For the CSV file I used 1024 partitions [textFile(path, 1024)] which cut
the partition size down to 8MB (based on standard HDFS 64MB splits).  For
the key file I also adjusted partitions to use about 8MB.  This was still
blowing up with GC Overlimit and Heap OOM with join.  I then set SPARK_MEM
(which is hard to tease out of the documentation) to 4g and the join
completed.

Going back to find SPARK_MEM I found this the best explanation --
https://groups.google.com/forum/#!searchin/spark-users/SPARK_MEM/spark-users/ou6cJMlBj_M/NlBHYDjG_NYJ

At a guess, setting SPARK_MEM did more than changing the partitions.
Something to play around with.
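
Pulling the above together, a rough sketch of the combination that worked (the
paths are the same placeholders used earlier in the thread; SPARK_MEM is set in
the environment before the driver starts):

// export SPARK_MEM=4g   (in spark-env.sh or the launching shell)

// ask for ~8MB partitions up front instead of the default 64MB HDFS splits
val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024)
  .map(_.split(",")).keyBy(_(0))
val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy(x => x)

fileA.join(fileB).count()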


On Fri, Mar 28, 2014 at 10:17 AM, Lanny Ripple wrote:

> I've played around with it.  The CSV file looks like it gives 130
> partitions.  I'm assuming that's the standard 64MB split size for HDFS
> files.  I have increased number of partitions and number of tasks for
> things like groupByKey and such.  Usually I start blowing up on GC
> Overlimit or sometimes Heap OOM.  I recently tried throwing coalesce with
> shuffle = true,  into the mix thinking it would bring the keys into the
> same partition. E.g.,
>
> (fileA ++ fileB.map{case (k,v) => (k,
> Array(v)}).coalesce(fileA.partitions.length + fileB.partitions.length,
> shuffle = true).groupBy...
>
> (Which should effectively be imitating map-reduce) but I see GC Overlimit
> when I do that.
>
> I've got a stock install with num cores and worker memory set as mentioned
> but even something like this
>
> fileA.sortByKey().map{_ => 1}.reduce{_ + _}
>
> blows up with GC Overlimit (as did .count instead of the by-hand count).
>
> fileA.count
>
> works.  It seems to be able to load the file as an RDD but not manipulate
> it.
>
>
>
>
> On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List]
>  wrote:
>
>> Have you tried setting the partitioning ?
>>
>> Best Regards,
>> Sonal
>> Nube Technologies 
>>
>> 
>>
>>
>>
>>
>> On Thu, Mar 27, 2014 at 10:04 AM, lannyripple <[hidden 
>> email]
>> > wrote:
>>
>>> Hi all,
>>>
>>> I've got something which I think should be straightforward but it's not
>>> so
>>> I'm not getting it.
>>>
>>> I have an 8 node spark 0.9.0 cluster also running HDFS.  Workers have
>>> 16g of
>>> memory using 8 cores.
>>>
>>> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
>>> [key,a,b,c...]).
>>> I have another file of 25K lines containing some number of keys which
>>> might
>>> be in my CSV file.  (Yes, I know I should use an RDBMS or shark or
>>> something.  I'll get to that but this is toy problem that I'm using to
>>> get
>>> some intuition with spark.)
>>>
>>> Working on each file individually spark has no problem manipulating the
>>> files.  If I try and join or union+filter though I can't seem to find the
>>> join of the two files.  Code is along the lines of
>>>
>>> val fileA =
>>> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
>>> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>>>
>>> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>>>
>>> (fileA ++ fileB.map{case (k,v) => (k,
>>> Array(v))}).groupBy{_._1}.filter{case
>>> (k, (_, xs)) => xs.exists{_.length == 1}
>>>
>>> just causes spark to freeze.  (In all the cases I'm trying I just use a
>>> final .count to force the results.)
>>>
>>> I suspect I'm missing something fundamental about bringing the keyed data
>>> together into the same partitions so it can be efficiently joined but
>>> I've
>>> given up for now.  If anyone can shed some light (Beyond, "No really.
>>>  Use
>>> shark.") on what I'm not understanding it would be most helpful.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316p3417.html
>>  To unsubscribe from Not getting it, click 
>> here
>> .
>> NAML
>>
>
>




--
View this message in context: 
http://apache-spark-user-

RE: function state lost when next RDD is processed

2014-03-28 Thread Adrian Mocanu
I'd like to resurrect this thread since I don't have an answer yet.

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March-27-14 10:04 AM
To: u...@spark.incubator.apache.org
Subject: function state lost when next RDD is processed

Is there a way to pass a custom function to spark to run it on the entire 
stream? For example, say I have a function which sums up values in each RDD and 
then across RDDs.

I've tried with map, transform, reduce. They all apply my sum function on 1 
RDD. When the next RDD comes the function starts from 0 so the sum of the 
previous RDD is lost.

Does Spark support a way of passing a custom function so that its state is 
preserved across RDDs and not only within RDD?

Thanks
-Adrian



Re: KafkaInputDStream mapping of partitions to tasks

2014-03-28 Thread Evgeniy Shishkin
One more question:

we are using MEMORY_AND_DISK_SER_2,
and I am worried about when those RDDs on disk will be removed:
http://i.imgur.com/dbq5T6i.png

unpersist is set to true, and RDDs get purged from memory, but disk space just 
keeps growing. 
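
For reference, that storage level is the one passed when the Kafka input
stream is created; a sketch, with made-up ZooKeeper, group, and topic names:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 2),
  StorageLevel.MEMORY_AND_DISK_SER_2)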

On 28 Mar 2014, at 01:32, Tathagata Das  wrote:

> Yes, no one has reported this issue before. I just opened a JIRA on what I 
> think is the main problem here
> https://spark-project.atlassian.net/browse/SPARK-1340
> Some of the receivers don't get restarted. 
> I have a bunch of refactoring in the NetworkReceiver ready to be posted as a PR 
> that should fix this. 
> 
> Regarding the second problem, I have been thinking of adding flow control 
> (i.e. limiting the rate of receiving) for a while, just haven't gotten around 
> to it. 
> I added another JIRA for tracking this issue:
> https://spark-project.atlassian.net/browse/SPARK-1341
> 
> 
> TD
> 
> 
> On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin  wrote:
> 
> On 28 Mar 2014, at 01:11, Scott Clasen  wrote:
> 
> > Evgeniy Shishkin wrote
> >> So, at the bottom — kafka input stream just does not work.
> >
> >
> > That was the conclusion I was coming to as well.  Are there open tickets
> > around fixing this up?
> >
> 
> I am not aware of any. Actually, nobody complained about Spark+Kafka before,
> so I thought it just works; then we tried to build something on it and 
> almost failed.
> 
> I think it is possible to steal/replicate how Twitter Storm works with 
> Kafka. They do manual partition assignment; at least this would help to
> balance the load.
> 
> There is another issue.
> The ssc batch creates new RDDs every batch duration, always, even if the previous 
> computation did not finish.
> 
> But with Kafka, we could consume more RDDs later, after we finish the previous RDDs.
> That way it would be much, much simpler to not get OOMed when starting from 
> the beginning, because otherwise we consume a lot of data from Kafka during the
> batch duration and then get an OOM.
> 
> But we just cannot start slow, cannot limit how much to consume during a 
> batch.
> 
> 
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 



Re: function state lost when next RDD is processed

2014-03-28 Thread Mark Hamstra
As long as the amount of state being passed is relatively small, it's
probably easiest to send it back to the driver and to introduce it into RDD
transformations as the zero value of a fold.
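
A minimal sketch of that pattern, assuming a DStream[Int] named `nums` (a made-up
name). Each batch is summed with a zero of 0 and then folded into the driver-side
total, so the carried-over value is not re-applied once per partition:

var runningTotal = 0L   // small state kept on the driver

nums.foreachRDD { rdd =>
  runningTotal += rdd.fold(0)(_ + _)   // sum this batch, add it to the total
  println("running total: " + runningTotal)
}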


On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu wrote:

>  I'd like to resurrect this thread since I don't have an answer yet.
>
>
>
> *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com]
> *Sent:* March-27-14 10:04 AM
> *To:* u...@spark.incubator.apache.org
> *Subject:* function state lost when next RDD is processed
>
>
>
> Is there a way to pass a custom function to spark to run it on the entire
> stream? For example, say I have a function which sums up values in each RDD
> and then across RDDs.
>
>
>
> I've tried with map, transform, reduce. They all apply my sum function on
> 1 RDD. When the next RDD comes the function starts from 0 so the sum of the
> previous RDD is lost.
>
>
>
> Does Spark support a way of passing a custom function so that its state is
> preserved across RDDs and not only within RDD?
>
>
>
> Thanks
>
> -Adrian
>
>
>


Re: Run spark on mesos remotely

2014-03-28 Thread Wush Wu
Dear all,

After studying the source code and my environment, I guess the problem is
that the hostPort is wrong. On my machine, the hostname will be exported
into `blockManager.hostPort` as something like wush-home:45678, but the slaves
cannot resolve the hostname to an IP correctly. I am trying to solve the
hostname resolution problem.

On the other hand, is there a way to set the system property
`spark.hostPort` so that I could export `192.168.xx.xx:45678` to
`spark.hostPort`?

Thanks.


2014-03-28 9:32 GMT+08:00 Wush Wu :

> Dear Rustagi,
>
> Thanks for you response.
>
> As far as I know, the DAG scheduler should be a part of spark. Therefore,
> should I do something not mentioned in
> http://spark.incubator.apache.org/docs/0.8.1/running-on-mesos.html to
> launch the DAG scheduler?
>
> By the way, I also noticed that the user of the submission becomes the
> account on my machine. I suspect there is a permission issue, so the
> executor cannot copy the file from spark.execute.uri. After changing the
> account to the same username on the mesos cluster, the executor
> successfully copies the file from spark.execute.uri, but it reports the
> following error message:
>
> log4j:WARN No appenders could be found for logger
> (org.apache.spark.executor.MesosExecutorBackend).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> org.apache.spark.SparkException: Error sending message to
> BlockManagerMaster [message =
> RegisterBlockManager(BlockManagerId(201403250945-3657629962-5050-10180-16,
> pc104, 42356, 0),339585269,Actor[akka://spark/user/BlockManagerActor1])]
>  at
> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:174)
> at
> org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:139)
>  at
> org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:57)
> at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:127)
>  at org.apache.spark.storage.BlockManager.(BlockManager.scala:105)
> at org.apache.spark.storage.BlockManager.(BlockManager.scala:119)
>  at
> org.apache.spark.SparkEnv$.createFromSystemProperties(SparkEnv.scala:171)
> at org.apache.spark.executor.Executor.(Executor.scala:111)
>  at
> org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:58)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [1] milliseconds
>  at akka.dispatch.DefaultPromise.ready(Future.scala:870)
> at akka.dispatch.DefaultPromise.result(Future.scala:874)
>  at akka.dispatch.Await$.result(Future.scala:74)
> at
> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:160)
>  ... 8 more
> Exception in thread "Thread-0"
>
> Is there any suggestion to handle the error above?
>
> Thanks,
> Wush
>
>
> 2014-03-28 4:09 GMT+08:00 Mayur Rustagi :
>
>> Yes, but you have to maintain the connection of that machine to the master
>> cluster, as the driver with the DAG scheduler runs on that machine.
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Thu, Mar 27, 2014 at 4:09 AM, Wush Wu  wrote:
>>
>>> Dear all,
>>>
>>> We have a spark 0.8.1 cluster on mesos 0.15. It works if I submit the
>>> job from the master of mesos. That is to say, I spawn the spark shell or
>>> launch the scala application on the master of mesos.
>>>
>>> However, when I submit the job from another machine, the job is lost.
>>> The logs show that Mesos does not copy
>>> spark-0.8.1-incubating.tar.gz to the temporary working directory, so the job
>>> is lost immediately. Is it possible to submit the job from a machine that does
>>> not belong to the mesos cluster?
>>>
>>> Thanks!
>>>
>>> Wush
>>>
>>
>>
>


Re: Do all classes involving RDD operation need to be registered?

2014-03-28 Thread Debasish Das
Classes are serialized and sent to all the workers as Akka messages.

For singletons and case classes, I am not sure whether they are Java-serialized
or Kryo-serialized by default.

But your own classes, if serialized with Kryo, will definitely be much more
efficient. There is a comparison that Matei did of all the serialization
options, and Kryo was the fastest at that time.
Hi,

I am sorry if this has been asked before. I found that if I wrapped up some
methods in a class with parameters, spark will throw "Task Nonserializable"
exception; however if wrapped up in an object or case class without
parameters, it will work fine. Is it true that all classes involving RDD
operation should be registered so that SparkContext could recognize them?

Thanks a lot!



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Do all classes involving RDD operation need to be registered?

2014-03-28 Thread anny9699
Hi,

I am sorry if this has been asked before. I found that if I wrapped up some
methods in a class with parameters, spark will throw "Task Nonserializable"
exception; however if wrapped up in an object or case class without
parameters, it will work fine. Is it true that all classes involving RDD
operation should be registered so that SparkContext could recognize them?

Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-all-classes-involving-RDD-operation-need-to-be-registered-tp3439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Not getting it

2014-03-28 Thread lannyripple
I've played around with it.  The CSV file looks like it gives 130
partitions.  I'm assuming that's the standard 64MB split size for HDFS
files.  I have increased number of partitions and number of tasks for
things like groupByKey and such.  Usually I start blowing up on GC
Overlimit or sometimes Heap OOM.  I recently tried throwing coalesce with
shuffle = true,  into the mix thinking it would bring the keys into the
same partition. E.g.,

(fileA ++ fileB.map{case (k,v) => (k,
Array(v)}).coalesce(fileA.partitions.length + fileB.partitions.length,
shuffle = true).groupBy...

(Which should effectively be imitating map-reduce) but I see GC Overlimit
when I do that.

I've got a stock install with num cores and worker memory set as mentioned
but even something like this

fileA.sortByKey().map{_ => 1}.reduce{_ + _}

blows up with GC Overlimit (as did .count instead of the by-hand count).

fileA.count

works.  It seems to be able to load the file as an RDD but not manipulate
it.




On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List] <
ml-node+s1001560n3417...@n3.nabble.com> wrote:

> Have you tried setting the partitioning ?
>
> Best Regards,
> Sonal
> Nube Technologies 
>
> 
>
>
>
>
> On Thu, Mar 27, 2014 at 10:04 AM, lannyripple <[hidden 
> email]
> > wrote:
>
>> Hi all,
>>
>> I've got something which I think should be straightforward but it's not so
>> I'm not getting it.
>>
>> I have an 8 node spark 0.9.0 cluster also running HDFS.  Workers have 16g
>> of
>> memory using 8 cores.
>>
>> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
>> [key,a,b,c...]).
>> I have another file of 25K lines containing some number of keys which
>> might
>> be in my CSV file.  (Yes, I know I should use an RDBMS or shark or
>> something.  I'll get to that but this is toy problem that I'm using to get
>> some intuition with spark.)
>>
>> Working on each file individually spark has no problem manipulating the
>> files.  If I try and join or union+filter though I can't seem to find the
>> join of the two files.  Code is along the lines of
>>
>> val fileA =
>> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
>> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>>
>> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>>
>> (fileA ++ fileB.map{case (k,v) => (k,
>> Array(v))}).groupBy{_._1}.filter{case
>> (k, (_, xs)) => xs.exists{_.length == 1}
>>
>> just causes spark to freeze.  (In all the cases I'm trying I just use a
>> final .count to force the results.)
>>
>> I suspect I'm missing something fundamental about bringing the keyed data
>> together into the same partitions so it can be efficiently joined but I've
>> given up for now.  If anyone can shed some light (Beyond, "No really.  Use
>> shark.") on what I'm not understanding it would be most helpful.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316p3417.html
>  To unsubscribe from Not getting it, click 
> here
> .
> NAML
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316p3437.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

streaming: code to simulate a network socket data source

2014-03-28 Thread Diana Carroll
If you are learning about Spark Streaming, as I am, you've probably used
netcat "nc" as mentioned in the Spark Streaming programming guide.  I
wanted something a little more useful, so I modified the
ClickStreamGenerator code to make a very simple script that simply reads a
file off disk and passes it to a socket, character by character.  You
specify the port, filename, and bytesPerSecond that you want it to send.

Thought someone else might find this helpful, so here it is.

import java.net.ServerSocket
import java.io.PrintWriter
import scala.io.Source

object StreamingDataGenerator {

  def main(args : Array[String]) {
if (args.length != 3) {
  System.err.println("Usage: StreamingDataGenerator  
")
  System.exit(1)
}
val port = args(0).toInt
val file = Source.fromFile(args(1))
val bytesPerSecond = args(2).toFloat

val sleepDelayMs = (1000.0 / bytesPerSecond).toInt
val listener = new ServerSocket(port)

println("Reading from file: " + file.descr)

while (true) {
  println("Listening on port: " + port)
  val socket = listener.accept()
  new Thread() {
override def run = {
  println("Got client connect from: " + socket.getInetAddress)
  val out = new PrintWriter(socket.getOutputStream(), true)

  file.foreach(c =>
{
  Thread.sleep(sleepDelayMs)
  // write the byte to the socket
  out.write(c)
  out.flush()
  // also print the byte to stdout, for debugging ease
  print(c)
}
  )
  socket.close()
}
  }.start()
}
  }
}
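
For example (the classpath, port, path, and rate below are placeholders), once
compiled it can be run with

  scala -cp <your-classpath> StreamingDataGenerator 9999 /path/to/data.txt 100

and consumed from a streaming job with ssc.socketTextStream("localhost", 9999).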


Re: Exception on simple pyspark script

2014-03-28 Thread idanzalz
I sorted it out.
It turns out that if the client uses Python 2.7 and the server is Python 2.6,
you get some weird errors, like this one and others.
So you probably don't want to do that...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-on-simple-pyspark-script-tp3415p3429.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark streaming and the spark shell

2014-03-28 Thread Diana Carroll
Thanks, Tagatha.  This and your other reply on awaitTermination are very
helpful.

Diana


On Thu, Mar 27, 2014 at 4:40 PM, Tathagata Das
wrote:

> Very good questions! Responses inline.
>
> TD
>
> On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll 
> wrote:
> > I'm working with spark streaming using spark-shell, and hoping folks
> could
> > answer a few questions I have.
> >
> > I'm doing WordCount on a socket stream:
> >
> > import org.apache.spark.streaming.StreamingContext
> > import org.apache.spark.streaming.StreamingContext._
> > import org.apache.spark.streaming.Seconds
> > var ssc = new StreamingContext(sc,Seconds(5))
> > var mystream = ssc.socketTextStream("localhost",)
> > var words = mystream.flatMap(line => line.split(" "))
> > var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> > wordCounts.print()
> > ssc.start()
> >
> >
> >
> > 1.  I'm assuming that using spark shell is an edge case, and that spark
> > streaming is really intended mostly for batch use.  True?
> >
>
> Yes. Currently the spark-shell is not the intended execution mode for
> Spark Streaming, even though it can be done for quick testing.
>
> > 2.   I notice that once I start ssc.start(), my stream starts processing
> and
> > continues indefinitely...even if I close the socket on the server end
> (I'm
> > using unix command "nc" to mimic a server as explained in the streaming
> > programming guide .)  Can I tell my stream to detect if it's lost a
> > connection and therefore stop executing?  (Or even better, to attempt to
> > re-establish the connection?)
> >
>
>
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.
>
> > 3.  I tried entering ssc.stop which resulted in an error:
> >
> > Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> > cancelled because SparkContext was shut down
> > 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> > SendingConnectionManagerId not found
> >
> > But it did stop the DStream execution.
> >
>
>
> Ah, that happens sometimes. The existing behavior of ssc.stop() is
> that it will stop everything immediately.
> I just opened a pull request for a more graceful shutting down of the
> Spark streaming program.
> https://github.com/apache/spark/pull/247
>
> > 4.  Then I tried restarting the ssc again (ssc.start) and got another
> error:
> > org.apache.spark.SparkException: JobScheduler already started
> > Is restarting an ssc supported?
> >
>
>
> Restarting is ideally not supported. However, the behavior was not
> explicitly checked. The above pull request
> makes the behavior more explicit by throwing the right warnings and
> exceptions.
>
> > 5.  When I perform an operation like wordCounts.print(), that operation
> will
> > execute on each batch, every n seconds.  Is there a way I can undo that
> > operation?  That is, I want it to *stop* executing that print every n
> > seconds...without having to stop the stream.
> >
> > What I'm really asking is...can I explore DStreams interactively the way
> I
> > can explore my data in regular Spark.  In regular Spark, I might perform
> > various operations on an RDD to see what happens.  So at first, I might
> have
> > used "split(" ") to tokenize my input text, but now I want to try using
> > split(",") instead, after the stream has already started running.  Can I
> do
> > that?
> >
> > I did find out that if add a new operation to an existing dstream (say,
> > words.print()) after the ssc.start it works. It *will* add the second
> > print() call to the execution list every n seconds.
> >
> > but if I try to add new dstreams, e.g.
> > ...
> >
> > ssc.start()
> >
> > var testpairs = words.map(x => (x, "TEST"))
> > testpairs.print()
> >
> >
> > I get an error:
> >
> > 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> > 139593227 ms
> > java.lang.Exception:
> > org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> > initialized
> >
> >
> > Is this sort of interactive use just not supported?
>
>
> Modifying the DStream operations after the context has started is not
> officially supported. However, dynamically changing the computation can
> be done using DStream.transform() or DStream.foreachRDD().
> Both of these operations allow you to do arbitrary RDD operations on each
> RDD. So you can dynamically modify which RDD operations are used within
> the DStream transform / foreachRDD (you are not changing the
> DStream operations, only what happens inside the DStream operation). But to
> use this truly interactively, you have to write a bit of additional
> code that allows the user to interactively specify the function
> applied to each RDD.
>
>
>
> >
> > Thanks!
> >
> > Diana
>
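
To make the transform() suggestion above concrete, here is a minimal sketch (not from the thread; the port, batch interval, and the mutable `splitter` reference are illustrative, and sc is assumed to come from the spark-shell). Because the function passed to transform() is re-evaluated on the driver for every batch, reassigning `splitter` changes what later batches compute without stopping the stream.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// a mutable reference on the driver; reassigned interactively below
var splitter: String => Seq[String] = _.split(" ").toSeq

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

// the body of transform runs on the driver for every batch, so it picks up
// the current value of `splitter` each time a new batch is generated
val wordCounts = lines.transform { rdd: RDD[String] =>
  rdd.flatMap(splitter).map((_, 1)).reduceByKey(_ + _)
}
wordCounts.print()
ssc.start()

// later, without stopping the context, switch the tokenizer:
splitter = _.split(",").toSeq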


Re: Running a task once on each executor

2014-03-28 Thread dmpour23
Is it possible to do this:

JavaRDD parttionedRdds = input.map(new
Split()).sortByKey().partitionBy(new HashPartitioner(k)).values();
parttionedRdds.saveAsTextFile(args[2]);
//Then run my SingletonFunction (My app depends on the saved Files)
parttionedRdds.map(new SingletonFunc());

The parttionedRdds.map(new SingletonFunc()) is never called. Do I need to
set ctx.setJobGroup,
or will what I am trying to implement not work?

What is a use case for ctx.setJobGroup? Is there any example?

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3427.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Guidelines for Spark Cluster Sizing

2014-03-28 Thread Sonal Goyal
Hi,

I am looking for any guidelines for Spark Cluster Sizing - are there any
best practices or links for estimating the cluster specifications based on
input data size, transformations etc?

Thanks in advance for helping out.

Best Regards,
Sonal
Nube Technologies 




groupByKey is taking more time

2014-03-28 Thread mohit.goyal
Hi,

I have two RDDs:
RDD1=K1,V1
RDD2=K1,V1

e.g-(1,List("A","B","C")),(1,List("D","E","F"))

RDD1.groupByKey(RDD2)

Where K1=Integer
V1=List of String

If I keep the size of V1 = 3 (a list of three strings), the groupByKey operation
takes 2.6 m,
and
if I keep the size of V1 = 20 (a list of 20 strings), the groupByKey operation takes
4.0 m.

How does the size of the value (V1) impact the groupByKey operation? I would
expect it to depend only on the number of keys.

I tried the same experiment with spark.shuffle.spill=false and got similar
results.

Any idea??



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-is-taking-more-time-tp3425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How does Spark handle executor down? RDD in this executor will be recomputed automatically?

2014-03-28 Thread Sonal Goyal
Each RDD holds its lineage information, which means it knows
how it was computed, starting from data in reliable storage or from other
RDDs. Lost RDD partitions can hence be recomputed when a node fails.
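
One quick way to inspect the lineage Spark keeps for this recomputation is RDD.toDebugString; a minimal spark-shell sketch (the data here is illustrative):

import org.apache.spark.SparkContext._

val data = sc.parallelize(Seq("a b a", "b c"))
val counts = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// prints the chain of parent RDDs Spark would replay to rebuild lost partitions
println(counts.toDebugString)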

Best Regards,
Sonal
Nube Technologies 






On Fri, Mar 28, 2014 at 3:55 PM, colt_colt  wrote:

> I am curious about the Spark failover scenario. If some executor goes down, that
> means the JVM crashed. The AM will restart the executor, but what about the RDD
> data in that JVM? If I didn't persist the RDD, will Spark recompute the lost RDD
> or
> just let it be lost? There is some description on the Spark site: "Each RDD
> remembers the lineage of deterministic operations that were used on a
> fault-tolerant input dataset to create it."
>
> thanks in advance
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-handle-executor-down-RDD-in-this-executor-will-be-recomputed-automatically-tp3422.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: GC overhead limit exceeded

2014-03-28 Thread Syed A. Hashmi
The default is MEMORY_ONLY ... if you explicitly persist an RDD, you have to
explicitly unpersist it if you want to free that memory during the job.
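
A minimal sketch of the persist/unpersist pairing (the RDD contents are illustrative):

import org.apache.spark.storage.StorageLevel

val cached = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY) // same as .cache()
cached.count()      // the first action materializes the cached blocks
cached.count()      // later actions read them from memory
cached.unpersist()  // explicitly frees the cached blocks once they are no longer needed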


On Thu, Mar 27, 2014 at 11:17 PM, Sai Prasanna wrote:

> Oh sorry, that was a mistake, the default level is MEMORY_ONLY!!
> My question was: between two different experiments, do the RDDs cached in
> memory need to be unpersisted?
> Or does it not matter?
>


How does Spark handle executor down? RDD in this executor will be recomputed automatically?

2014-03-28 Thread colt_colt
I am curious about the Spark failover scenario. If some executor goes down, that
means the JVM crashed. The AM will restart the executor, but what about the RDD
data in that JVM? If I didn't persist the RDD, will Spark recompute the lost RDD or
just let it be lost? There is some description on the Spark site: "Each RDD
remembers the lineage of deterministic operations that were used on a
fault-tolerant input dataset to create it."

thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-handle-executor-down-RDD-in-this-executor-will-be-recomputed-automatically-tp3422.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Strange behavior of RDD.cartesian

2014-03-28 Thread Jaonary Rabarisoa
I forgot to mention that I don't really use all of my data. Instead I use a
sample extracted with randomSample.


On Fri, Mar 28, 2014 at 10:58 AM, Jaonary Rabarisoa wrote:

> Hi all,
>
> I notice that RDD.cartesian has a strange behavior with cached and
> uncached data. More precisely, I have a set of data that I load with
> objectFile
>
> *val data: RDD[(Int,String,Array[Double])] = sc.objectFile("data")*
>
> Then I split it in two set depending on some criteria
>
>
> *val part1 = data.filter(_._2 matches "view1")*
> *val part2 = data.filter(_._2 matches "view2")*
>
>
> Finally, I compute the cartesian product of part1 and part2
>
> *val pair = part1.cartesian(part2)*
>
>
> If everything goes well I should have
>
> *pair.count == part1.count * part2.count*
>
> But this is not the case if I don't cache part1 and part2.
>
> What am I missing? Is caching data mandatory in Spark?
>
> Cheers,
>
> Jaonary
>
>
>
>


Strange behavior of RDD.cartesian

2014-03-28 Thread Jaonary Rabarisoa
Hi all,

I notice that RDD.cartesian has a strange behavior with cached and uncached
data. More precisely, I have a set of data that I load with objectFile

*val data: RDD[(Int,String,Array[Double])] = sc.objectFile("data")*

Then I split it in two set depending on some criteria


*val part1 = data.filter(_._2 matches "view1")*
*val part2 = data.filter(_._2 matches "view2")*


Finally, I compute the cartesian product of part1 and part2

*val pair = part1.cartesian(part2)*


If everything goes well I should have

*pair.count == part1.count * part2.count*

But this is not the case if I don't cache part1 and part2.

What am I missing? Is caching data mandatory in Spark?

Cheers,

Jaonary


Re:

2014-03-28 Thread Hahn Jiang
I understand.  thanks


On Fri, Mar 28, 2014 at 4:10 AM, Mayur Rustagi wrote:

> You have to raise the global limit as root. Also you have to do that on
> the whole cluster.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Thu, Mar 27, 2014 at 4:07 AM, Hahn Jiang wrote:
>
>> I set "ulimit -n 10" in conf/spark-env.sh, is it too small?
>>
>>
>> On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal wrote:
>>
>>> Hi Hahn,
>>>
>>> What's the ulimit on your systems? Please check the following link for a
>>> discussion on the too many files open.
>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccangvg8qpn_wllsrcjegdb7hmza2ux7myxzhfvtz+b-sdxdk...@mail.gmail.com%3E
>>>
>>>
>>> Sent from my iPad
>>>
>>> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang 
>>> wrote:
>>> >
>>> > Hi, all
>>> >
>>> > I wrote a Spark program on YARN. When I use a small input file, my
>>> program runs well. But my job fails if the input size is more than 40G.
>>> >
>>> > the error log:
>>> > java.io.FileNotFoundException (java.io.FileNotFoundException:
>>> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
>>> (Too many open files))
>>> > java.io.FileOutputStream.openAppend(Native Method)
>>> > java.io.FileOutputStream.(FileOutputStream.java:192)
>>> >
>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>>> >
>>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>>> > org.apache.spark.scheduler.Task.run(Task.scala:53)
>>> >
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>> >
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>> > java.lang.Thread.run(Thread.java:662)
>>> >
>>> >
>>> > my object:
>>> > object Test {
>>> >
>>> >   def main(args: Array[String]) {
>>> > val sc = new SparkContext(args(0), "Test",
>>> >   System.getenv("SPARK_HOME"),
>>> SparkContext.jarOfClass(this.getClass))
>>> >
>>> > val mg = sc.textFile("/user/.../part-*")
>>> > val mct = sc.textFile("/user/.../part-*")
>>> >
>>> > val pair1 = mg.map {
>>> >   s =>
>>> > val cols = s.split("\t")
>>> > (cols(0), cols(1))
>>> > }
>>> > val pair2 = mct.map {
>>> >   s =>
>>> > val cols = s.split("\t")
>>> > (cols(0), cols(1))
>>> > }
>>> > val merge = pair1.union(pair2)
>>> > val result = merge.reduceByKey(_ + _)
>>> > val outputPath = new Path("/user/xxx/temp/spark-output")
>>> > outputPath.getFileSystem(new Configuration()).delete(outputPath,
>>> true)
>>> > result.saveAsTextFile(outputPath.toString)
>>> >
>>> > System.exit(0)
>>> >   }
>>> >
>>> > }
>>> >
>>> > My spark version is 0.9 and I run my job use this command
>>> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
>>> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
>>> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
>>> --worker-cores 2"
>>> >
>>>
>>
>>
>
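
For reference, a hedged sketch of what raising the global open-files limit as root can look like on a typical Linux node (paths and values are illustrative and distribution dependent; this is not from the thread):

# check the current soft limit for open file descriptors
ulimit -n

# /etc/security/limits.conf (edited as root on every node in the cluster), e.g.:
#   *   soft   nofile   65536
#   *   hard   nofile   65536
# then log in again (or restart the Spark/YARN daemons) so the new limits take effect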


Re: Not getting it

2014-03-28 Thread Sonal Goyal
Have you tried setting the partitioning?
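
A minimal sketch of what that could look like for the join described in the quoted post below (the partition count is illustrative; the fileA/fileB definitions are the ones from the post):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

val fileA = sc.textFile("hdfs://.../fileA_110M.csv").map(_.split(",")).keyBy(_(0))
val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy(x => x)

// pre-partition both sides with the same partitioner; the join stage then has
// narrow dependencies instead of shuffling the large side again
val partitioner = new HashPartitioner(64)
val joined = fileA.partitionBy(partitioner).join(fileB.partitionBy(partitioner))
joined.count()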

Best Regards,
Sonal
Nube Technologies 






On Thu, Mar 27, 2014 at 10:04 AM, lannyripple wrote:

> Hi all,
>
> I've got something which I think should be straightforward but it's not so
> I'm not getting it.
>
> I have an 8 node spark 0.9.0 cluster also running HDFS.  Workers have 16g
> of
> memory using 8 cores.
>
> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
> [key,a,b,c...]).
> I have another file of 25K lines containing some number of keys which might
> be in my CSV file.  (Yes, I know I should use an RDBMS or shark or
> something.  I'll get to that but this is toy problem that I'm using to get
> some intuition with spark.)
>
> Working on each file individually spark has no problem manipulating the
> files.  If I try and join or union+filter though I can't seem to find the
> join of the two files.  Code is along the lines of
>
> val fileA =
> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>
> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>
> (fileA ++ fileB.map{case (k,v) => (k, Array(v))}).groupBy{_._1}.filter{case
> (k, (_, xs)) => xs.exists{_.length == 1}
>
> just causes spark to freeze.  (In all the cases I'm trying I just use a
> final .count to force the results.)
>
> I suspect I'm missing something fundamental about bringing the keyed data
> together into the same partitions so it can be efficiently joined but I've
> given up for now.  If anyone can shed some light (Beyond, "No really.  Use
> shark.") on what I'm not understanding it would be most helpful.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


spark.akka.frameSize setting problem

2014-03-28 Thread lihu
Hi,

I just ran a simple example to generate some data for the ALS
algorithm. My Spark version is 0.9, running in local mode; the memory of my
node is 108G.


But when I set conf.set("spark.akka.frameSize", "4096"), the
following problem occurred, and when I do not set this, it runs
well.


I see that the meaning of "spark.akka.frameSize" is the maximum
message size, in MB.

From the error info, I guess the frameSize is too large and that is why the
startup timed out, but the setting's meaning is the maximum, not the initial value. What
confuses me is the "Setting 'maximum-frame-size' must be at least 32000
bytes" message, because my frame size is greater than 32000. Can anyone
explain this? Or maybe the configuration documentation should be more specific?



ERROR OneForOneStrategy: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
java.lang.IllegalArgumentException: Cannot instantiate transport
[akka.remote.transport.netty.NettyTransport]. Make sure it extends
[akka.remote.transport.Transport] and has constructor with
[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters
  at akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
  at akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
  ...
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
'maximum-frame-size' must be at least 32000 bytes
  at scala.Predef$.require(Predef.scala:233)
  at akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
  ...
  at scala.util.Success.flatMap(Try.scala:200)
  at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
  ... 19 more
14/03/28 07:25:27 ERROR Remoting: Remoting error: [Startup timed out] [
akka.remote.RemoteTransportException: Startup timed out
  at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
  at akka.remote.Remoting.start(Remoting.scala:191)
  ...
  at sbt.Logger$$anon$4.apply(Logger.scala:90)
  at sbt.TrapExit$App.run(TrapExit.scala:244)
  at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  ...
  at akka.remote.Remoting.start(Remoting.scala:173)


Exception on simple pyspark script

2014-03-28 Thread idanzalz
Hi,
I am a newbie with Spark.
I tried installing 2 virtual machines, one as a client and one as standalone
mode worker+master.
Everything seems to run and connect fine, but when I try to run a simple
script, I get weird errors.

Here is the traceback, notice my program is just a one-liner:


vagrant@precise32:/usr/local/spark$ MASTER=spark://192.168.16.109:7077
bin/pyspark
Python 2.7.3 (default, Apr 20 2012, 22:44:07)
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
14/03/28 06:45:54 INFO Utils: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/03/28 06:45:54 WARN Utils: Your hostname, precise32 resolves to a
loopback address: 127.0.1.1; using 192.168.16.107 instead (on interface
eth0)
14/03/28 06:45:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
14/03/28 06:45:55 INFO Slf4jLogger: Slf4jLogger started
14/03/28 06:45:55 INFO Remoting: Starting remoting
14/03/28 06:45:55 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@192.168.16.107:55440]
14/03/28 06:45:55 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@192.168.16.107:55440]
14/03/28 06:45:55 INFO SparkEnv: Registering BlockManagerMaster
14/03/28 06:45:55 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140328064555-5a1f
14/03/28 06:45:55 INFO MemoryStore: MemoryStore started with capacity 297.0
MB.
14/03/28 06:45:55 INFO ConnectionManager: Bound socket to port 55114 with id
= ConnectionManagerId(192.168.16.107,55114)
14/03/28 06:45:55 INFO BlockManagerMaster: Trying to register BlockManager
14/03/28 06:45:55 INFO BlockManagerMasterActor$BlockManagerInfo: Registering
block manager 192.168.16.107:55114 with 297.0 MB RAM
14/03/28 06:45:55 INFO BlockManagerMaster: Registered BlockManager
14/03/28 06:45:55 INFO HttpServer: Starting HTTP Server
14/03/28 06:45:55 INFO HttpBroadcast: Broadcast server started at
http://192.168.16.107:58268
14/03/28 06:45:55 INFO SparkEnv: Registering MapOutputTracker
14/03/28 06:45:55 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-2a1f1a0b-f4d9-402a-ac17-a41d9f9aea0c
14/03/28 06:45:55 INFO HttpServer: Starting HTTP Server
14/03/28 06:45:56 INFO SparkUI: Started Spark Web UI at
http://192.168.16.107:4040
14/03/28 06:45:56 INFO AppClient$ClientActor: Connecting to master
spark://192.168.16.109:7077...
14/03/28 06:45:56 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 0.9.0
  /_/

Using Python version 2.7.3 (default, Apr 20 2012 22:44:07)
Spark context available as sc.
>>> 14/03/28 06:45:58 INFO SparkDeploySchedulerBackend: Connected to Spark
>>> cluster with app ID app-20140327234558-
14/03/28 06:47:03 INFO AppClient$ClientActor: Executor added:
app-20140327234558-/0 on worker-20140327234702-192.168.16.109-41619
(192.168.16.109:41619) with 1 cores
14/03/28 06:47:03 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140327234558-/0 on hostPort 192.168.16.109:41619 with 1 cores,
512.0 MB RAM
14/03/28 06:47:04 INFO AppClient$ClientActor: Executor updated:
app-20140327234558-/0 is now RUNNING
14/03/28 06:47:06 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@192.168.16.109:45642/user/Executor#-154634467]
with ID 0
14/03/28 06:47:07 INFO BlockManagerMasterActor$BlockManagerInfo: Registering
block manager 192.168.16.109:60587 with 297.0 MB RAM

>>>
>>> sc.parallelize([1,2]).count()

14/03/28 06:47:35 INFO SparkContext: Starting job: count at :1
14/03/28 06:47:35 INFO DAGScheduler: Got job 0 (count at :1) with 2
output partitions (allowLocal=false)
14/03/28 06:47:35 INFO DAGScheduler: Final stage: Stage 0 (count at
:1)
14/03/28 06:47:35 INFO DAGScheduler: Parents of final stage: List()
14/03/28 06:47:35 INFO DAGScheduler: Missing parents: List()
14/03/28 06:47:35 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[1] at
count at :1), which has no missing parents
14/03/28 06:47:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0
(PythonRDD[1] at count at :1)
14/03/28 06:47:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/03/28 06:47:35 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 0: 192.168.16.109 (PROCESS_LOCAL)
14/03/28 06:47:35 INFO TaskSetManager: Serialized task 0.0:0 as 2546 bytes
in 4 ms
14/03/28 06:47:37 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on
executor 0: 192.168.16.109 (PROCESS_LOCAL)
14/03/28 06:47:37 INFO TaskSetManager: Serialized task 0.0:1 as 2546 bytes
in 1 ms
14/03/28 06:47:37 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/03/28 06:47:37 WARN TaskSetManager: Loss was due to
org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/usr/local/spark/py

Re: Replicating RDD elements

2014-03-28 Thread Sonal Goyal
Hi David,

I am sorry but your question is not clear to me. Are you talking about
taking some value and sharing it across your cluster so that it is present
on all the nodes? You can look at Spark's broadcasting in that case. On the
other hand, if you want to take one item and create an RDD of 100 or some
other number of items, you could do a flatMap. Does that help?

Best Regards,
Sonal
Nube Technologies 






On Fri, Mar 28, 2014 at 9:24 AM, David Thomas  wrote:

> How can we replicate RDD elements? Say I have 1 element and 100 nodes in
> the cluster. I need to replicate this one item on all the nodes i.e.
> effectively create an RDD of 100 elements.
>
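
To illustrate the flatMap approach mentioned above, a minimal sketch (the element, the copy count, and the partition count are illustrative):

val item = "some-element"               // the single value to replicate
val single = sc.parallelize(Seq(item))  // an RDD with exactly one element
val replicated = single.flatMap(x => Seq.fill(100)(x)) // now 100 copies
replicated.count()                      // 100
// optionally spread the copies across 100 partitions
val spread = replicated.repartition(100)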


Re: ArrayIndexOutOfBoundsException in ALS.implicit

2014-03-28 Thread Xiangrui Meng
Hi bearrito,

This is a known issue
(https://spark-project.atlassian.net/browse/SPARK-1281) and it should
be easy to fix by switching to a hash partitioner.

CC'ed dev list in case someone volunteers to work on it.

Best,
Xiangrui
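
Until the fix referenced above lands, one hedged workaround sketch (not from the thread) is to map ids into the nonnegative range before building the ratings; the input triples below are illustrative:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// fold a possibly negative hashed id into [0, Int.MaxValue] by clearing the sign bit
// (note: two ids that differ only in sign will collide)
def nonNegative(id: Int): Int = id & Int.MaxValue

// stand-in for however the (user, product, score) triples are actually produced
val rawRatings = sc.parallelize(Seq((1, -12345, 4.0), (2, 987654321, 3.5)))
val ratings = rawRatings.map { case (user, product, score) =>
  Rating(nonNegative(user), nonNegative(product), score)
}
val model = ALS.trainImplicit(ratings, 10, 10)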

On Thu, Mar 27, 2014 at 8:38 PM, bearrito  wrote:
> Usage of negative product ids causes the above exception.
>
> The cause is the use of the product ids as a mechanism to index into
> the in and out block structures.
>
> Specifically on 9.0 it occurs at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock$2.apply(ALS.scala:262)
>
> It seems reasonable to expect that product ids are positive, if a bit
> opinionated.  I ran across this because the hash function I was using on my
> product ids includes negative values in its range.
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ArrayIndexOutOfBoundsException-in-ALS-implicit-tp3400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.