Re: NPE using saveAsTextFile

2014-04-09 Thread Nick Pentreath
Ok I thought it may be closing over the config option. I am using config
for job configuration, but extracting vals from that. So not sure why as I
thought I'd avoided closing over it. Will go back to source and see where
it is creeping in.
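
One common way a config object ends up inside a closure, even when only plain values are used, is reading it through a field of the enclosing class: the function then captures `this`, and everything reachable from it is serialized. The sketch below illustrates that pattern with plain Java serialization. `JobConfig`, `Job`, and `ClosureCheck` are hypothetical names, and this is not the code from the job discussed in this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a configuration holder that is not Serializable.
class JobConfig(val outputPath: String)

// Hypothetical job class. It is Serializable itself, so only the config field
// can make serialization fail.
class Job(val config: JobConfig) extends Serializable {
  // Reads the field `config`, so the function captures `this` (and with it `config`).
  def leaky: String => String = s => config.outputPath + "/" + s

  // Copies the needed value into a local val first; the function captures only a String.
  def safe: String => String = {
    val path = config.outputPath
    s => path + "/" + s
  }
}

object ClosureCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job(new JobConfig("/tmp/out"))
    println(s"leaky closure serializes: ${serializes(job.leaky)}")
    println(s"safe closure serializes:  ${serializes(job.safe)}")
  }
}
```

With a recent Scala compiler the first check is expected to fail and the second to succeed, which is why copying values into local vals before building the closure is the usual advice.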



On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia wrote:

> I haven't seen this but it may be a bug in Typesafe Config, since this is
> serializing a Config object. We don't actually use Typesafe Config
> ourselves.
>
> Do you have any nulls in the data itself by any chance? And do you know
> how that Config object is getting there?
>
> Matei
>
> On Apr 9, 2014, at 11:38 PM, Nick Pentreath 
> wrote:
>
> Anyone have a chance to look at this?
>
> Am I just doing something silly somewhere?
>
> If it makes any difference, I am using the elasticsearch-hadoop plugin for
> ESInputFormat. But as I say, I can parse the data (count, first() etc). I
> just can't save it as text file.
>
>
>
>
> On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath 
> wrote:
>
>> Hi
>>
>> I'm using Spark 0.9.0.
>>
>> When calling saveAsTextFile on a custom hadoop inputformat (loaded with
>> newAPIHadoopRDD), I get the following error below.
>>
>> If I call count, I get the correct count of number of records, so the
>> inputformat is being read correctly... the issue only appears when trying
>> to use saveAsTextFile.
>>
>> If I call first() I get the correct output, also. So it doesn't appear to
>> be anything with the data or inputformat.
>>
>> Any idea what the actual problem is, since this stack trace is not
>> obvious (though it seems to be in ResultTask which ultimately causes this).
>>
>> Is this a known issue at all?

Re: NPE using saveAsTextFile

2014-04-09 Thread Matei Zaharia
I haven’t seen this but it may be a bug in Typesafe Config, since this is 
serializing a Config object. We don’t actually use Typesafe Config ourselves.

Do you have any nulls in the data itself by any chance? And do you know how 
that Config object is getting there?

Matei

On Apr 9, 2014, at 11:38 PM, Nick Pentreath  wrote:

> Anyone have a chance to look at this?
> 
> Am I just doing something silly somewhere?
> 
> If it makes any difference, I am using the elasticsearch-hadoop plugin for 
> ESInputFormat. But as I say, I can parse the data (count, first() etc). I 
> just can't save it as text file.
> 
> 
> 
> 
> On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath  
> wrote:
> Hi
> 
> I'm using Spark 0.9.0.
> 
> When calling saveAsTextFile on a custom hadoop inputformat (loaded with 
> newAPIHadoopRDD), I get the following error below.
> 
> If I call count, I get the correct count of number of records, so the 
> inputformat is being read correctly... the issue only appears when trying to 
> use saveAsTextFile.
> 
> If I call first() I get the correct output, also. So it doesn't appear to be 
> anything with the data or inputformat.
> 
> Any idea what the actual problem is, since this stack trace is not obvious 
> (though it seems to be in ResultTask which ultimately causes this).
> 
> Is this a known issue at all?

Re: NPE using saveAsTextFile

2014-04-09 Thread Nick Pentreath
Anyone have a chance to look at this?

Am I just doing something silly somewhere?

If it makes any difference, I am using the elasticsearch-hadoop plugin for
ESInputFormat. But as I say, I can parse the data (count, first() etc). I
just can't save it as text file.




On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath wrote:

> Hi
>
> I'm using Spark 0.9.0.
>
> When calling saveAsTextFile on a custom hadoop inputformat (loaded with
> newAPIHadoopRDD), I get the following error below.
>
> If I call count, I get the correct count of number of records, so the
> inputformat is being read correctly... the issue only appears when trying
> to use saveAsTextFile.
>
> If I call first() I get the correct output, also. So it doesn't appear to
> be anything with the data or inputformat.
>
> Any idea what the actual problem is, since this stack trace is not obvious
> (though it seems to be in ResultTask which ultimately causes this).
>
> Is this a known issue at all?
>
>
> ==
>
> 14/04/08 16:00:46 ERROR OneForOneStrategy:
> java.lang.NullPointerException
>   at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>   at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>   at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
>   at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStr

Strange behaviour of different SSCs with same Kafka topic

2014-04-09 Thread gaganbm
I am really at my wits' end here.

I have several StreamingContexts, let's say 2, both listening to the same
Kafka topics. I create each KafkaStream with a different consumer group.

Ideally, I should be seeing the Kafka events in both streams. But what I
am getting is really unpredictable. One stream gets a lot of events and
the other gets almost nothing, or very few by comparison. The frequency is
also very skewed: I get a lot of events in one stream continuously, and
after some time I get a few events in the other one.

I don't know where I am going wrong. I can see consumer fetcher threads for
both the streams that listen to the Kafka topics.  

I can give further details if needed. Any help will be great. 

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


shuffle memory requirements

2014-04-09 Thread Ameet Kini
val hrdd = sc.hadoopRDD(..)
val res = hrdd.partitionBy(myCustomPartitioner)
  .reduceByKey(..)
  .mapPartitionsWithIndex( some code to save those partitions )

I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle. My
custom partitioner generates over 20,000 partitions, so there are 20,000
tasks reading the shuffle files. On problems with fewer partitions (~1,000),
the job completes successfully.

On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
cores per executor and brought it down to 3, and I still get
OutOfMemoryErrors at 20,000 partitions. I have
spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
I am not caching any RDDs.

Do those config params look reasonable for my shuffle size? I'm not sure
what to increase: shuffle.memoryFraction or the memory that the reduce
tasks get. The latter, I am guessing, is whatever is left after giving
storage.memoryFraction and shuffle.memoryFraction.

Thanks,
Ameet
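
A back-of-the-envelope sketch of the split described above, using only the figures from this message. It assumes the fractions apply to the full 21 GB executor heap and that one task runs per core at a time; it ignores any safety margins Spark applies internally and JVM overhead, so real budgets will be smaller. `ShuffleMemoryEstimate` is a name made up for illustration.

```scala
object ShuffleMemoryEstimate {
  // Splits an executor heap (in GB) into (shuffle, storage, remainder) shares.
  def split(heapGb: Double, shuffleFraction: Double, storageFraction: Double): (Double, Double, Double) = {
    val shuffle = heapGb * shuffleFraction
    val storage = heapGb * storageFraction
    (shuffle, storage, heapGb - shuffle - storage)
  }

  def main(args: Array[String]): Unit = {
    // 21 GB heap, spark.shuffle.memoryFraction=0.5, spark.storage.memoryFraction=0.2
    val (shuffle, storage, rest) = split(21.0, 0.5, 0.2)
    println(f"shuffle: $shuffle%.1f GB, storage: $storage%.1f GB, remainder: $rest%.1f GB")
    // Assumption: one concurrently running task per core.
    for (cores <- Seq(6, 3)) {
      println(f"$cores cores: about ${shuffle / cores}%.2f GB shuffle and ${rest / cores}%.2f GB other heap per task")
    }
  }
}
```

Under these assumptions, 6 cores leave 1.75 GB of shuffle memory per concurrently running task and 3 cores leave 3.5 GB, so lowering the core count doubles the per-task budget without touching either fraction.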


Re: pySpark memory usage

2014-04-09 Thread Jim Blomo
This dataset is uncompressed text at ~54GB. stats() returns (count:
56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
343)

On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia  wrote:
> Okay, thanks. Do you have any info on how large your records and data file 
> are? I'd like to reproduce and fix this.
>
> Matei
>
> On Apr 9, 2014, at 3:52 PM, Jim Blomo  wrote:
>
>> Hi Matei, thanks for working with me to find these issues.
>>
>> To summarize, the issues I've seen are:
>> 0.9.0:
>> - https://issues.apache.org/jira/browse/SPARK-1323
>>
>> SNAPSHOT 2014-03-18:
>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>> Java heap space.  To me this indicates a memory leak since Spark
>> should simply be counting records of size < 3MB
>> - Without persist(), "stdin writer to Python finished early" hangs the
>> application, unknown root cause
>>
>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>> problem:
>>
>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>> java.net.SocketException: Connection reset
>>     at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>     at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>     at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>     at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>     at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>     at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>     at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>     at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>     at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>     at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>     at java.io.DataInputStream.read(DataInputStream.java:100)
>>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>
>>
>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia  
>> wrote:
>>> Cool, thanks for the update. Have you tried running a branch with this fix 
>>> (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
>>> issue are you referring to, is it separate from this? (Couldn't find it 
>>> earlier in the thread.)
>>>
>>> To turn on debug logging, copy conf/log4j.properties.template to 
>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console 
>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present 
>>> in "conf" on all workers.
>>>
>>> BTW I've managed to run PySpark with this fix on some reasonably large S3 
>>> data (multiple GB) and it was fine. It might happen only if records are 
>>> large, or something like that. How much heap are you giving to your 
>>> executors, and does it show that much in the web UI?
>>>
>>> Matei
>>>
>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo  wrote:
>>>
>>>> I think the problem I ran into in 0.9 is covered in
>>>> https://issues.apa

Re: Only TraversableOnce?

2014-04-09 Thread Nan Zhu
Yeah, should be right 

-- 
Nan Zhu


On Wednesday, April 9, 2014 at 8:54 PM, wxhsdp wrote:

> thank you, it works
> after my operation over p, return p.toIterator, because mapPartitions has
> iterator return type, is that right?
> rdd.mapPartitions{D => {val p = D.toArray; ...; p.toIterator}}
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Only-TraversableOnce-tp3873p4043.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 




Re: Only TraversableOnce?

2014-04-09 Thread wxhsdp
thank you, it works
after my operation over p, return p.toIterator, because mapPartitions has
iterator return type, is that right?
rdd.mapPartitions{D => {val p = D.toArray; ...; p.toIterator}}
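
A minimal sketch of the pattern in the line above, written against a plain Scala `Iterator` so it runs without a cluster. The function has the same shape as the one `mapPartitions` expects (an `Iterator` in, an `Iterator` out). Sorting is only a stand-in for the elided `...` step, and `PartitionFn` is a hypothetical name.

```scala
object PartitionFn {
  // Same shape as the function passed to mapPartitions: Iterator in, Iterator out.
  // Sorting stands in for the elided "..." step in the message.
  def sortPartition(it: Iterator[Int]): Iterator[Int] = {
    val p = it.toArray               // materializes the whole partition in memory
    scala.util.Sorting.quickSort(p)  // in-place work that needs random access
    p.iterator                       // hand the result back as an Iterator
  }

  def main(args: Array[String]): Unit = {
    // A plain Iterator stands in for one partition; with Spark this would be
    // rdd.mapPartitions(PartitionFn.sortPartition).
    println(sortPartition(Iterator(3, 1, 2)).toList)
  }
}
```

Note that `toArray` pulls the entire partition into memory at once, so this approach only suits partitions that fit comfortably in the executor heap.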



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Only-TraversableOnce-tp3873p4043.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Multi master Spark

2014-04-09 Thread Aaron Davidson
It is as Jagat said. The Masters do not need to know about one another, as
ZooKeeper manages their implicit communication. As for Workers (and
applications, such as spark-shell), once a Worker is registered with
*some* Master, its metadata is stored in ZooKeeper such that if another
Master is elected, it informs all current Workers (and applications), so
they need not have a priori knowledge of all future Masters.
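
As a rough sketch of where the multi-master information goes in standalone mode: per the Spark standalone documentation, each Master is started with `spark.deploy.recoveryMode=ZOOKEEPER` and `spark.deploy.zookeeper.url` set (typically through `SPARK_DAEMON_JAVA_OPTS`), and Workers and applications can be given a comma-separated list of Masters for their initial registration. The host names below are placeholders, and `MasterUrl` is a hypothetical helper, not part of Spark.

```scala
object MasterUrl {
  // Builds a master URL of the form spark://host1:port1,host2:port2.
  def haMasterUrl(masters: Seq[(String, Int)]): String =
    masters.map { case (host, port) => s"$host:$port" }.mkString("spark://", ",", "")

  def main(args: Array[String]): Unit = {
    // Placeholder hosts; 7077 is the default standalone Master port.
    val url = haMasterUrl(Seq("master1" -> 7077, "master2" -> 7077))
    // This string would be used as the master of a SparkContext, or exported
    // as MASTER before launching spark-shell in the 0.9 line.
    println(url)
  }
}
```

Listing more than one Master only matters for the first registration; after that, the failover described above takes over.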


On Wed, Apr 9, 2014 at 3:46 PM, Dmitriy Lyubimov  wrote:

> ah.
>
> standalone HA master was added in 0.9.0. Same logic, but Spark-native.
>
>
> On Wed, Apr 9, 2014 at 3:31 PM, Pradeep Ch wrote:
>
>> Thanks Dmitriy. But I want multi master support when running spark
>> standalone. Also I want to know if this multi master thing works if I use
>> spark-shell.
>>
>>
>> On Wed, Apr 9, 2014 at 3:26 PM, Dmitriy Lyubimov wrote:
>>
>>> The only way i know to do this is to use mesos with zookeepers. you
>>> specify zookeeper url as spark url that contains multiple zookeeper hosts.
>>> Multiple mesos masters are then elected thru zookeeper leader election
>>> until current leader dies; at which point mesos will elect another master
>>> (if still left).
>>>
>>> iirc, in this mode spark master never runs, only master slaves are being
>>> spun by mesos slaves directly.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to enable Spark Master HA in spark. Documentation specifies that
>>>> we can do this with the help of Zookeepers. But what I am worried is how to
>>>> configure one master with the other and similarly how do workers know that
>>>> they have two masters? where do you specify the multi-master information?
>>>>
>>>> Thanks for the help.
>>>>
>>>> Thanks,
>>>> Pradeep
>>>>
>>>
>>>
>>
>


Re: Best way to turn an RDD back into a SchemaRDD

2014-04-09 Thread Michael Armbrust
Good question.  This is something we wanted to fix, but unfortunately I'm
not sure how to do it without changing the API to RDD, which is undesirable
now that the 1.0 branch has been cut. We should figure something out though
for 1.1.

I've created https://issues.apache.org/jira/browse/SPARK-1460 to track this.

A few workarounds / hacks.
 - For distinct you can do it in SQL instead of using the Spark function.
 This will preserve the schema.
 - When getting rows it may be more concise to use the extractor instead of
asInstanceOf:
  schemaRDD.map { case Row(key: Int, value: String) => ... }

Michael
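
To show the difference between the two styles without a Spark dependency, here is a small sketch that uses a stand-in `Row` with an `unapplySeq` extractor. The `Row` defined here is a simplified, hypothetical substitute for `org.apache.spark.sql.Row`, and `Record` and `RowExtract` are likewise made up for illustration.

```scala
// Simplified, hypothetical stand-in for org.apache.spark.sql.Row.
class Row(val values: Seq[Any])

object Row {
  def apply(values: Any*): Row = new Row(values)
  // Lets `case Row(a, b)` patterns destructure the row's values.
  def unapplySeq(row: Row): Some[Seq[Any]] = Some(row.values)
}

case class Record(key: Int, value: String)

object RowExtract {
  // Cast-based version: positional access plus asInstanceOf on each field.
  def viaCast(r: Row): Record =
    Record(r.values(0).asInstanceOf[Int], r.values(1).asInstanceOf[String])

  // Extractor-based version: the pattern checks arity and types in one step.
  def viaExtractor(r: Row): Record = r match {
    case Row(key: Int, value: String) => Record(key, value)
  }

  def main(args: Array[String]): Unit = {
    val row = Row(1, "a")
    println(viaCast(row))
    println(viaExtractor(row))
  }
}
```

The extractor version fails with a `MatchError` on a row of the wrong shape, whereas the cast version fails with a `ClassCastException` or an index error, so neither removes the need to know the schema up front.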


On Wed, Apr 9, 2014 at 4:05 PM, Jan-Paul Bultmann wrote:

> Hey,
> My application requires the use of "classical" RDD methods like `distinct`
> and `subtract` on SchemaRDDs.
> What is the preferred way to turn the resulting regular
> RDD[org.apache.spark.sql.Row] back into SchemaRDDs?
> Calling toSchemaRDD will not work, as the schema information seems to be
> lost already.
> To make matters even more complicated, the contents of Row are typed as Any.
>
> So to make this work one has to map over the result RDD, call
> `asInstanceOf` on the content and then put that back into
> case classes, which seems like overkill to me.
>
> Is there a better way, that maybe employs some smart casting or reuse of
> Schema information?
>
> All the best,
> Jan


Re: Spark 0.9.1 released

2014-04-09 Thread Nicholas Chammas
Ah, looks good now. It took me a minute to realize that doing a hard
refresh on the docs page was missing the RDD class doc page...

And thanks for updating the release notes.


On Wed, Apr 9, 2014 at 7:21 PM, Tathagata Das
wrote:

> Thanks Nick for pointing that out! I have updated the release notes.
> But I see the new operations like repartition in the latest PySpark RDD
> docs. Maybe refresh the page a couple of times?
>
> TD
>
>
> On Wed, Apr 9, 2014 at 3:58 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> A very nice addition for us PySpark users in 0.9.1 is the addition of
>> RDD.repartition(), which is not mentioned in the release notes!
>>
>> This is super helpful for when you create an RDD from a gzipped file and
>> then need to explicitly shuffle the data around to parallelize operations
>> on it appropriately.
>>
>> Thanks people!
>>
>> FYI, docs/latest hasn't been updated yet to reflect the new additions to
>> PySpark.
>>
>> Nick
>>
>>
>>
>> On Wed, Apr 9, 2014 at 6:07 PM, Matei Zaharia wrote:
>>
>>> Thanks TD for managing this release, and thanks to everyone who
>>> contributed!
>>>
>>> Matei
>>>
>>> On Apr 9, 2014, at 2:59 PM, Tathagata Das 
>>> wrote:
>>>
>>> A small additional note: Please use the direct download links in the
>>> Spark Downloads page. The Apache mirrors take a day or so to sync from
>>> the main repo, so may not work immediately.
>>>
>>> TD
>>>
>>>
>>> On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> We have just posted Spark 0.9.1, which is a maintenance release with
>>>> bug fixes, performance improvements, better stability with YARN and
>>>> improved parity of the Scala and Python API. We recommend all 0.9.0
>>>> users to upgrade to this stable release.
>>>>
>>>> This is the first release since Spark graduated as a top level Apache
>>>> project. Contributions to this release came from 37 developers.
>>>>
>>>> The full release notes are at:
>>>> http://spark.apache.org/releases/spark-release-0-9-1.html
>>>>
>>>> You can download the release at:
>>>> http://spark.apache.org/downloads.html
>>>>
>>>> Thanks all the developers who contributed to this release:
>>>> Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
>>>> Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
>>>> Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
>>>> Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
>>>> Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
>>>> Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
>>>> Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
>>>> shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
>>>>
>>>> TD

>>>
>>>
>>>
>>
>


Re: trouble with "join" on large RDDs

2014-04-09 Thread Brad Miller
I set SPARK_MEM in the driver process by setting
"spark.executor.memory" to 10G.  Each machine had 32G of RAM and a
dedicated 32G spill volume.  I believe all of the units are in pages,
and the page size is the standard 4K.  There are 15 slave nodes in the
cluster and the sizes of the datasets I'm trying to join are about
2.5G and 25G when serialized and compressed in the RDD cache.
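
The page assumption can be checked against the kernel log quoted below: the killed process 15198 is listed with 1845471 in the virtual-memory column and is reported as total-vm:7381884kB in the kill message. A minimal sketch of that arithmetic, assuming 4 kB pages (`OomPages` is a name made up for illustration):

```scala
object OomPages {
  // Assumption: 4 kB pages, as stated in the message above.
  val PageSizeKb = 4L

  def pagesToKb(pages: Long): Long = pages * PageSizeKb
  def kbToPages(kb: Long): Long = kb / PageSizeKb

  def main(args: Array[String]): Unit = {
    // Virtual size of process 15198 from the OOM killer table, in pages.
    println(s"total-vm: ${pagesToKb(1845471L)} kB")
    // anon-rss from the "Killed process" line, converted back to pages.
    println(s"anon-rss: ${kbToPages(7273852L)} pages")
  }
}
```

The first conversion gives 7381884 kB, matching the kill message exactly, which supports reading the table columns as 4 kB pages. The second gives 1818463 resident pages, or roughly 6.9 GiB held by that single Python worker.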

I appreciate that Python lacks the type of heap size controls
available in Java, but I lack any concept of how the different
computational tasks are partitioned between Java and Python in pyspark
(so it's unclear to me how much freedom Python should have to chew
through tons of memory).

A couple questions which this raises for me are:
-Are there any parameters I could tune differently to try and prevent
this crashing behavior?
-Do we know why this doesn't spill to disk (as Patrick Wendell
mentions that shuffle spill is for aggregations which occur during the
reduce phase)?
-Do we have any hunch about what computation is occurring when the crash occurs?

I'd definitely appreciate the insight of others, and am willing to run
experiments and send results/errors/logs out.  Also, I'm physically
located in Soda Hall (Berkeley), so if anyone nearby is interested in
examining this firsthand, I am glad to meet up.

best,
-Brad


On Wed, Apr 9, 2014 at 4:21 AM, Andrew Ash  wrote:
> A JVM can easily be limited in how much memory it uses with the -Xmx
> parameter, but Python doesn't have memory limits built in in such a
> first-class way.  Maybe the memory limits aren't making it to the python
> executors.
>
> What was your SPARK_MEM setting?  The JVM below seems to be using 603201
> (pages?) and the 3 large python processes each are using ~180 (pages?).
> I'm unsure the units that the OOM killer's RSS column is in.  Could be
> either pages (4kb each) or bytes.
>
>
> Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  2348    12573     2102      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  2349    12573     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  2350    12573     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  5115    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  5116    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  5117    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  7725    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  7726    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  7727    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 10324    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 10325    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 10326    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201    47932     190        0             0 java
> Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 13295    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 13296    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 13297    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 15192    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 15193    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 15195    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471  1818463    3573        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479  1686492    3316        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470  1762344    3463        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process 15198 (python) score 221 or sacrifice child
> Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB
>
>
> On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller 
> wrote:
>>
>> Hi All,
>>
>> I poked around a bit more to (1) confirm my suspicions that the crash
>> was relat
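
On the units question raised above: the kill line for PID 15198 reports
total-vm:7381884kB, while the total_vm column for that process in the table
reads 1845471. A quick check, assuming the common 4 kB page size, suggests
the table columns are counted in pages rather than bytes. The helper name
below is illustrative and not part of any tool.

```python
PAGE_KB = 4  # assumption: 4 kB pages, the usual x86-64 default


def pages_to_kb(pages, page_kb=PAGE_KB):
    """Convert a page count from the OOM killer table to kilobytes."""
    return pages * page_kb


# total_vm column for PID 15198, compared with "total-vm:7381884kB"
# from the "Killed process 15198" line in the same log.
print(pages_to_kb(1845471))  # prints 7381884

# The java process's total_vm column (603201 pages) in kB.
print(pages_to_kb(603201))   # prints 2412804
```

By the same conversion, the JVM's 603201 pages come to about 2.3 GiB of
virtual size, and the three large python workers sit at roughly 6.5 to 7 GiB
of virtual size each.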

Re: Spark 0.9.1 released

2014-04-09 Thread Tathagata Das
Thanks Nick for pointing that out! I have updated the release notes.
But I see the new operations like repartition in the latest PySpark RDD
docs. Maybe refresh the page a couple of times?

TD


On Wed, Apr 9, 2014 at 3:58 PM, Nicholas Chammas  wrote:

> A very nice addition for us PySpark users in 0.9.1 is the addition of
> RDD.repartition(), which is not mentioned in the release notes!
>
> This is super helpful for when you create an RDD from a gzipped file and
> then need to explicitly shuffle the data around to parallelize operations
> on it appropriately.
>
> Thanks people!
>
> FYI, docs/latest hasn't been updated yet to reflect the new additions to
> PySpark.
>
> Nick
>
>
>
> On Wed, Apr 9, 2014 at 6:07 PM, Matei Zaharia wrote:
>
>> Thanks TD for managing this release, and thanks to everyone who
>> contributed!
>>
>> Matei
>>
>> On Apr 9, 2014, at 2:59 PM, Tathagata Das 
>> wrote:
>>
>> A small additional note: Please use the direct download links in the
>> Spark Downloads page. The
>> Apache mirrors take a day or so to sync from the main repo, so may not work
>> immediately.
>>
>> TD
>>
>>
>> On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> We have just posted Spark 0.9.1, which is a maintenance release with
>>> bug fixes, performance improvements, better stability with YARN and
>>> improved parity of the Scala and Python API. We recommend all 0.9.0
>>> users to upgrade to this stable release.
>>>
>>> This is the first release since Spark graduated as a top level Apache
>>> project. Contributions to this release came from 37 developers.
>>>
>>> The full release notes are at:
>>> http://spark.apache.org/releases/spark-release-0-9-1.html
>>>
>>> You can download the release at:
>>> http://spark.apache.org/downloads.html
>>>
>>> Thanks all the developers who contributed to this release:
>>> Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
>>> Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
>>> Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
>>> Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
>>> Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
>>> Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
>>> Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
>>> shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
>>>
>>> TD
>>>
>>
>>
>>
>


Best way to turn an RDD back into a SchemaRDD

2014-04-09 Thread Jan-Paul Bultmann
Hey,
My application requires the use of “classical” RDD methods like `distinct` and 
`subtract` on SchemaRDDs.
What is the preferred way to turn the resulting regular 
RDD[org.apache.spark.sql.Row] back into SchemaRDDs?
Calling toSchemaRDD will not work, as the schema information seems to be lost already.
To make matters even more complicated, the contents of Row are typed as Any.

So to make this work one has to map over the result RDD, call
`asInstanceOf` on the contents, and then put that back into
case classes, which seems like overkill to me.

Is there a better way that maybe employs some smart casting or reuse of schema
information?

All the best,
Jan

Re: pySpark memory usage

2014-04-09 Thread Matei Zaharia
Okay, thanks. Do you have any info on how large your records and data file are? 
I’d like to reproduce and fix this.

Matei

On Apr 9, 2014, at 3:52 PM, Jim Blomo  wrote:

> Hi Matei, thanks for working with me to find these issues.
> 
> To summarize, the issues I've seen are:
> 0.9.0:
> - https://issues.apache.org/jira/browse/SPARK-1323
> 
> SNAPSHOT 2014-03-18:
> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
> Java heap space.  To me this indicates a memory leak since Spark
> should simply be counting records of size < 3MB
> - Without persist(), "stdin writer to Python finished early" hangs the
> application, unknown root cause
> 
> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
> debugging turned on.  This gives me the stacktrace on the new "stdin"
> problem:
> 
> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
> java.net.SocketException: Connection reset
>at java.net.SocketInputStream.read(SocketInputStream.java:196)
>at java.net.SocketInputStream.read(SocketInputStream.java:122)
>at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>at 
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>at 
> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>at 
> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>at java.io.FilterInputStream.read(FilterInputStream.java:133)
>at 
> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>at 
> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>at 
> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>at java.io.DataInputStream.read(DataInputStream.java:100)
>at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>at 
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> 
> 
> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia  wrote:
>> Cool, thanks for the update. Have you tried running a branch with this fix 
>> (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
>> issue are you referring to, is it separate from this? (Couldn't find it 
>> earlier in the thread.)
>> 
>> To turn on debug logging, copy conf/log4j.properties.template to 
>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console 
>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in 
>> "conf" on all workers.
>> 
>> BTW I've managed to run PySpark with this fix on some reasonably large S3 
>> data (multiple GB) and it was fine. It might happen only if records are 
>> large, or something like that. How much heap are you giving to your 
>> executors, and does it show that much in the web UI?
>> 
>> Matei
>> 
>> On Mar 29, 2014, at 10:44 PM, Jim Blomo  wrote:
>> 
>>> I think the problem I ran into in 0.9 is covered in
>>> https://issues.apache.org/jira/browse/SPARK-1323
>>> 
>>> When I kill the python process, the stacktrace I get indicates that
>>> this happens at initialization.  It looks like the initial write to
>>> the Python process does not go through, and then the iterator hangs
>>> waiting for output.  I haven't h

Re: Spark 0.9.1 released

2014-04-09 Thread Nicholas Chammas
A very nice addition for us PySpark users in 0.9.1 is the addition of
RDD.repartition(), which is not mentioned in the release notes!

This is super helpful for when you create an RDD from a gzipped file and
then need to explicitly shuffle the data around to parallelize operations
on it appropriately.

Thanks people!

FYI, docs/latest hasn't been updated yet to reflect the new additions to
PySpark.

Nick



On Wed, Apr 9, 2014 at 6:07 PM, Matei Zaharia wrote:

> Thanks TD for managing this release, and thanks to everyone who
> contributed!
>
> Matei
>
> On Apr 9, 2014, at 2:59 PM, Tathagata Das 
> wrote:
>
> A small additional note: Please use the direct download links in the Spark
> Downloads page. The Apache
> mirrors take a day or so to sync from the main repo, so may not work
> immediately.
>
> TD
>
>
> On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das wrote:
>
>> Hi everyone,
>>
>> We have just posted Spark 0.9.1, which is a maintenance release with
>> bug fixes, performance improvements, better stability with YARN and
>> improved parity of the Scala and Python API. We recommend all 0.9.0
>> users to upgrade to this stable release.
>>
>> This is the first release since Spark graduated as a top level Apache
>> project. Contributions to this release came from 37 developers.
>>
>> The full release notes are at:
>> http://spark.apache.org/releases/spark-release-0-9-1.html
>>
>> You can download the release at:
>> http://spark.apache.org/downloads.html
>>
>> Thanks all the developers who contributed to this release:
>> Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
>> Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
>> Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
>> Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
>> Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
>> Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
>> Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
>> shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
>>
>> TD
>>
>
>
>


Re: Problem with running LogisticRegression in spark cluster mode

2014-04-09 Thread Jenny Zhao
Hi Jagat,

yes, I did specify mllib in build.sbt

name := "Spark LogisticRegression"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
"0.9.0-incubating"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
"0.9.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.2.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"



On Wed, Apr 9, 2014 at 3:23 PM, Jagat Singh  wrote:

> Hi Jenny,
>
> How are you packaging your jar?
>
> Can you please confirm whether you have included the MLlib jar inside the fat
> jar you have created for your code?
>
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.9.3" %
> "0.8.1-incubating"
>
> Thanks,
>
> Jagat Singh
>
>
> On Thu, Apr 10, 2014 at 8:05 AM, Jenny Zhao wrote:
>
>>
>> Hi all,
>>
>> I have been able to run LR in local mode, but I am having trouble running
>> it in cluster mode. Below are the source and the stack trace from running
>> it in cluster mode. I used sbt package to build the project; I'm not sure
>> what it is complaining about.
>>
>> Another question I have is about LogisticRegression itself:
>>
>> 1) I noticed that LogisticRegressionWithSGD doesn't ask for information about
>> the input features, for instance whether a feature is scale, nominal or
>> ordinal. Or does MLlib only support scale features?
>>
>> 2) Training error is pretty high even when the number of iterations is set
>> very high. Do we have numbers on the accuracy rate of the LR model?
>>
>> Thank you for your help!
>>
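>> import org.apache.spark.SparkContext
>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>> import org.apache.spark.mllib.regression.LabeledPoint
>>
>> /**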
>>  * Logistic regression
>>  */
>> object SparkLogisticRegression {
>>
>>
>>   def main(args: Array[String]) {
>> if ( args.length != 3) {
>>   System.err.println("Usage: SparkLogisticRegression <master> <input file path> <num iterations>")
>>   System.exit(1)
>> }
>>
>> val numIterations = args(2).toInt;
>>
>> val sc = new SparkContext(args(0), "SparkLogisticRegression",
>>   System.getenv("SPARK_HOME"),
>>   SparkContext.jarOfClass(this.getClass))
>>
>> // parse in the input data
>> val data = sc.textFile(args(1))
>> val lpoints = data.map{ line =>
>>   val parts = line.split(',')
>>   LabeledPoint(parts(0).toDouble, parts.tail.map( x =>
>> x.toDouble).toArray)
>> }
>>
>> // setup LR
>> val model = LogisticRegressionWithSGD.train(lpoints, numIterations)
>>
>> val labelPred = lpoints.map { p =>
>>   val pred = model.predict(p.features)
>>   (p.label, pred)
>> }
>>
>> val predErr = labelPred.filter (r => r._1 != r._2).count
>> println("Training Error: " + predErr.toDouble/lpoints.count + " " +
>> predErr + "/" + lpoints.count)
>>  }
>>
>> }
>>
>> 14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
>> 14/04/09 14:50:48 WARN scheduler.TaskSetManager: Loss was due to
>> java.lang.ClassNotFoundException
>> java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
>> at java.lang.Class.forName(Class.java:211)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1988)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>> at
>> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
>> at
>> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
>> at
>> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1834)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1793)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
>> at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
>> at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>   

Re: pySpark memory usage

2014-04-09 Thread Jim Blomo
Hi Matei, thanks for working with me to find these issues.

To summarize, the issues I've seen are:
0.9.0:
- https://issues.apache.org/jira/browse/SPARK-1323

SNAPSHOT 2014-03-18:
- When persist() used and batchSize=1, java.lang.OutOfMemoryError:
Java heap space.  To me this indicates a memory leak since Spark
should simply be counting records of size < 3MB
- Without persist(), "stdin writer to Python finished early" hangs the
application, unknown root cause

I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
debugging turned on.  This gives me the stacktrace on the new "stdin"
problem:

14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
at 
org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at 
org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
at 
org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
at 
org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)


On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia  wrote:
> Cool, thanks for the update. Have you tried running a branch with this fix 
> (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
> issue are you referring to, is it separate from this? (Couldn't find it 
> earlier in the thread.)
>
> To turn on debug logging, copy conf/log4j.properties.template to 
> conf/log4j.properties and change the line log4j.rootCategory=INFO, console to 
> log4j.rootCategory=DEBUG, console. Then make sure this file is present in 
> "conf" on all workers.
>
> BTW I've managed to run PySpark with this fix on some reasonably large S3 
> data (multiple GB) and it was fine. It might happen only if records are 
> large, or something like that. How much heap are you giving to your 
> executors, and does it show that much in the web UI?
>
> Matei
>
> On Mar 29, 2014, at 10:44 PM, Jim Blomo  wrote:
>
>> I think the problem I ran into in 0.9 is covered in
>> https://issues.apache.org/jira/browse/SPARK-1323
>>
>> When I kill the python process, the stacktrace I get indicates that
>> this happens at initialization.  It looks like the initial write to
>> the Python process does not go through, and then the iterator hangs
>> waiting for output.  I haven't had luck turning on debugging for the
>> executor process.  Still trying to learn the log4j properties that
>> need to be set.
>>
>> No luck yet on tracking down the memory leak.
>>
>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>> org.apache.spark.SparkException: Python worker exited

Re: Multi master Spark

2014-04-09 Thread Dmitriy Lyubimov
ah.

standalone HA master was added in 0.9.0. Same logic, but Spark-native.


On Wed, Apr 9, 2014 at 3:31 PM, Pradeep Ch wrote:

> Thanks Dmitriy. But I want multi master support when running spark
> standalone. Also I want to know if this multi master thing works if I use
> spark-shell.
>
>
> On Wed, Apr 9, 2014 at 3:26 PM, Dmitriy Lyubimov wrote:
>
>> The only way i know to do this is to use mesos with zookeepers. you
>> specify zookeeper url as spark url that contains multiple zookeeper hosts.
>> Multiple mesos masters are then elected thru zookeeper leader election
>> until current leader dies; at which point mesos will elect another master
>> (if still left).
>>
>> iirc, in this mode spark master never runs, only master slaves are being
>> spun by mesos slaves directly.
>>
>>
>>
>>
>>
>> On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to enable Spark Master HA in Spark. The documentation specifies that
>>> we can do this with the help of ZooKeeper. But what I am worried about is how
>>> to configure one master with the other and, similarly, how do workers know
>>> that they have two masters? Where do you specify the multi-master information?
>>>
>>> Thanks for the help.
>>>
>>> Thanks,
>>> Pradeep
>>>
>>
>>
>


Re: Multi master Spark

2014-04-09 Thread Pradeep Ch
Thanks Dmitriy. But I want multi master support when running spark
standalone. Also I want to know if this multi master thing works if I use
spark-shell.


On Wed, Apr 9, 2014 at 3:26 PM, Dmitriy Lyubimov  wrote:

> The only way i know to do this is to use mesos with zookeepers. you
> specify zookeeper url as spark url that contains multiple zookeeper hosts.
> Multiple mesos masters are then elected thru zookeeper leader election
> until current leader dies; at which point mesos will elect another master
> (if still left).
>
> iirc, in this mode spark master never runs, only master slaves are being
> spun by mesos slaves directly.
>
>
>
>
>
> On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch wrote:
>
>> Hi,
>>
>> I want to enable Spark Master HA in Spark. The documentation specifies that
>> we can do this with the help of ZooKeeper. But what I am worried about is how
>> to configure one master with the other and, similarly, how do workers know
>> that they have two masters? Where do you specify the multi-master information?
>>
>> Thanks for the help.
>>
>> Thanks,
>> Pradeep
>>
>
>


Re: programmatic way to tell Spark version

2014-04-09 Thread Nicholas Chammas
Hey Patrick,

I've created SPARK-1458 to
track this request, in case the team/community wants to implement it in the
future.

Nick


On Sat, Feb 22, 2014 at 7:25 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> No use case at the moment.
>
> What prompted the question: I was going to ask a different question on
> this list and wanted to note my version of Spark. I assumed there would be
> a getVersion method on SparkContext or something like that, but I couldn't
> find one in the docs. I also couldn't find an environment variable with the
> version. After futzing around a bit I realized it was printed out (quite
> conspicuously) in the shell startup banner.
>
>
> On Sat, Feb 22, 2014 at 7:15 PM, Patrick Wendell wrote:
>
>> AFAIK - We don't have any way to do this right now. Maybe we could add
>> a getVersion method to SparkContext that would tell you. Just
>> wondering - what is the use case here?
>>
>> - Patrick
>>
>> On Sat, Feb 22, 2014 at 4:04 PM, nicholas.chammas
>>  wrote:
>> > Is there a programmatic way to tell what version of Spark I'm running?
>> >
>> > I know I can look at the banner when the Spark shell starts up, but I'm
>> > curious to know if there's another way.
>> >
>> > Nick
>> >
>> >
>> > 
>> > View this message in context: programmatic way to tell Spark version
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Multi master Spark

2014-04-09 Thread Jagat Singh
Hello Pradeep,

Quoting from

https://spark.apache.org/docs/0.9.0/spark-standalone.html

In order to schedule new applications or add Workers to the cluster, they
need to know the IP address of the current leader. This can be accomplished
by simply passing in a list of Masters where you used to pass in a single
one. For example, you might start your SparkContext pointing to
spark://host1:port1,host2:port2. This would cause your SparkContext to try
registering with both Masters - if host1 goes down, this configuration
would still be correct as we'd find the new leader, host2.
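
The master list described in the quoted documentation is just a
comma-separated set of host:port pairs after a single spark:// prefix. A
small sketch of building that string follows. The helper name is
hypothetical, and port 7077 in the example is an assumption (the usual
standalone default), not something stated in the thread.

```python
def standalone_master_url(masters):
    """Build a spark:// URL listing every standalone Master as host:port."""
    if not masters:
        raise ValueError("at least one master is required")
    return "spark://" + ",".join(f"{host}:{port}" for host, port in masters)


# Two Masters, as in the host1/host2 example from the docs.
print(standalone_master_url([("host1", 7077), ("host2", 7077)]))
# prints: spark://host1:7077,host2:7077
```

The resulting string is what would be passed as the master URL in place of
the single-master form.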

Thanks,

Jagat Singh


On Thu, Apr 10, 2014 at 8:08 AM, Pradeep Ch wrote:

> Hi,
>
> I want to enable Spark Master HA in Spark. The documentation specifies that we
> can do this with the help of ZooKeeper. But what I am worried about is how to
> configure one master with the other and, similarly, how do workers know that
> they have two masters? Where do you specify the multi-master information?
>
> Thanks for the help.
>
> Thanks,
> Pradeep
>


Re: Multi master Spark

2014-04-09 Thread Dmitriy Lyubimov
The only way i know to do this is to use mesos with zookeepers. you specify
zookeeper url as spark url that contains multiple zookeeper hosts. Multiple
mesos masters are then elected thru zookeeper leader election until current
leader dies; at which point mesos will elect another master (if still
left).

iirc, in this mode spark master never runs, only master slaves are being
spun by mesos slaves directly.





On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch wrote:

> Hi,
>
> I want to enable Spark Master HA in Spark. The documentation specifies that we
> can do this with the help of ZooKeeper. But what I am worried about is how to
> configure one master with the other and, similarly, how do workers know that
> they have two masters? Where do you specify the multi-master information?
>
> Thanks for the help.
>
> Thanks,
> Pradeep
>


Re: Problem with running LogisticRegression in spark cluster mode

2014-04-09 Thread Jagat Singh
Hi Jenny,

How are you packaging your jar?

Can you please confirm whether you have included the MLlib jar inside the fat
jar you have created for your code?

libraryDependencies += "org.apache.spark" % "spark-mllib_2.9.3" %
"0.8.1-incubating"

Thanks,

Jagat Singh


On Thu, Apr 10, 2014 at 8:05 AM, Jenny Zhao  wrote:

>
> Hi all,
>
> I have been able to run LR in local mode, but I am having trouble running
> it in cluster mode. Below are the source and the stack trace from running
> it in cluster mode. I used sbt package to build the project; I'm not sure
> what it is complaining about.
>
> Another question I have is about LogisticRegression itself:
>
> 1) I noticed that LogisticRegressionWithSGD doesn't ask for information about
> the input features, for instance whether a feature is scale, nominal or
> ordinal. Or does MLlib only support scale features?
>
> 2) Training error is pretty high even when the number of iterations is set
> very high. Do we have numbers on the accuracy rate of the LR model?
>
> Thank you for your help!
>
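> import org.apache.spark.SparkContext
> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
> import org.apache.spark.mllib.regression.LabeledPoint
>
> /**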
>  * Logistic regression
>  */
> object SparkLogisticRegression {
>
>
>   def main(args: Array[String]) {
> if ( args.length != 3) {
>   System.err.println("Usage: SparkLogisticRegression <master> <input file path> <num iterations>")
>   System.exit(1)
> }
>
> val numIterations = args(2).toInt;
>
> val sc = new SparkContext(args(0), "SparkLogisticRegression",
>   System.getenv("SPARK_HOME"),
>   SparkContext.jarOfClass(this.getClass))
>
> // parse in the input data
> val data = sc.textFile(args(1))
> val lpoints = data.map{ line =>
>   val parts = line.split(',')
>   LabeledPoint(parts(0).toDouble, parts.tail.map( x =>
> x.toDouble).toArray)
> }
>
> // setup LR
> val model = LogisticRegressionWithSGD.train(lpoints, numIterations)
>
> val labelPred = lpoints.map { p =>
>   val pred = model.predict(p.features)
>   (p.label, pred)
> }
>
> val predErr = labelPred.filter (r => r._1 != r._2).count
> println("Training Error: " + predErr.toDouble/lpoints.count + " " +
> predErr + "/" + lpoints.count)
>  }
>
> }
>
> 14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
> 14/04/09 14:50:48 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.ClassNotFoundException
> java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
> at java.lang.Class.forName(Class.java:211)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1988)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
> at
> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
> at
> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
> at
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1834)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1793)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:906)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:929)
> at java.lang.Thread.run(Thread.java:796)
> 14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 1 (task 0.0:1)
> 14/04/09 14:50:48 INFO scheduler.TaskSetManager: Loss was due to
> java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
> [duplicate 1]
> 14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:1 as
> TID 2 

Multi master Spark

2014-04-09 Thread Pradeep Ch
Hi,

I want to enable Spark Master HA in Spark. The documentation specifies that we
can do this with the help of ZooKeeper. But what I am worried about is how to
configure one master with the other and, similarly, how do workers know that
they have two masters? Where do you specify the multi-master information?

Thanks for the help.

Thanks,
Pradeep


Re: Spark 0.9.1 released

2014-04-09 Thread Matei Zaharia
Thanks TD for managing this release, and thanks to everyone who contributed!

Matei

On Apr 9, 2014, at 2:59 PM, Tathagata Das  wrote:

> A small additional note: Please use the direct download links in the Spark 
> Downloads page. The Apache mirrors take a day or so to sync from the main 
> repo, so may not work immediately.
> 
> TD
> 
> 
> On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das  
> wrote:
> Hi everyone,
> 
> We have just posted Spark 0.9.1, which is a maintenance release with
> bug fixes, performance improvements, better stability with YARN and
> improved parity of the Scala and Python API. We recommend all 0.9.0
> users to upgrade to this stable release.
> 
> This is the first release since Spark graduated as a top level Apache
> project. Contributions to this release came from 37 developers.
> 
> The full release notes are at:
> http://spark.apache.org/releases/spark-release-0-9-1.html
> 
> You can download the release at:
> http://spark.apache.org/downloads.html
> 
> Thanks all the developers who contributed to this release:
> Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
> Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
> Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
> Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
> Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
> Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
> Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
> shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
> 
> TD
> 



Problem with running LogisticRegression in spark cluster mode

2014-04-09 Thread Jenny Zhao
Hi all,

I have been able to run LR in local mode, but I am having trouble running
it in cluster mode. Below are the source script and the stack trace from
running it in cluster mode. I used sbt package to build the project, and I
am not sure what it is complaining about.

Another question I have is about LogisticRegression itself:

1) I noticed that LogisticRegressionWithSGD doesn't ask for information about
the input features, for instance whether a feature is scale, nominal, or
ordinal. Or does MLlib only support scale features?

2) The training error is pretty high even when the number of iterations is set
very high. Do we have numbers on the accuracy rate of the LR model?
Thank you for your help!

/**
 * Logistic regression
 */
object SparkLogisticRegression {


  def main(args: Array[String]) {
if ( args.length != 3) {
  System.err.println("Usage: SparkLogisticRegression   
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, parts.tail.map( x =>
x.toDouble).toArray)
}

// setup LR
val model = LogisticRegressionWithSGD.train(lpoints, numIterations)

val labelPred = lpoints.map { p =>
  val pred = model.predict(p.features)
  (p.label, pred)
}

val predErr = labelPred.filter (r => r._1 != r._2).count
println("Training Error: " + predErr.toDouble/lpoints.count + " " +
predErr + "/" + lpoints.count)
 }

}
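
For reference, the training-error calculation at the end of the script can
be checked on plain Scala collections, without a cluster. This is only a
sketch: the object name, the errorRate helper, and the sample labels and
predictions are hypothetical and not part of the original job.

```scala
object TrainingErrorSketch {
  // Fraction of (label, prediction) pairs that disagree,
  // mirroring the filter(r => r._1 != r._2).count step above.
  def errorRate(labelPred: Seq[(Double, Double)]): Double = {
    val wrong = labelPred.count { case (label, pred) => label != pred }
    wrong.toDouble / labelPred.size
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical labels and predictions, for illustration only.
    val labelPred = Seq((1.0, 1.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0))
    println("Training Error: " + errorRate(labelPred))
  }
}
```

Keeping the count in a single value also avoids calling count on the RDD
several times, as the println in the script does.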

14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
at java.lang.Class.forName(Class.java:211)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1988)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
at
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
at
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1834)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1793)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:906)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:929)
at java.lang.Thread.run(Thread.java:796)
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 1 (task 0.0:1)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
[duplicate 1]
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
2 on executor 1: hdtest022.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
1696 bytes in 0 ms
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
3 on executor 0: hdtest023.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1696 bytes in 0 ms
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 3 (task 0.0:0)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
[duplicate 2]
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
4 on executor 1: hdtest022.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1696 bytes in 1 ms
14/04/09 14:50:49 WARN scheduler.TaskSetManager: Lost TID 4 (task 0.0:0)
14/04/09 14:50:49 INFO scheduler.T

Re: Spark 0.9.1 released

2014-04-09 Thread Tathagata Das
A small additional note: Please use the direct download links in the Spark
Downloads  page. The Apache mirrors
take a day or so to sync from the main repo, so may not work immediately.

TD


On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das
wrote:

> Hi everyone,
>
> We have just posted Spark 0.9.1, which is a maintenance release with
> bug fixes, performance improvements, better stability with YARN and
> improved parity of the Scala and Python API. We recommend all 0.9.0
> users to upgrade to this stable release.
>
> This is the first release since Spark graduated as a top level Apache
> project. Contributions to this release came from 37 developers.
>
> The full release notes are at:
> http://spark.apache.org/releases/spark-release-0-9-1.html
>
> You can download the release at:
> http://spark.apache.org/downloads.html
>
> Thanks all the developers who contributed to this release:
> Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
> Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
> Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
> Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
> Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
> Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
> Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
> shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng
>
> TD
>


Spark 0.9.1 released

2014-04-09 Thread Tathagata Das
Hi everyone,

We have just posted Spark 0.9.1, which is a maintenance release with
bug fixes, performance improvements, better stability with YARN and
improved parity of the Scala and Python API. We recommend all 0.9.0
users to upgrade to this stable release.

This is the first release since Spark graduated as a top level Apache
project. Contributions to this release came from 37 developers.

The full release notes are at:
http://spark.apache.org/releases/spark-release-0-9-1.html

You can download the release at:
http://spark.apache.org/downloads.html

Thanks all the developers who contributed to this release:
Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng

TD


Re: Spark operators on Objects

2014-04-09 Thread Flavio Pompermaier
Any help about this...?
On Apr 9, 2014 9:19 AM, "Flavio Pompermaier"  wrote:

> Hi to everybody,
>
> In my current scenario I have complex objects stored as xml in an HBase
> Table.
> What's the best strategy to work with them? My final goal would be to
> define operators on those objects (like filter, equals, append, join,
> merge, etc) and then work with multiple RDDs to perform some kind of
> comparison between those objects. What do you suggest me? Is it possible?
>
> Best,
> Flavio
>
>


Re: Spark packaging

2014-04-09 Thread Pradeep baji
Thanks Prabeesh.


On Wed, Apr 9, 2014 at 12:37 AM, prabeesh k  wrote:

> Please refer
>
> http://prabstechblog.blogspot.in/2014/04/creating-single-jar-for-spark-project.html
>
> Regards,
> prabeesh
>
>
> On Wed, Apr 9, 2014 at 1:04 PM, Pradeep baji 
> wrote:
>
>> Hi all,
>>
>> I am new to spark and trying to learn it. Is there any document which
>> describes how spark is packaged. ( like dependencies needed to build spark,
>> which jar contains what after build etc)
>>
>> Thanks for the help.
>>
>> Regards,
>> Pradeep
>>
>>
>


is it possible to initiate Spark jobs from Oozie?

2014-04-09 Thread Segerlind, Nathan L
Howdy.

Is it possible to initiate Spark jobs from Oozie (presumably as a java action)? 
If so, are there known limitations to this?  And would anybody have a pointer 
to an example?

Thanks,
Nate



Re: Spark Disk Usage

2014-04-09 Thread Surendranauth Hiraman
Andrew,

Thanks a lot for the pointer to the code! This has answered my question.

Looks like it tries to write it to memory first and then if it doesn't fit,
it spills to disk. I'll have to dig in more to figure out the details.

-Suren



On Wed, Apr 9, 2014 at 12:46 PM, Andrew Ash  wrote:

> The groupByKey would be aware of the subsequent persist -- that's part of
> the reason why operations are lazy.  As far as whether it's materialized in
> memory first and then flushed to disk vs streamed to disk I'm not sure the
> exact behavior.
>
> What I'd expect to happen would be that the RDD is materialized in memory
> up until it fills up the BlockManager.  At that point it starts spilling
> blocks out to disk in order to keep from OOM'ing.  I'm not sure if new
> blocks go straight to disk or if the BlockManager pages already-existing
> blocks out in order to make room for new blocks.
>
> You can always read through source to figure it out though!
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L588
>
>
>
>
> On Wed, Apr 9, 2014 at 6:52 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> Yes, MEMORY_AND_DISK.
>>
>> We do a groupByKey and then call persist on the resulting RDD. So I'm
>> wondering if groupByKey is aware of the subsequent persist setting to use
>> disk or just creates the Seq[V] in memory and only uses disk after that
>> data structure is fully realized in memory.
>>
>> -Suren
>>
>>
>>
>> On Wed, Apr 9, 2014 at 9:46 AM, Andrew Ash  wrote:
>>
>>> Which persistence level are you talking about? MEMORY_AND_DISK ?
>>>
>>> Sent from my mobile phone
>>> On Apr 9, 2014 2:28 PM, "Surendranauth Hiraman" 
>>> wrote:
>>>
>>>> Thanks, Andrew. That helps.
>>>>
>>>> For 1, it sounds like the data for the RDD is held in memory and then
>>>> only written to disk after the entire RDD has been realized in memory. Is
>>>> that correct?
>>>>
>>>> -Suren
>>>>
>>>>
>>>>
>>>> On Wed, Apr 9, 2014 at 9:25 AM, Andrew Ash wrote:
>>>>
>>>>> For 1, persist can be used to save an RDD to disk using the various
>>>>> persistence levels.  When a persistency level is set on an RDD, when that
>>>>> RDD is evaluated it's saved to memory/disk/elsewhere so that it can be
>>>>> re-used.  It's applied to that RDD, so that subsequent uses of the RDD can
>>>>> use the cached value.
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence
>>>>>
>>>>> 2. The other places disk is used most commonly is shuffles.  If you
>>>>> have data across the cluster that comes from a source, then you might not
>>>>> have to hold it all in memory at once.  But if you do a shuffle, which
>>>>> scatters the data across the cluster in a certain way, then you have to
>>>>> have the memory/disk available for that RDD all at once.  In that case,
>>>>> shuffles will sometimes need to spill over to disk for large RDDs, which
>>>>> can be controlled with the spark.shuffle.spill setting.
>>>>>
>>>>> Does that help clarify?
>>>>>
>>>>>
>>>>> On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
>>>>> suren.hira...@velos.io> wrote:
>>>>>
>>>>>> It might help if I clarify my questions. :-)
>>>>>>
>>>>>> 1. Is persist() applied during the transformation right before the
>>>>>> persist() call in the graph? Or is is applied after the transform's
>>>>>> processing is complete? In the case of things like GroupBy, is the Seq
>>>>>> backed by disk as it is being created? We're trying to get a sense of how
>>>>>> the processing is handled behind the scenes with respect to disk.
>>>>>>
>>>>>> 2. When else is disk used internally?
>>>>>>
>>>>>> Any pointers are appreciated.
>>>>>>
>>>>>> -Suren
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
>>>>>> suren.hira...@velos.io> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Any thoughts on this? Thanks.
>>>>>>>
>>>>>>> -Suren
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
>>>>>>> suren.hira...@velos.io> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I know if we call persist with the right options, we can have Spark
>>>>>>>> persist an RDD's data on disk.
>>>>>>>>
>>>>>>>> I am wondering what happens in intermediate operations that could
>>>>>>>> conceivably create large collections/Sequences, like GroupBy and
>>>>>>>> shuffling.
>>>>>>>>
>>>>>>>> Basically, one part of the question is when is disk used internally?
>>>>>>>>
>>>>>>>> And is calling persist() on the RDD returned by such
>>>>>>>> transformations what let's it know to use disk in those situations?
>>>>>>>> Trying
>>>>>>>> to understand if persist() is applied during the transformation or
>>>>>>>> after it.
>>>>>>>>
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>>
>>>>>>>> SUREN HIRAMAN, VP TECHNOLOGY
>>>>>>>> Velos
>>>>>>>> Accelerating Machine Learning
>>>>>>>>
>>>>>>>> 440 NINTH AVENUE, 11TH FLOOR
>>>>>>>> NEW YORK, NY 10001
>>>>>>>> O: (917) 525-2466 ext. 105
>>>>>>>> F: 646

KafkaInputDStream Stops reading new messages

2014-04-09 Thread Kanwaldeep
A Spark Streaming job was running on two worker nodes when there was an
error on one of the nodes. The Spark job still showed as running, but it was
making no progress and not processing any new messages. In the driver log
files I see the following errors.

I would expect the stream read to be retried and processing of new messages
to continue. Is there any configuration that I could be missing?

System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator",
"com.snc.sinet.streaming.StreamAggregatorKryoRegistrator")
System.setProperty("spark.local.dir",
Configuration.streamingConfig.localDir)
System.setProperty("spark.ui.port",
Configuration.streamingConfig.uiPort.toString)


2014-04-05 18:22:26,507 ERROR akka.remote.EndpointWriter
spark-akka.actor.default-dispatcher-3 -
AssociationError
[akka.tcp://sp...@hclient01.sea1.service-now.com:49048] <-
[akka.tcp://sp...@hclient02.sea1.service-now.com:50888]: Error [Shut down
address: akka.tcp://sp...@hclient02.sea1.service-now.com:50888] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://sp...@hclient02.sea1.service-now.com:50888
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@hclient02.sea1.service-now.com:47512]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: hclient02.sea1.service-now.com/10.196.32.78:47512
]

2014-04-05 18:21:52,893 WARN  o.a.spark.scheduler.TaskSetManager  -
Loss was due to
java.lang.IllegalStateException
java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2418)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1988)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1912)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:64)
at
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
at
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1834)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1793)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-Stops-reading-new-messages-tp4016.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using ProtoBuf 2.5 for messages with Spark Streaming

2014-04-09 Thread Kanwaldeep
Any update on this? We are still facing this issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-ProtoBuf-2-5-for-messages-with-Spark-Streaming-tp3396p4015.html


Re: Spark RDD to Shark table IN MEMORY conversion

2014-04-09 Thread abhietc31
Never mind...plz return it later with interest  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-to-Shark-table-IN-MEMORY-conversion-tp3682p4014.html


cannot run spark shell in yarn-client mode

2014-04-09 Thread Pennacchiotti, Marco
I am pretty new to Spark and I am trying to run the spark shell on a Yarn 
cluster from the cli (in yarn-client mode). I am able to start the shell with 
the following command:

SPARK_JAR=../spark-0.9.0-incubating/jars/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=emptyfile \
SPARK_WORKER_INSTANCES=10 \
SPARK_YARN_QUEUE=hdmi-others \
MASTER=yarn-client \
ADD_JARS="lib/avro-mapred-1.7.6-hadoop2.jar,schemas-java.jar" \
SPARK_CLASSPATH="lib/avro-mapred-1.7.6-hadoop2.jar:schemas-java.jar" \
SPARK_WORKER_MEMORY=512M \
SPARK_MASTER_MEMORY=512M \
../spark-0.9.0-incubating/bin/spark-shell

However, as soon as I try to execute an action that requires workers to execute 
on the cluster's machines I get this WARN message in the spark shell:

14/04/08 17:55:59 WARN YarnClientClusterScheduler: Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient memory

When I look at the log of my application master on the webUI I see this:

14/04/08 17:55:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/08 17:55:09 INFO Remoting: Starting remoting
14/04/08 17:55:10 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkYarnAM@machine:35023]
14/04/08 17:55:10 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkYarnAM@machine:35023]
14/04/08 17:55:11 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
14/04/08 17:55:11 INFO client.RMProxy: Connecting to ResourceManager at 
cluster:8030
14/04/08 17:55:11 INFO yarn.WorkerLauncher: ApplicationAttemptId: 
appattempt_1394582929977_173315_01
14/04/08 17:55:11 INFO yarn.WorkerLauncher: Registering the ApplicationMaster
14/04/08 17:55:11 INFO yarn.WorkerLauncher: Waiting for Spark driver to be 
reachable.
14/04/08 17:56:14 ERROR yarn.WorkerLauncher: Failed to connect to driver at
machine:59281, retrying ...
14/04/08 17:57:17 ERROR yarn.WorkerLauncher: Failed to connect to driver at 
machine:59281, retrying ...
14/04/08 17:58:20 ERROR yarn.WorkerLauncher: Failed to connect to driver at 
machine:59281, retrying ...

It looks like the master cannot connect to a worker, but I have no idea why 
this is happening.
I didn't find any answer to this issue in the forum. Any idea of what the 
problem may be?

Thanks,
Marco


Re: Why doesn't the driver node do any work?

2014-04-09 Thread Mayur Rustagi
Also, the driver can run on one of the slave nodes (you will still need a
Spark master, though, for resource allocation etc.).
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Apr 8, 2014 at 2:46 PM, Nan Zhu  wrote:

>  may be unrelated to the question itself, just FYI
>
> you can run your driver program in worker node with Spark-0.9
>
>
> http://spark.apache.org/docs/latest/spark-standalone.html#launching-applications-inside-the-cluster
>
> Best,
>
> --
> Nan Zhu
>
>
> On Tuesday, April 8, 2014 at 5:11 PM, Nicholas Chammas wrote:
>
> Alright, so I guess I understand now why spark-ec2 allows you to select
> different instance types for the driver node and worker nodes. If the
> driver node is just driving and not doing any large collect()s or heavy
> processing, it can be much smaller than the worker nodes.
>
> With regards to data locality, that may not be an issue in my usage
> pattern if, in theory, I wanted to make the driver node also do work. I
> launch clusters using spark-ec2 and source data from S3, so I'm missing out
> on that data locality benefit from the get-go. The firewall may be an issue
> if spark-ec2 doesn't punch open the appropriate holes. And it may well not,
> since it doesn't seem to have an option to configure the driver node to
> also do work.
>
> Anyway, I'll definitely leave things the way they are. If I want a beefier
> cluster, it's probably much easier to just launch a cluster with more
> slaves using spark-ec2 than it is to set the driver node to a non-default
> configuration.
>
>
> On Tue, Apr 8, 2014 at 4:48 PM, Sean Owen  wrote:
>
> If you want the machine that hosts the driver to also do work, you can
> designate it as a worker too, if I'm not mistaken. I don't think the
> driver should do work, logically, but, that's not to say that the
> machine it's on shouldn't do work.
> --
> Sean Owen | Director, Data Science | London
>
>
> On Tue, Apr 8, 2014 at 8:24 PM, Nicholas Chammas
>  wrote:
> > So I have a cluster in EC2 doing some work, and when I take a look here
> >
> > http://driver-node:4040/executors/
> >
> > I see that my driver node is snoozing on the job: No tasks, no memory
> used,
> > and no RDD blocks cached.
> >
> > I'm assuming that it was a conscious design choice not to have the driver
> > node partake in the cluster's workload.
> >
> > Why is that? It seems like a wasted resource.
> >
> > What's more, the slaves may rise up one day and overthrow the driver out
> of
> > resentment.
> >
> > Nick
> >
> >
> > 
> > View this message in context: Why doesn't the driver node do any work?
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>


Re: Spark RDD to Shark table IN MEMORY conversion

2014-04-09 Thread Mayur Rustagi
Not right now. Like the pitch though
Open new horizons for In-memory analysis..
mind if I borrow that :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Apr 8, 2014 at 8:36 PM, abhietc31  wrote:

> Anybody, please help for abov e query.
> It's challanging but will open new horizon for In-Memory analysis.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-to-Shark-table-IN-MEMORY-conversion-tp3682p3968.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: hbase scan performance

2014-04-09 Thread Jerry Lam
Hi Dave,

This is the HBase solution to the poor scan performance issue:
https://issues.apache.org/jira/browse/HBASE-8369

I encountered the same issue before.
To the best of my knowledge, this is not a MapReduce issue. It is an HBase
issue. If you are planning to swap out MapReduce and replace it with Spark,
I don't think you will gain much performance from scanning HBase unless
you are talking about caching the results from HBase in Spark and reusing
them over and over.

HTH,

Jerry


On Wed, Apr 9, 2014 at 12:02 PM, David Quigley  wrote:

> Hi all,
>
> We are currently using hbase to store user data and periodically doing a
> full scan to aggregate data. The reason we use hbase is that we need a
> single user's data to be contiguous, so as user data comes in, we need the
> ability to update a random access store.
>
> The performance of a full hbase scan with MapReduce is frustratingly slow,
> despite implementing recommended optimizations. I see that it is possible
> to scan hbase with Spark, but am not familiar with how Spark interfaces
> with hbase. Would you expect the scan to perform similarly if used as a
> Spark input as a MapReduce input?
>
> Thanks,
> Dave
>


How to change the parallelism level of input dstreams

2014-04-09 Thread Dong Mo
 Dear list,

A quick question about spark streaming:

Say I have this stage set up in my Spark Streaming cluster:

batched TCP stream ==> map(expensive computation) ===> ReduceByKey

I know I can set the number of tasks for ReduceByKey.

But I didn't find a place to specify the parallelism for the input
dstream (the RDD sequence generated after the TCP stream). Do I need to
explicitly call repartition() to split the input RDD streams into many
partitions? If that is the case, what is the mechanism used to split the
RDD stream? A full random repartition of each (K,V) pair (effectively a
shuffle), or something more like a rebalance?
And what is the default parallelism level for an input stream?

Thank you so much
-Mo


Re: Spark Disk Usage

2014-04-09 Thread Andrew Ash
The groupByKey would be aware of the subsequent persist -- that's part of
the reason why operations are lazy.  As for whether it's materialized in
memory first and then flushed to disk vs. streamed to disk, I'm not sure of
the exact behavior.

What I'd expect to happen would be that the RDD is materialized in memory
up until it fills up the BlockManager.  At that point it starts spilling
blocks out to disk in order to keep from OOM'ing.  I'm not sure if new
blocks go straight to disk or if the BlockManager pages already-existing
blocks out in order to make room for new blocks.

You can always read through the source to figure it out though!

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L588
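
A minimal sketch of the pattern under discussion, assuming Spark 0.9 on the
classpath and a local master; the object and method names and the sample
data are made up for illustration. It shows only where persist sits relative
to groupByKey and the action that triggers evaluation, not how the
BlockManager decides what to spill.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD operations such as groupByKey
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns the number of distinct keys after a groupByKey whose result
  // is persisted with MEMORY_AND_DISK. (Hypothetical helper.)
  def countGroups(sc: SparkContext): Long = {
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // Nothing runs yet: groupByKey and persist only describe the computation.
    val grouped = pairs.groupByKey().persist(StorageLevel.MEMORY_AND_DISK)
    // The first action evaluates the RDD and stores its blocks.
    val n = grouped.count()
    // A later action on the same RDD can reuse the stored blocks.
    grouped.count()
    n
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "PersistSketch")
    try println(countGroups(sc)) finally sc.stop()
  }
}
```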




On Wed, Apr 9, 2014 at 6:52 AM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> Yes, MEMORY_AND_DISK.
>
> We do a groupByKey and then call persist on the resulting RDD. So I'm
> wondering if groupByKey is aware of the subsequent persist setting to use
> disk or just creates the Seq[V] in memory and only uses disk after that
> data structure is fully realized in memory.
>
> -Suren
>
>
>
> On Wed, Apr 9, 2014 at 9:46 AM, Andrew Ash  wrote:
>
>> Which persistence level are you talking about? MEMORY_AND_DISK ?
>>
>> Sent from my mobile phone
>> On Apr 9, 2014 2:28 PM, "Surendranauth Hiraman" 
>> wrote:
>>
>>> Thanks, Andrew. That helps.
>>>
>>> For 1, it sounds like the data for the RDD is held in memory and then
>>> only written to disk after the entire RDD has been realized in memory. Is
>>> that correct?
>>>
>>> -Suren
>>>
>>>
>>>
>>> On Wed, Apr 9, 2014 at 9:25 AM, Andrew Ash  wrote:
>>>
 For 1, persist can be used to save an RDD to disk using the various
 persistence levels.  When a persistency level is set on an RDD, when that
 RDD is evaluated it's saved to memory/disk/elsewhere so that it can be
 re-used.  It's applied to that RDD, so that subsequent uses of the RDD can
 use the cached value.


 https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence

 2. The other places disk is used most commonly is shuffles.  If you
 have data across the cluster that comes from a source, then you might not
 have to hold it all in memory at once.  But if you do a shuffle, which
 scatters the data across the cluster in a certain way, then you have to
 have the memory/disk available for that RDD all at once.  In that case,
 shuffles will sometimes need to spill over to disk for large RDDs, which
 can be controlled with the spark.shuffle.spill setting.

 Does that help clarify?


 On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
 suren.hira...@velos.io> wrote:

> It might help if I clarify my questions. :-)
>
> 1. Is persist() applied during the transformation right before the
> persist() call in the graph? Or is is applied after the transform's
> processing is complete? In the case of things like GroupBy, is the Seq
> backed by disk as it is being created? We're trying to get a sense of how
> the processing is handled behind the scenes with respect to disk.
>
> 2. When else is disk used internally?
>
> Any pointers are appreciated.
>
> -Suren
>
>
>
>
> On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> Hi,
>>
>> Any thoughts on this? Thanks.
>>
>> -Suren
>>
>>
>>
>> On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
>> suren.hira...@velos.io> wrote:
>>
>>> Hi,
>>>
>>> I know if we call persist with the right options, we can have Spark
>>> persist an RDD's data on disk.
>>>
>>> I am wondering what happens in intermediate operations that could
>>> conceivably create large collections/Sequences, like GroupBy and 
>>> shuffling.
>>>
>>> Basically, one part of the question is when is disk used internally?
>>>
>>> And is calling persist() on the RDD returned by such transformations
>>> what let's it know to use disk in those situations? Trying to 
>>> understand if
>>> persist() is applied during the transformation or after it.
>>>
>>> Thank you.
>>>
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@v elos.io
>>> W: www.velos.io
>>>
>>>
>>
>>
>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP T

Re: How does Spark handle RDD via HDFS ?

2014-04-09 Thread Andrew Ash
The typical way to handle that use case would be to join the 3 files
together into one RDD and then do the factorization on that.  There will
definitely be network traffic during the initial join to get everything
into one table, and after that there will likely be more network traffic
for various shuffle joins that are needed.
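
As a rough illustration of the join described above, here is a plain-Python
sketch (not Spark code) that combines the three tables into one row per
observation. The sample rows and the name join_observations are made up for
illustration. In Spark the same shape would presumably be built with key-based
joins on RDDs, and moving rows to where their keys live is where the network
traffic comes from.

```python
# Plain-Python sketch of joining Obs with User and Item by key.
# All sample data and names here are hypothetical.

def join_observations(users, items, obs):
    """Return one combined row per observation.

    users: dict of user_id -> tuple of user features
    items: dict of item_id -> tuple of item features
    obs:   list of (user_id, item_id, rating)
    """
    joined = []
    for user_id, item_id, rating in obs:
        # Inner join: skip observations with a missing user or item.
        if user_id in users and item_id in items:
            joined.append(
                (user_id, item_id, users[user_id], items[item_id], rating)
            )
    return joined


users = {1: (34, "F"), 2: (27, "M")}
items = {10: ("red", "L"), 11: ("blue", "S")}
obs = [(1, 10, 4.0), (2, 11, 2.5), (3, 10, 5.0)]  # user 3 has no profile

table = join_observations(users, items, obs)
print(table)
```

Once every row carries its user and item features, each step of the
factorization only needs the rows in its own partition.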

So for your individual questions:

1) it's not so much that your algorithm is executed on each node, but that
the algorithm is executed on the cluster and small component tasks are run
on each node.  Each step in the algorithm happens across the whole cluster
concurrently.
2) network communication will be needed to move data around during the
algorithm.  I'm not familiar with MF, but if you need the whole dataset on
a machine at once for some reason, then you'll have lots of network
communication.
3) This question sounds a bit like pre-fetching -- does Spark start loading
data that it will need "soon" but does not need now?  That does not happen,
and Spark RDDs are actually lazy (technical term) in that no computation
happens until an action (such as count or saveAsTextFile) asks for a result.
Think of that like stacking operations on top of data.  Stacking operations
without evaluating them is cheap, and then once you evaluate the whole thing
Spark can pipeline the stacked operations, which keeps the resident memory
set lower.
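
To make the lazy-evaluation point in 3) concrete, here is a small analogy in
plain Python using generators. It is only an analogy for the idea, not how
Spark is implemented, and the names parse, square and calls are made up for
the example.

```python
# Plain-Python analogy for lazy, pipelined evaluation using generators.
# This illustrates the idea only; it is not Spark's implementation.

calls = []  # records when work actually happens


def parse(lines):
    for line in lines:
        calls.append("parse")
        yield int(line)


def square(numbers):
    for n in numbers:
        calls.append("square")
        yield n * n


# Stacking operations: nothing runs yet, so this step is cheap.
pipeline = square(parse(["1", "2", "3"]))
work_before = len(calls)

# Evaluating: each element flows through both steps one at a time,
# so the full intermediate list is never held in memory.
total = sum(pipeline)
print(work_before, total, calls)
```

The recorded calls alternate between parse and square, which is the
pipelining effect: each record passes through the whole stack before the next
one is read.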

Does that help?

Andrew



On Wed, Apr 9, 2014 at 9:05 AM, gtanguy wrote:

> Hello everybody,
>
> I am wondering how Spark handles RDDs backed by HDFS: what happens if,
> during a map phase, I need data that is not present locally?
>
> What I am working on:
> I am working on a recommendation algorithm: Matrix Factorization (MF) using
> stochastic gradient descent (SGD) as the optimizer. For now my algorithm
> works locally, but to anticipate further needs I would like to parallelize
> it using Spark 0.9.0 on HDFS (without YARN).
> I saw the logistic regression (LR) SGD example in MLlib. Matrix
> Factorization can be viewed as multiple logistic regression iterations, so I
> will follow the example to implement it. The only difference is that my
> dataset is composed of 3 files:
> User.csv -> (UserID age sex..)
> Item.csv -> (ItemID color size..)
> Obs.csv -> (UserID, ItemID, ratings)
>
> What I understand:
> In the LR example we have only the 'Obs.csv' file. Given that we have 3
> machines, the file will be divided across the 3 machines. During the map
> phase, the LR algorithm will be executed on each of the 3 slaves with local
> data, so each LR will process 1/3 of the data. During the reduce phase, we
> just average the results returned by each slave. No network communication is
> needed during the LR process except for the reduce step. All the data
> used/needed during the map phase are local.
>
> What I am wondering:
> In my case my MF needs, on each machine, all the information in the
> 'User.csv' file, 1/3 of the 'Item.csv' file and 1/3 of Obs.csv to operate.
> When HDFS distributes my 3 files, I will have 1/3 of each file on each
> datanode.
> 1. What will happen when my MF algorithm is executed on each node?
> 2. Network communication will be needed for the other 2/3 of User.csv, right?
> 3. Will network communication be optimized as follows:
> while one computation is running, will the data needed for the next
> computation be loaded (so that the time taken by network communication won't
> affect the computation time)?
>
> Any help is highly appreciated.
>
> Best regards,
>
> Germain Tanguy.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-handle-RDD-via-HDFS-tp4003.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Marco Costantini
Perfect. Now I know what to do. Thanks to your help!

Many thanks,
Marco.


On Wed, Apr 9, 2014 at 12:27 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> The AMI should automatically switch between PVM and HVM based on the
> instance type you specify on the command line. For reference (note you
> don't need to specify this on the command line), the PVM ami id
> is ami-5bb18832 in us-east-1.
>
> FWIW we maintain the list of AMI Ids (across regions and pvm, hvm) at
> https://github.com/mesos/spark-ec2/tree/v2/ami-list
>
> Thanks
> Shivaram
>
>
> On Wed, Apr 9, 2014 at 9:12 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> Ah, tried that. I believe this is an HVM AMI? We are exploring
>> paravirtual AMIs.
>>
>>
>> On Wed, Apr 9, 2014 at 11:17 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> And for the record, that AMI is ami-35b1885c. Again, you don't need to
>>> specify it explicitly; spark-ec2 will default to it.
>>>
>>>
>>> On Wed, Apr 9, 2014 at 11:08 AM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 Marco,

 If you call spark-ec2 launch without specifying an AMI, it will default
 to the Spark-provided AMI.

 Nick


 On Wed, Apr 9, 2014 at 9:43 AM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> Hi there,
> To answer your question; no there is no reason NOT to use an AMI that
> Spark has prepared. The reason we haven't is that we were not aware such
> AMIs existed. Would you kindly point us to the documentation where we can
> read about this further?
>
> Many many thanks, Shivaram.
> Marco.
>
>
> On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> Is there any reason why you want to start with a vanilla amazon AMI
>> rather than the ones we build and provide as a part of Spark EC2 scripts?
>> The AMIs we provide are close to the vanilla AMI but have the root account
>> setup properly and install packages like java that are used by Spark.
>>
>> If you wish to customize the AMI, you could always start with our AMI
>> and add more packages you like -- I have definitely done this recently and
>> it works with HVM and PVM as far as I can tell.
>>
>> Shivaram
>>
>>
>> On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> I was able to keep the "workaround" ...around... by overwriting the
>>> generated '/root/.ssh/authorized_keys' file with a known good one, in the
>>> '/etc/rc.local' file
>>>
>>>
>>> On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 Another thing I didn't mention. The AMI and user used: naturally
 I've created several of my own AMIs with the following characteristics.
 None of which worked.

 1) Enabling ssh as root as per this guide (
 http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
 When doing this, I do not specify a user for the spark-ec2 script. What
 happens is that, it works! But only while it's alive. If I stop the
 instance, create an AMI, and launch a new instance based from the new AMI,
 the change I made in the '/root/.ssh/authorized_keys' file is overwritten

 2) adding the 'ec2-user' to the 'root' group. This means that the
 ec2-user does not have to use sudo to perform any operations needing root
 privileges. When doing this, I specify the user 'ec2-user' for the
 spark-ec2 script. An error occurs: rsync fails with exit code 23.

 I believe HVMs still work. But it would be valuable to the
 community to know that the root user work-around does/doesn't work any more
 for paravirtual instances.

 Thanks,
 Marco.


 On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> As requested, here is the script I am running. It is a simple
> shell script which calls spark-ec2 wrapper script. I execute it from the
> 'ec2' directory of spark, as usual. The AMI used is the raw one from the
> AWS Quick Start section. It is the first option (an Amazon Linux
> paravirtual image). Any ideas or confirmation would be GREATLY appreciated.
> Please and thank you.
>
>
> #!/bin/sh
>
> export AWS_ACCESS_KEY_ID=MyCensoredKey
> export AWS_SECRET_ACCESS_KEY=MyCensoredKey
>
> AMI_ID=ami-2f726546
>
> ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user
> -s 10 -v 0.9.0

Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Shivaram Venkataraman
The AMI should automatically switch between PVM and HVM based on the
instance type you specify on the command line. For reference (note you
don't need to specify this on the command line), the PVM ami id
is ami-5bb18832 in us-east-1.

FWIW we maintain the list of AMI Ids (across regions and pvm, hvm) at
https://github.com/mesos/spark-ec2/tree/v2/ami-list

Thanks
Shivaram


On Wed, Apr 9, 2014 at 9:12 AM, Marco Costantini <
silvio.costant...@granatads.com> wrote:

> Ah, tried that. I believe this is an HVM AMI? We are exploring paravirtual
> AMIs.
>
>
> On Wed, Apr 9, 2014 at 11:17 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> And for the record, that AMI is ami-35b1885c. Again, you don't need to
>> specify it explicitly; spark-ec2 will default to it.
>>
>>
>> On Wed, Apr 9, 2014 at 11:08 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Marco,
>>>
>>> If you call spark-ec2 launch without specifying an AMI, it will default
>>> to the Spark-provided AMI.
>>>
>>> Nick
>>>
>>>
>>> On Wed, Apr 9, 2014 at 9:43 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 Hi there,
 To answer your question; no there is no reason NOT to use an AMI that
 Spark has prepared. The reason we haven't is that we were not aware such
 AMIs existed. Would you kindly point us to the documentation where we can
 read about this further?

 Many many thanks, Shivaram.
 Marco.


 On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
 shiva...@eecs.berkeley.edu> wrote:

> Is there any reason why you want to start with a vanilla amazon AMI
> rather than the ones we build and provide as a part of Spark EC2 scripts ?
> The AMIs we provide are close to the vanilla AMI but have the root account
> setup properly and install packages like java that are used by Spark.
>
> If you wish to customize the AMI, you could always start with our AMI
> and add more packages you like -- I have definitely done this recently and
> it works with HVM and PVM as far as I can tell.
>
> Shivaram
>
>
> On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> I was able to keep the "workaround" ...around... by overwriting the
>> generated '/root/.ssh/authorized_keys' file with a known good one, in the
>> '/etc/rc.local' file
>>
>>
>> On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> Another thing I didn't mention. The AMI and user used: naturally
>>> I've created several of my own AMIs with the following characteristics.
>>> None of which worked.
>>>
>>> 1) Enabling ssh as root as per this guide (
>>> http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
>>> When doing this, I do not specify a user for the spark-ec2 script. What
>>> happens is that, it works! But only while it's alive. If I stop the
>>> instance, create an AMI, and launch a new instance based from the new AMI,
>>> the change I made in the '/root/.ssh/authorized_keys' file is overwritten
>>>
>>> 2) adding the 'ec2-user' to the 'root' group. This means that the
>>> ec2-user does not have to use sudo to perform any operations needing root
>>> privileges. When doing this, I specify the user 'ec2-user' for the
>>> spark-ec2 script. An error occurs: rsync fails with exit code 23.
>>>
>>> I believe HVMs still work. But it would be valuable to the community
>>> to know that the root user work-around does/doesn't work any more for
>>> paravirtual instances.
>>>
>>> Thanks,
>>> Marco.
>>>
>>>
>>> On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 As requested, here is the script I am running. It is a simple shell
 script which calls spark-ec2 wrapper script. I execute it from the 'ec2'
 directory of spark, as usual. The AMI used is the raw one from the AWS
 Quick Start section. It is the first option (an Amazon Linux paravirtual
 image). Any ideas or confirmation would be GREATLY appreciated. Please and
 thank you.


 #!/bin/sh

 export AWS_ACCESS_KEY_ID=MyCensoredKey
 export AWS_SECRET_ACCESS_KEY=MyCensoredKey

 AMI_ID=ami-2f726546

 ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user -s 10 \
   -v 0.9.0 -w 300 --no-ganglia -a ${AMI_ID} -m m3.2xlarge -t m3.2xlarge \
   launch marcotest



 On Mon, Apr 7, 2014 at 6:21 PM, Shivaram Venkataraman <
 shivaram.venkatara...@gmail.com> wrote:

> Hmm -- That is strange. Can you paste the command you are using to
> launch the in

Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Marco Costantini
Ah, tried that. I believe this is an HVM AMI? We are exploring paravirtual
AMIs.


On Wed, Apr 9, 2014 at 11:17 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> And for the record, that AMI is ami-35b1885c. Again, you don't need to
> specify it explicitly; spark-ec2 will default to it.
>
>
> On Wed, Apr 9, 2014 at 11:08 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Marco,
>>
>> If you call spark-ec2 launch without specifying an AMI, it will default
>> to the Spark-provided AMI.
>>
>> Nick
>>
>>
>> On Wed, Apr 9, 2014 at 9:43 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> Hi there,
>>> To answer your question; no there is no reason NOT to use an AMI that
>>> Spark has prepared. The reason we haven't is that we were not aware such
>>> AMIs existed. Would you kindly point us to the documentation where we can
>>> read about this further?
>>>
>>> Many many thanks, Shivaram.
>>> Marco.
>>>
>>>
>>> On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>
 Is there any reason why you want to start with a vanilla amazon AMI
 rather than the ones we build and provide as a part of Spark EC2 scripts ?
 The AMIs we provide are close to the vanilla AMI but have the root account
 setup properly and install packages like java that are used by Spark.

 If you wish to customize the AMI, you could always start with our AMI
 and add more packages you like -- I have definitely done this recently and
 it works with HVM and PVM as far as I can tell.

 Shivaram


 On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> I was able to keep the "workaround" ...around... by overwriting the
> generated '/root/.ssh/authorized_keys' file with a known good one, in the
> '/etc/rc.local' file
>
>
> On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> Another thing I didn't mention. The AMI and user used: naturally I've
>> created several of my own AMIs with the following characteristics. None of
>> which worked.
>>
>> 1) Enabling ssh as root as per this guide (
>> http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
>> When doing this, I do not specify a user for the spark-ec2 script. What
>> happens is that, it works! But only while it's alive. If I stop the
>> instance, create an AMI, and launch a new instance based from the new AMI,
>> the change I made in the '/root/.ssh/authorized_keys' file is overwritten
>>
>> 2) adding the 'ec2-user' to the 'root' group. This means that the
>> ec2-user does not have to use sudo to perform any operations needing root
>> privileges. When doing this, I specify the user 'ec2-user' for the
>> spark-ec2 script. An error occurs: rsync fails with exit code 23.
>>
>> I believe HVMs still work. But it would be valuable to the community
>> to know that the root user work-around does/doesn't work any more for
>> paravirtual instances.
>>
>> Thanks,
>> Marco.
>>
>>
>> On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> As requested, here is the script I am running. It is a simple shell
>>> script which calls spark-ec2 wrapper script. I execute it from the 'ec2'
>>> directory of spark, as usual. The AMI used is the raw one from the AWS
>>> Quick Start section. It is the first option (an Amazon Linux paravirtual
>>> image). Any ideas or confirmation would be GREATLY appreciated. Please and
>>> thank you.
>>>
>>>
>>> #!/bin/sh
>>>
>>> export AWS_ACCESS_KEY_ID=MyCensoredKey
>>> export AWS_SECRET_ACCESS_KEY=MyCensoredKey
>>>
>>> AMI_ID=ami-2f726546
>>>
>>> ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user -s 10 \
>>>   -v 0.9.0 -w 300 --no-ganglia -a ${AMI_ID} -m m3.2xlarge -t m3.2xlarge \
>>>   launch marcotest
>>>
>>>
>>>
>>> On Mon, Apr 7, 2014 at 6:21 PM, Shivaram Venkataraman <
>>> shivaram.venkatara...@gmail.com> wrote:
>>>
 Hmm -- That is strange. Can you paste the command you are using to
 launch the instances? The typical workflow is to use the spark-ec2 wrapper
 script using the guidelines at
 http://spark.apache.org/docs/latest/ec2-scripts.html

 Shivaram


 On Mon, Apr 7, 2014 at 1:53 PM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> Hi Shivaram,
>
> OK so let's assume the script CANNOT take a different user and
> that it must be 'root'. The typical workaround is as you said, allow the
> ssh with the root user. Now, don't laugh, but, this worked last Frida

How does Spark handle RDD via HDFS ?

2014-04-09 Thread gtanguy
Hello everybody,

I am wondering how Spark handles RDDs backed by HDFS: what happens if, during
a map phase, I need data that is not present locally?

What I am working on:
I am working on a recommendation algorithm: Matrix Factorization (MF) using
stochastic gradient descent (SGD) as the optimizer. For now my algorithm works
locally, but to anticipate further needs I would like to parallelize it using
Spark 0.9.0 on HDFS (without YARN).
I saw the logistic regression (LR) SGD example in MLlib. Matrix
Factorization can be viewed as multiple logistic regression iterations, so I
will follow the example to implement it. The only difference is that my
dataset is composed of 3 files:
User.csv -> (UserID age sex..)
Item.csv -> (ItemID color size..)
Obs.csv -> (UserID, ItemID, ratings)

What I understand:
In the LR example we have only the 'Obs.csv' file. Given that we have 3
machines, the file will be divided across the 3 machines. During the map
phase, the LR algorithm will be executed on each of the 3 slaves with local
data, so each LR will process 1/3 of the data. During the reduce phase, we
just average the results returned by each slave. No network communication is
needed during the LR process except for the reduce step. All the data
used/needed during the map phase are local.

What I am wondering:
In my case my MF needs, on each machine, all the information in the
'User.csv' file, 1/3 of the 'Item.csv' file and 1/3 of Obs.csv to operate.
When HDFS distributes my 3 files, I will have 1/3 of each file on each
datanode.
1. What will happen when my MF algorithm is executed on each node?
2. Network communication will be needed for the other 2/3 of User.csv, right?
3. Will network communication be optimized as follows:
while one computation is running, will the data needed for the next
computation be loaded (so that the time taken by network communication won't
affect the computation time)?

Any help is highly appreciated. 

Best regards,

Germain Tanguy.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-handle-RDD-via-HDFS-tp4003.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error when compiling spark in IDEA and best practice to use IDE?

2014-04-09 Thread Dong Mo
All of these work.

Thanks
-Mo


2014-04-09 2:34 GMT-04:00 Xiangrui Meng :

> After sbt/sbt gen-idea, do not import as an SBT project but choose
> "open project" and point it to the spark folder. -Xiangrui
>
> On Tue, Apr 8, 2014 at 10:45 PM, Sean Owen  wrote:
> > I let IntelliJ read the Maven build directly and that works fine.
> > --
> > Sean Owen | Director, Data Science | London
> >
> >
> > On Wed, Apr 9, 2014 at 6:14 AM, Dong Mo  wrote:
> >> Dear list,
> >>
> >> SBT compiles fine, but when I do the following:
> >> sbt/sbt gen-idea
> >> import project as SBT project to IDEA 13.1
> >> Make Project
> >> and these errors show up:
> >>
> >> Error:(28, 8) object FileContext is not a member of package
> >> org.apache.hadoop.fs
> >> import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path,
> >> FileUtil}
> >>^
> >> Error:(31, 8) object Master is not a member of package
> >> org.apache.hadoop.mapred
> >> import org.apache.hadoop.mapred.Master
> >>^
> >> Error:(34, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.api._
> >>  ^
> >> Error:(35, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
> >>  ^
> >> Error:(36, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.api.protocolrecords._
> >>  ^
> >> Error:(37, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.api.records._
> >>  ^
> >> Error:(38, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.client.YarnClientImpl
> >>  ^
> >> Error:(39, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.conf.YarnConfiguration
> >>  ^
> >> Error:(40, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.ipc.YarnRPC
> >>  ^
> >> Error:(41, 26) object yarn is not a member of package org.apache.hadoop
> >> import org.apache.hadoop.yarn.util.{Apps, Records}
> >>  ^
> >> Error:(49, 11) not found: type YarnClientImpl
> >>   extends YarnClientImpl with Logging {
> >>   ^
> >> Error:(48, 20) not found: type ClientArguments
> >> class Client(args: ClientArguments, conf: Configuration, sparkConf:
> >> SparkConf)
> >>^
> >> Error:(51, 18) not found: type ClientArguments
> >>   def this(args: ClientArguments, sparkConf: SparkConf) =
> >>  ^
> >> Error:(54, 18) not found: type ClientArguments
> >>   def this(args: ClientArguments) = this(args, new SparkConf())
> >>  ^
> >> Error:(56, 12) not found: type YarnRPC
> >>   var rpc: YarnRPC = YarnRPC.create(conf)
> >>^
> >> Error:(56, 22) not found: value YarnRPC
> >>   var rpc: YarnRPC = YarnRPC.create(conf)
> >>  ^
> >> Error:(57, 17) not found: type YarnConfiguration
> >>   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
> >> ^
> >> Error:(57, 41) not found: type YarnConfiguration
> >>   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
> >> ^
> >> Error:(58, 59) value getCredentials is not a member of
> >> org.apache.hadoop.security.UserGroupInformation
> >>   val credentials =
> UserGroupInformation.getCurrentUser().getCredentials()
> >>   ^
> >> Error:(60, 34) not found: type ClientDistributedCacheManager
> >>   private val distCacheMgr = new ClientDistributedCacheManager()
> >>  ^
> >> Error:(72, 5) not found: value init
> >> init(yarnConf)
> >> ^
> >> Error:(73, 5) not found: value start
> >> start()
> >> ^
> >> Error:(76, 24) value getNewApplication is not a member of
> >> org.apache.spark.Logging
> >> val newApp = super.getNewApplication()
> >>^
> >> Error:(137, 35) not found: type GetNewApplicationResponse
> >>   def verifyClusterResources(app: GetNewApplicationResponse) = {
> >>   ^
> >> Error:(156, 65) not found: type ApplicationSubmissionContext
> >>   def createApplicationSubmissionContext(appId: ApplicationId):
> >> ApplicationSubmissionContext = {
> >> ^
> >> Error:(156, 49) not found: type ApplicationId
> >>   def createApplicationSubmissionContext(appId: ApplicationId):
> >> ApplicationSubmissionContext = {
> >> ^
> >> Error:(118, 31) not found: type ApplicationId
> >>   def getAppStagingDir(appId: ApplicationId): String = {
> >>   ^
> >> Error:(224, 69) not found: type LocalResource
> >>   

hbase scan performance

2014-04-09 Thread David Quigley
Hi all,

We are currently using hbase to store user data and periodically doing a
full scan to aggregate data. The reason we use hbase is that we need a
single user's data to be contiguous, so as user data comes in, we need the
ability to update a random access store.

The performance of a full hbase scan with MapReduce is frustratingly slow,
despite implementing recommended optimizations. I see that it is possible
to scan hbase with Spark, but am not familiar with how Spark interfaces
with hbase. Would you expect the scan to perform similarly if used as a
Spark input as a MapReduce input?

Thanks,
Dave


executors not registering with the driver

2014-04-09 Thread azurecoder
Up until last week we had no problems running a Spark standalone cluster. We
now have a problem registering executors with the driver node in any
application. Although we can start-all and see the worker on 8080 no
executors are registered with the blockmanager.

The feedback we have is scant but we're getting stuff like this suggesting
it's a name resolution issue of some kind:

14/04/09 08:22:58 INFO Master: akka.tcp://spark@Spark0:51214 got
disassociated, removing it.
14/04/09 08:22:58 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkMaster@100.92.60.69:7077] ->
[akka.tcp://spark@Spark0:51214]: Error [Association failed with
[akka.tcp://spark@Spark0:51214]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@Spark0:51214]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: Spark0/100.92.60.69:51214

any insight would be helpful?
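
One way to narrow this down might be to check, from the master host, whether
the name in the log resolves and whether the port accepts TCP connections.
The sketch below is a generic check in plain Python and is not part of Spark.
The helper name can_connect is made up, and the demo only talks to a
throwaway listener on 127.0.0.1. Against a real cluster you would pass the
host and port from the log (for example Spark0 and 51214) instead.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds.

    Name-resolution failures, refused connections and timeouts all
    raise OSError subclasses, so they are reported as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Demo against a throwaway local listener (hypothetical stand-in for
# the executor port that appears in the log).
listener = socket.socket()
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

reachable = can_connect("127.0.0.1", port)
listener.close()
print(reachable)
```

If the hostname check fails but the IP address works, that would point
toward name resolution. If both are refused, the process on that port is
probably gone or blocked.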



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/executors-not-registering-with-the-driver-tp4000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


What level of parallelism should I expect from my cluster?

2014-04-09 Thread Nicholas Chammas
When you click on a stage in the Spark UI at 4040, you can see how many
tasks are running concurrently.

How many tasks should I expect to see running concurrently, given I have
things set up optimally in my cluster, and my RDDs are partitioned properly?

Is it the total number of virtual cores across all my slaves?

I devised the following script to give me that number for a cluster created
by spark-ec2.

# spark-ec2 cluster
# run on driver node
# total number of virtual cores across all slaves
yum install -y pssh
{ nproc; pssh -i -h /root/spark-ec2/slaves nproc; } | grep -v "SUCCESS" |
paste -sd+ | bc
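
For reference, the arithmetic that pipeline performs can be sketched in plain
Python as below. The sample text only imitates what `pssh -i` prints (a
status line per host followed by that host's output), so its exact format is
an assumption, and total_cores is a made-up name. Note that, like the script,
it counts the driver's own nproc as the first value.

```python
# Sketch: sum the core counts printed by nproc on the driver and slaves.
# The sample output format below is an assumption for illustration.

def total_cores(output):
    total = 0
    for line in output.splitlines():
        line = line.strip()
        # Skip blank lines and per-host status lines,
        # as `grep -v "SUCCESS"` does in the shell version.
        if not line or "SUCCESS" in line:
            continue
        total += int(line)
    return total


sample = """8
[1] 10:15:02 [SUCCESS] slave-1
8
[2] 10:15:02 [SUCCESS] slave-2
8
"""
print(total_cores(sample))
```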

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-level-of-parallelism-should-I-expect-from-my-cluster-tp3999.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: To Ten RDD

2014-04-09 Thread Jeyaraj, Arockia R (Arockia)
Thanks. It works for me.

From: mailforledkk [mailto:mailforle...@126.com]
Sent: Wednesday, April 09, 2014 9:16 AM
To: user
Cc: mailforledkk
Subject: Re: To Ten RDD

I see there is a top method in the RDD class; you can use it to get the top N.
But I hit an error when I used it: it seems that when mapPartitions runs
inside top, a task result may be Nil, and the reduce step then fails with a
class cast exception, as below:
java.lang.ClassCastException: scala.collection.immutable.Nil$ cannot be cast to 
org.apache.spark.util.BoundedPriorityQueue
at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:873)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:671)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:668)
at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My Spark version is 0.9.0. I don't know whether this is a bug in Spark.

There is another way to get the top N: use the sortByKey method (sorting in
descending order) and then use the take method to get the top N values of
your RDD.
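
To show the general idea behind a distributed top N, here is a plain-Python
sketch: take the N largest within each partition, then merge the partial
results. It is not Spark's code. Partitions are modelled as ordinary lists,
top_n is a made-up name, and an empty partition just contributes nothing.

```python
import heapq

# Sketch of "top N": take the N largest within each partition,
# then merge the partial results. Partitions here are plain lists.


def top_n(partitions, n, key=None):
    partial = []
    for part in partitions:
        # An empty partition simply contributes nothing.
        partial.extend(heapq.nlargest(n, part, key=key))
    return heapq.nlargest(n, partial, key=key)


partitions = [[("a", 5), ("b", 1)], [], [("c", 9), ("d", 7), ("e", 3)]]
top2 = top_n(partitions, 2, key=lambda kv: kv[1])
print(top2)
```

Ranking by value only needs the key function to pick the second element of
each pair, so the data does not have to be fully sorted first.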


mailforledkk

From: Jeyaraj, Arockia R (Arockia)
Date: 2014-04-09 21:55
To: user@spark.apache.org
Subject: To Ten RDD
Hi ,

Can anyone tell me how to get the top ten elements of an RDD by value?


Thanks
Arockia Raja


Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Nicholas Chammas
And for the record, that AMI is ami-35b1885c. Again, you don't need to
specify it explicitly; spark-ec2 will default to it.


On Wed, Apr 9, 2014 at 11:08 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Marco,
>
> If you call spark-ec2 launch without specifying an AMI, it will default to
> the Spark-provided AMI.
>
> Nick
>
>
> On Wed, Apr 9, 2014 at 9:43 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> Hi there,
>> To answer your question; no there is no reason NOT to use an AMI that
>> Spark has prepared. The reason we haven't is that we were not aware such
>> AMIs existed. Would you kindly point us to the documentation where we can
>> read about this further?
>>
>> Many many thanks, Shivaram.
>> Marco.
>>
>>
>> On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> Is there any reason why you want to start with a vanilla amazon AMI
>>> rather than the ones we build and provide as a part of Spark EC2 scripts ?
>>> The AMIs we provide are close to the vanilla AMI but have the root account
>>> setup properly and install packages like java that are used by Spark.
>>>
>>> If you wish to customize the AMI, you could always start with our AMI
>>> and add more packages you like -- I have definitely done this recently and
>>> it works with HVM and PVM as far as I can tell.
>>>
>>> Shivaram
>>>
>>>
>>> On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 I was able to keep the "workaround" ...around... by overwriting the
 generated '/root/.ssh/authorized_keys' file with a known good one, in the
 '/etc/rc.local' file


 On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> Another thing I didn't mention. The AMI and user used: naturally I've
> created several of my own AMIs with the following characteristics. None of
> which worked.
>
> 1) Enabling ssh as root as per this guide (
> http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
> When doing this, I do not specify a user for the spark-ec2 script. What
> happens is that, it works! But only while it's alive. If I stop the
> instance, create an AMI, and launch a new instance based from the new AMI,
> the change I made in the '/root/.ssh/authorized_keys' file is overwritten
>
> 2) adding the 'ec2-user' to the 'root' group. This means that the
> ec2-user does not have to use sudo to perform any operations needing root
> privileges. When doing this, I specify the user 'ec2-user' for the
> spark-ec2 script. An error occurs: rsync fails with exit code 23.
>
> I believe HVMs still work. But it would be valuable to the community
> to know that the root user work-around does/doesn't work any more for
> paravirtual instances.
>
> Thanks,
> Marco.
>
>
> On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> As requested, here is the script I am running. It is a simple shell
>> script which calls spark-ec2 wrapper script. I execute it from the 'ec2'
>> directory of spark, as usual. The AMI used is the raw one from the AWS
>> Quick Start section. It is the first option (an Amazon Linux paravirtual
>> image). Any ideas or confirmation would be GREATLY appreciated. Please and
>> thank you.
>>
>>
>> #!/bin/sh
>>
>> export AWS_ACCESS_KEY_ID=MyCensoredKey
>> export AWS_SECRET_ACCESS_KEY=MyCensoredKey
>>
>> AMI_ID=ami-2f726546
>>
>> ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user -s
>> 10 -v 0.9.0 -w 300 --no-ganglia -a ${AMI_ID} -m m3.2xlarge -t m3.2xlarge
>> launch marcotest
>>
>>
>>
>> On Mon, Apr 7, 2014 at 6:21 PM, Shivaram Venkataraman <
>> shivaram.venkatara...@gmail.com> wrote:
>>
>>> Hmm -- That is strange. Can you paste the command you are using to
>>> launch the instances ? The typical workflow is to use the spark-ec2 wrapper
>>> script using the guidelines at
>>> http://spark.apache.org/docs/latest/ec2-scripts.html
>>>
>>> Shivaram
>>>
>>>
>>> On Mon, Apr 7, 2014 at 1:53 PM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 Hi Shivaram,

 OK so let's assume the script CANNOT take a different user and that
 it must be 'root'. The typical workaround is as you said, allow the ssh
 with the root user. Now, don't laugh, but, this worked last Friday, but
 today (Monday) it no longer works. :D Why? ...

 ...It seems that NOW, when you launch a 'paravirtual' ami, the root
 user's 'authorized_keys' file is always overwritten. This means the
 workaround doesn't work anymore! I would LOVE for someone to verify 
 this.


Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Nicholas Chammas
Marco,

If you call spark-ec2 launch without specifying an AMI, it will default to
the Spark-provided AMI.

Nick


On Wed, Apr 9, 2014 at 9:43 AM, Marco Costantini <
silvio.costant...@granatads.com> wrote:

> Hi there,
> To answer your question; no there is no reason NOT to use an AMI that
> Spark has prepared. The reason we haven't is that we were not aware such
> AMIs existed. Would you kindly point us to the documentation where we can
> read about this further?
>
> Many many thanks, Shivaram.
> Marco.
>
>
> On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> Is there any reason why you want to start with a vanilla amazon AMI
>> rather than the ones we build and provide as a part of Spark EC2 scripts ?
>> The AMIs we provide are close to the vanilla AMI but have the root account
>> setup properly and install packages like java that are used by Spark.
>>
>> If you wish to customize the AMI, you could always start with our AMI and
>> add more packages you like -- I have definitely done this recently and it
>> works with HVM and PVM as far as I can tell.
>>
>> Shivaram
>>
>>
>> On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> I was able to keep the "workaround" ...around... by overwriting the
>>> generated '/root/.ssh/authorized_keys' file with a known good one, in the
>>> '/etc/rc.local' file
>>>
>>>
>>> On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 Another thing I didn't mention. The AMI and user used: naturally I've
 created several of my own AMIs with the following characteristics. None of
 which worked.

 1) Enabling ssh as root as per this guide (
 http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
 When doing this, I do not specify a user for the spark-ec2 script. What
 happens is that, it works! But only while it's alive. If I stop the
 instance, create an AMI, and launch a new instance based from the new AMI,
 the change I made in the '/root/.ssh/authorized_keys' file is overwritten

 2) adding the 'ec2-user' to the 'root' group. This means that the
 ec2-user does not have to use sudo to perform any operations needing root
 privileges. When doing this, I specify the user 'ec2-user' for the
 spark-ec2 script. An error occurs: rsync fails with exit code 23.

 I believe HVMs still work. But it would be valuable to the community to
 know that the root user work-around does/doesn't work any more for
 paravirtual instances.

 Thanks,
 Marco.


 On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
 silvio.costant...@granatads.com> wrote:

> As requested, here is the script I am running. It is a simple shell
> script which calls spark-ec2 wrapper script. I execute it from the 'ec2'
> directory of spark, as usual. The AMI used is the raw one from the AWS
> Quick Start section. It is the first option (an Amazon Linux paravirtual
> image). Any ideas or confirmation would be GREATLY appreciated. Please and
> thank you.
>
>
> #!/bin/sh
>
> export AWS_ACCESS_KEY_ID=MyCensoredKey
> export AWS_SECRET_ACCESS_KEY=MyCensoredKey
>
> AMI_ID=ami-2f726546
>
> ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user -s 10
> -v 0.9.0 -w 300 --no-ganglia -a ${AMI_ID} -m m3.2xlarge -t m3.2xlarge
> launch marcotest
>
>
>
> On Mon, Apr 7, 2014 at 6:21 PM, Shivaram Venkataraman <
> shivaram.venkatara...@gmail.com> wrote:
>
>> Hmm -- That is strange. Can you paste the command you are using to
>> launch the instances ? The typical workflow is to use the spark-ec2 wrapper
>> script using the guidelines at
>> http://spark.apache.org/docs/latest/ec2-scripts.html
>>
>> Shivaram
>>
>>
>> On Mon, Apr 7, 2014 at 1:53 PM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> Hi Shivaram,
>>>
>>> OK so let's assume the script CANNOT take a different user and that
>>> it must be 'root'. The typical workaround is as you said, allow the ssh
>>> with the root user. Now, don't laugh, but, this worked last Friday, but
>>> today (Monday) it no longer works. :D Why? ...
>>>
>>> ...It seems that NOW, when you launch a 'paravirtual' ami, the root
>>> user's 'authorized_keys' file is always overwritten. This means the
>>> workaround doesn't work anymore! I would LOVE for someone to verify this.
>>>
>>> Just to point out, I am trying to make this work with a paravirtual
>>> instance and not an HVM instance.
>>>
>>> Please and thanks,
>>> Marco.
>>>
>>>
>>> On Mon, Apr 7, 2014 at 4:40 PM, Shivaram Venkataraman <
>>> shivaram.venkatara...@gmail.com> wrote:
>>>
 Right now the spark-ec2 scripts assume that y

Re: To Ten RDD

2014-04-09 Thread mailforledkk






I see there is a top method in the RDD class; you can use it to get the top N.
However, I hit an error when I used it. It seems that during the
mapPartitions step inside top, a task result may be Nil, and the subsequent
reduce then fails with a class cast exception, as below:

java.lang.ClassCastException: scala.collection.immutable.Nil$ cannot be cast to org.apache.spark.util.BoundedPriorityQueue
    at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:873)
    at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:671)
    at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:668)
    at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My Spark version is 0.9.0. I don't know whether this is a bug in Spark.

There is another way to get the top N: use the sortByKey method and then
the take method to get the top N values of your RDD.
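
For illustration only, the general idea of a top N that works partition by
partition can be sketched in plain Python. This is not Spark's code: top_n
and its list-of-lists "partitions" argument are hypothetical stand-ins for an
RDD. The point of the sketch is that every partition, including an empty one,
returns the same kind of value, so the merge step never has to deal with a
different type.

```python
import heapq
from functools import reduce


def top_n(partitions, n):
    """Return the n largest items across all partitions, largest first."""
    # Step 1 (per partition): keep only the n largest items. An empty
    # partition yields an empty list, the same type as any other result.
    partial = [heapq.nlargest(n, part) for part in partitions]
    # Step 2 (merge): combine partial results pairwise, trimming to n each time.
    return reduce(lambda a, b: heapq.nlargest(n, a + b), partial, [])


# Hypothetical data: four "partitions", one of them empty.
partitions = [[5, 1, 9], [], [7, 3], [8, 2, 6]]
print(top_n(partitions, 3))  # [9, 8, 7]
```

Because each partial result holds at most n items, the merge step only ever
compares a handful of values, whatever the size of the input.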


mailforledkk
From: Jeyaraj, Arockia R (Arockia)
Date: 2014-04-09 21:55
To: user@spark.apache.org
Subject: To Ten RDD

Hi,

Can anyone tell me how to get the top ten elements of an RDD by value?

Thanks
Arockia Raja


To Ten RDD

2014-04-09 Thread Jeyaraj, Arockia R (Arockia)
Hi ,

Can anyone tell me how to get the top ten elements of an RDD by value?


Thanks
Arockia Raja


Re: Spark Disk Usage

2014-04-09 Thread Surendranauth Hiraman
Yes, MEMORY_AND_DISK.

We do a groupByKey and then call persist on the resulting RDD. So I'm
wondering if groupByKey is aware of the subsequent persist setting to use
disk or just creates the Seq[V] in memory and only uses disk after that
data structure is fully realized in memory.

-Suren



On Wed, Apr 9, 2014 at 9:46 AM, Andrew Ash  wrote:

> Which persistence level are you talking about? MEMORY_AND_DISK ?
>
> Sent from my mobile phone
> On Apr 9, 2014 2:28 PM, "Surendranauth Hiraman" 
> wrote:
>
>> Thanks, Andrew. That helps.
>>
>> For 1, it sounds like the data for the RDD is held in memory and then
>> only written to disk after the entire RDD has been realized in memory. Is
>> that correct?
>>
>> -Suren
>>
>>
>>
>> On Wed, Apr 9, 2014 at 9:25 AM, Andrew Ash  wrote:
>>
>>> For 1, persist can be used to save an RDD to disk using the various
>>> persistence levels.  When a persistency level is set on an RDD, when that
>>> RDD is evaluated it's saved to memory/disk/elsewhere so that it can be
>>> re-used.  It's applied to that RDD, so that subsequent uses of the RDD can
>>> use the cached value.
>>>
>>>
>>> https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence
>>>
>>> 2. The other places disk is used most commonly is shuffles.  If you have
>>> data across the cluster that comes from a source, then you might not have
>>> to hold it all in memory at once.  But if you do a shuffle, which scatters
>>> the data across the cluster in a certain way, then you have to have the
>>> memory/disk available for that RDD all at once.  In that case, shuffles
>>> will sometimes need to spill over to disk for large RDDs, which can be
>>> controlled with the spark.shuffle.spill setting.
>>>
>>> Does that help clarify?
>>>
>>>
>>> On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
>>> suren.hira...@velos.io> wrote:
>>>
 It might help if I clarify my questions. :-)

 1. Is persist() applied during the transformation right before the
 persist() call in the graph? Or is it applied after the transform's
 processing is complete? In the case of things like GroupBy, is the Seq
 backed by disk as it is being created? We're trying to get a sense of how
 the processing is handled behind the scenes with respect to disk.

 2. When else is disk used internally?

 Any pointers are appreciated.

 -Suren




 On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
 suren.hira...@velos.io> wrote:

> Hi,
>
> Any thoughts on this? Thanks.
>
> -Suren
>
>
>
> On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> Hi,
>>
>> I know if we call persist with the right options, we can have Spark
>> persist an RDD's data on disk.
>>
>> I am wondering what happens in intermediate operations that could
>> conceivably create large collections/Sequences, like GroupBy and shuffling.
>>
>> Basically, one part of the question is when is disk used internally?
>>
>> And is calling persist() on the RDD returned by such transformations
>> what lets it know to use disk in those situations? Trying to understand if
>> persist() is applied during the transformation or after it.
>>
>> Thank you.
>>
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v elos.io
>> W: www.velos.io
>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v elos.io
 W: www.velos.io


>>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v elos.io
>> W: www.velos.io
>>
>>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark Disk Usage

2014-04-09 Thread Andrew Ash
Which persistence level are you talking about? MEMORY_AND_DISK ?

Sent from my mobile phone
On Apr 9, 2014 2:28 PM, "Surendranauth Hiraman" 
wrote:

> Thanks, Andrew. That helps.
>
> For 1, it sounds like the data for the RDD is held in memory and then only
> written to disk after the entire RDD has been realized in memory. Is that
> correct?
>
> -Suren
>
>
>
> On Wed, Apr 9, 2014 at 9:25 AM, Andrew Ash  wrote:
>
>> For 1, persist can be used to save an RDD to disk using the various
>> persistence levels.  When a persistency level is set on an RDD, when that
>> RDD is evaluated it's saved to memory/disk/elsewhere so that it can be
>> re-used.  It's applied to that RDD, so that subsequent uses of the RDD can
>> use the cached value.
>>
>>
>> https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence
>>
>> 2. The other places disk is used most commonly is shuffles.  If you have
>> data across the cluster that comes from a source, then you might not have
>> to hold it all in memory at once.  But if you do a shuffle, which scatters
>> the data across the cluster in a certain way, then you have to have the
>> memory/disk available for that RDD all at once.  In that case, shuffles
>> will sometimes need to spill over to disk for large RDDs, which can be
>> controlled with the spark.shuffle.spill setting.
>>
>> Does that help clarify?
>>
>>
>> On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
>> suren.hira...@velos.io> wrote:
>>
>>> It might help if I clarify my questions. :-)
>>>
>>> 1. Is persist() applied during the transformation right before the
>>> persist() call in the graph? Or is it applied after the transform's
>>> processing is complete? In the case of things like GroupBy, is the Seq
>>> backed by disk as it is being created? We're trying to get a sense of how
>>> the processing is handled behind the scenes with respect to disk.
>>>
>>> 2. When else is disk used internally?
>>>
>>> Any pointers are appreciated.
>>>
>>> -Suren
>>>
>>>
>>>
>>>
>>> On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
>>> suren.hira...@velos.io> wrote:
>>>
 Hi,

 Any thoughts on this? Thanks.

 -Suren



 On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
 suren.hira...@velos.io> wrote:

> Hi,
>
> I know if we call persist with the right options, we can have Spark
> persist an RDD's data on disk.
>
> I am wondering what happens in intermediate operations that could
> conceivably create large collections/Sequences, like GroupBy and shuffling.
>
> Basically, one part of the question is when is disk used internally?
>
> And is calling persist() on the RDD returned by such transformations
> what lets it know to use disk in those situations? Trying to understand if
> persist() is applied during the transformation or after it.
>
> Thank you.
>
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v elos.io
 W: www.velos.io


>>>
>>>
>>> --
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@v elos.io
>>> W: www.velos.io
>>>
>>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


Re: AWS Spark-ec2 script with different user

2014-04-09 Thread Marco Costantini
Hi there,
To answer your question; no there is no reason NOT to use an AMI that Spark
has prepared. The reason we haven't is that we were not aware such AMIs
existed. Would you kindly point us to the documentation where we can read
about this further?

Many many thanks, Shivaram.
Marco.


On Tue, Apr 8, 2014 at 4:42 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Is there any reason why you want to start with a vanilla amazon AMI rather
> than the ones we build and provide as a part of Spark EC2 scripts ? The
> AMIs we provide are close to the vanilla AMI but have the root account
> setup properly and install packages like java that are used by Spark.
>
> If you wish to customize the AMI, you could always start with our AMI and
> add more packages you like -- I have definitely done this recently and it
> works with HVM and PVM as far as I can tell.
>
> Shivaram
>
>
> On Tue, Apr 8, 2014 at 8:50 AM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> I was able to keep the "workaround" ...around... by overwriting the
>> generated '/root/.ssh/authorized_keys' file with a known good one, in the
>> '/etc/rc.local' file
>>
>>
>> On Tue, Apr 8, 2014 at 10:12 AM, Marco Costantini <
>> silvio.costant...@granatads.com> wrote:
>>
>>> Another thing I didn't mention. The AMI and user used: naturally I've
>>> created several of my own AMIs with the following characteristics. None of
>>> which worked.
>>>
>>> 1) Enabling ssh as root as per this guide (
>>> http://blog.tiger-workshop.com/enable-root-access-on-amazon-ec2-instance/).
>>> When doing this, I do not specify a user for the spark-ec2 script. What
>>> happens is that, it works! But only while it's alive. If I stop the
>>> instance, create an AMI, and launch a new instance based from the new AMI,
>>> the change I made in the '/root/.ssh/authorized_keys' file is overwritten
>>>
>>> 2) adding the 'ec2-user' to the 'root' group. This means that the
>>> ec2-user does not have to use sudo to perform any operations needing root
>>> privileges. When doing this, I specify the user 'ec2-user' for the
>>> spark-ec2 script. An error occurs: rsync fails with exit code 23.
>>>
>>> I believe HVMs still work. But it would be valuable to the community to
>>> know that the root user work-around does/doesn't work any more for
>>> paravirtual instances.
>>>
>>> Thanks,
>>> Marco.
>>>
>>>
>>> On Tue, Apr 8, 2014 at 9:51 AM, Marco Costantini <
>>> silvio.costant...@granatads.com> wrote:
>>>
 As requested, here is the script I am running. It is a simple shell
 script which calls spark-ec2 wrapper script. I execute it from the 'ec2'
 directory of spark, as usual. The AMI used is the raw one from the AWS
 Quick Start section. It is the first option (an Amazon Linux paravirtual
 image). Any ideas or confirmation would be GREATLY appreciated. Please and
 thank you.


 #!/bin/sh

 export AWS_ACCESS_KEY_ID=MyCensoredKey
 export AWS_SECRET_ACCESS_KEY=MyCensoredKey

 AMI_ID=ami-2f726546

 ./spark-ec2 -k gds-generic -i ~/.ssh/gds-generic.pem -u ec2-user -s 10
 -v 0.9.0 -w 300 --no-ganglia -a ${AMI_ID} -m m3.2xlarge -t m3.2xlarge
 launch marcotest



 On Mon, Apr 7, 2014 at 6:21 PM, Shivaram Venkataraman <
 shivaram.venkatara...@gmail.com> wrote:

> Hmm -- That is strange. Can you paste the command you are using to
> launch the instances ? The typical workflow is to use the spark-ec2 wrapper
> script using the guidelines at
> http://spark.apache.org/docs/latest/ec2-scripts.html
>
> Shivaram
>
>
> On Mon, Apr 7, 2014 at 1:53 PM, Marco Costantini <
> silvio.costant...@granatads.com> wrote:
>
>> Hi Shivaram,
>>
>> OK so let's assume the script CANNOT take a different user and that
>> it must be 'root'. The typical workaround is as you said, allow the ssh
>> with the root user. Now, don't laugh, but, this worked last Friday, but
>> today (Monday) it no longer works. :D Why? ...
>>
>> ...It seems that NOW, when you launch a 'paravirtual' ami, the root
>> user's 'authorized_keys' file is always overwritten. This means the
>> workaround doesn't work anymore! I would LOVE for someone to verify this.
>>
>> Just to point out, I am trying to make this work with a paravirtual
>> instance and not an HVM instance.
>>
>> Please and thanks,
>> Marco.
>>
>>
>> On Mon, Apr 7, 2014 at 4:40 PM, Shivaram Venkataraman <
>> shivaram.venkatara...@gmail.com> wrote:
>>
>>> Right now the spark-ec2 scripts assume that you have root access and
>>> a lot of internal scripts have the user's home directory hard-coded
>>> as /root. However, all the Spark AMIs we build should have root ssh access
>>> -- Do you find this not to be the case ?
>>>
>>> You can also enable root ssh access in a vanilla AMI by editing
>>> 

Re: Spark Disk Usage

2014-04-09 Thread Surendranauth Hiraman
Thanks, Andrew. That helps.

For 1, it sounds like the data for the RDD is held in memory and then only
written to disk after the entire RDD has been realized in memory. Is that
correct?

-Suren



On Wed, Apr 9, 2014 at 9:25 AM, Andrew Ash  wrote:

> For 1, persist can be used to save an RDD to disk using the various
> persistence levels.  When a persistency level is set on an RDD, when that
> RDD is evaluated it's saved to memory/disk/elsewhere so that it can be
> re-used.  It's applied to that RDD, so that subsequent uses of the RDD can
> use the cached value.
>
>
> https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence
>
> 2. The other places disk is used most commonly is shuffles.  If you have
> data across the cluster that comes from a source, then you might not have
> to hold it all in memory at once.  But if you do a shuffle, which scatters
> the data across the cluster in a certain way, then you have to have the
> memory/disk available for that RDD all at once.  In that case, shuffles
> will sometimes need to spill over to disk for large RDDs, which can be
> controlled with the spark.shuffle.spill setting.
>
> Does that help clarify?
>
>
> On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> It might help if I clarify my questions. :-)
>>
>> 1. Is persist() applied during the transformation right before the
>> persist() call in the graph? Or is it applied after the transform's
>> processing is complete? In the case of things like GroupBy, is the Seq
>> backed by disk as it is being created? We're trying to get a sense of how
>> the processing is handled behind the scenes with respect to disk.
>>
>> 2. When else is disk used internally?
>>
>> Any pointers are appreciated.
>>
>> -Suren
>>
>>
>>
>>
>> On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
>> suren.hira...@velos.io> wrote:
>>
>>> Hi,
>>>
>>> Any thoughts on this? Thanks.
>>>
>>> -Suren
>>>
>>>
>>>
>>> On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
>>> suren.hira...@velos.io> wrote:
>>>
 Hi,

 I know if we call persist with the right options, we can have Spark
 persist an RDD's data on disk.

 I am wondering what happens in intermediate operations that could
 conceivably create large collections/Sequences, like GroupBy and shuffling.

 Basically, one part of the question is when is disk used internally?

 And is calling persist() on the RDD returned by such transformations
 what lets it know to use disk in those situations? Trying to understand if
 persist() is applied during the transformation or after it.

 Thank you.


 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v elos.io
 W: www.velos.io


>>>
>>>
>>> --
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@v elos.io
>>> W: www.velos.io
>>>
>>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v elos.io
>> W: www.velos.io
>>
>>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark Disk Usage

2014-04-09 Thread Andrew Ash
For 1, persist can be used to save an RDD to disk using the various
persistence levels.  When a persistence level is set on an RDD, that RDD is
saved to memory/disk/elsewhere when it is evaluated, so that it can be
re-used.  The level applies to that RDD, so that subsequent uses of the RDD
can use the cached value.

https://spark.apache.org/docs/0.9.0/scala-programming-guide.html#rdd-persistence
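
As a toy model of the point above (persist only marks the RDD; the data is
stored the first time the RDD is actually evaluated), here is a small
plain-Python sketch. LazyDataset, collect and times_computed are made-up names
for illustration and are not Spark APIs. The sketch also says nothing about
whether Spark builds a partition fully in memory before writing it to disk.

```python
class LazyDataset:
    """Toy stand-in for an RDD: nothing runs until collect() is called."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the data
        self._persist = False
        self._cache = None
        self.times_computed = 0

    def persist(self):
        # Only records the intent; no data is produced or stored here.
        self._persist = True
        return self

    def collect(self):
        if self._cache is not None:
            return self._cache
        self.times_computed += 1
        data = list(self._compute())
        if self._persist:
            self._cache = data    # stored as a side effect of evaluation
        return data


grouped = LazyDataset(lambda: [("a", [1, 3]), ("b", [2])]).persist()
print(grouped.times_computed)  # 0: persist() alone computes nothing
grouped.collect()
grouped.collect()
print(grouped.times_computed)  # 1: the second collect() used the cache
```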

2. The other place disk is most commonly used is shuffles.  If you have
data across the cluster that comes from a source, then you might not have
to hold it all in memory at once.  But if you do a shuffle, which scatters
the data across the cluster in a certain way, then you have to have the
memory/disk available for that RDD all at once.  In that case, shuffles
will sometimes need to spill over to disk for large RDDs, which can be
controlled with the spark.shuffle.spill setting.
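
To make "spill over to disk" concrete, here is a rough plain-Python sketch of
a group-by that buffers values in memory, writes sorted runs to temporary
files once a threshold is reached, and then merges the runs. It illustrates
the general technique only and is not how Spark implements shuffles;
group_by_key_with_spill and max_in_memory are made-up names, and the threshold
is unrelated to the real spark.shuffle.spill setting.

```python
import heapq
import itertools
import pickle
import tempfile
from collections import defaultdict


def group_by_key_with_spill(pairs, max_in_memory=1000):
    """Yield (key, values) groups, keeping at most max_in_memory values in RAM."""
    buffer = defaultdict(list)
    buffered = 0
    spills = []

    def spill():
        # Write the current buffer to a temp file as a run sorted by key.
        f = tempfile.TemporaryFile()
        for key in sorted(buffer):
            pickle.dump((key, buffer[key]), f)
        f.seek(0)
        spills.append(f)
        buffer.clear()

    for key, value in pairs:
        buffer[key].append(value)
        buffered += 1
        if buffered >= max_in_memory:
            spill()
            buffered = 0

    def read_run(f):
        # Stream (key, values) records back from one spilled run.
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

    runs = [read_run(f) for f in spills]
    runs.append(iter(sorted(buffer.items(), key=lambda kv: kv[0])))
    # Every run is sorted by key, so a streaming merge brings equal keys together.
    merged = heapq.merge(*runs, key=lambda kv: kv[0])
    for key, group in itertools.groupby(merged, key=lambda kv: kv[0]):
        yield key, [v for _, values in group for v in values]


pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("a", 5)]
print(dict(group_by_key_with_spill(pairs, max_in_memory=2)))
# {'a': [1, 3, 5], 'b': [2], 'c': [4]}
```

The trade-off is the usual one: a lower threshold bounds memory use more
tightly but costs extra disk writes and a wider merge at the end.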

Does that help clarify?


On Mon, Apr 7, 2014 at 10:20 AM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> It might help if I clarify my questions. :-)
>
> 1. Is persist() applied during the transformation right before the
> persist() call in the graph? Or is it applied after the transform's
> processing is complete? In the case of things like GroupBy, is the Seq
> backed by disk as it is being created? We're trying to get a sense of how
> the processing is handled behind the scenes with respect to disk.
>
> 2. When else is disk used internally?
>
> Any pointers are appreciated.
>
> -Suren
>
>
>
>
> On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> Hi,
>>
>> Any thoughts on this? Thanks.
>>
>> -Suren
>>
>>
>>
>> On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman <
>> suren.hira...@velos.io> wrote:
>>
>>> Hi,
>>>
>>> I know if we call persist with the right options, we can have Spark
>>> persist an RDD's data on disk.
>>>
>>> I am wondering what happens in intermediate operations that could
>>> conceivably create large collections/Sequences, like GroupBy and shuffling.
>>>
>>> Basically, one part of the question is when is disk used internally?
>>>
>>> And is calling persist() on the RDD returned by such transformations
>>> what lets it know to use disk in those situations? Trying to understand if
>>> persist() is applied during the transformation or after it.
>>>
>>> Thank you.
>>>
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@v elos.io
>>> W: www.velos.io
>>>
>>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v elos.io
>> W: www.velos.io
>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


Re: PySpark SocketConnect Issue in Cluster

2014-04-09 Thread Surendranauth Hiraman
This appears to be an issue around using pandas. Even if we just
instantiate a dataframe and do nothing with it, the python worker process
is exiting. But if we remove any pandas references, the same job runs to
completion.

Has anyone run into this before?

-Suren



On Mon, Apr 7, 2014 at 1:10 PM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> Hi,
>
> We have a situation where a Pyspark script works fine as a local process
> ("local" url) on the Master and the Worker nodes, which would indicate that
> all python dependencies are set up properly on each machine.
>
> But when we try to run the script at the cluster level (using the master's
> url), it fails partway through the flow on a GroupBy with a SocketConnect
> error and python crashes.
>
> This is on ec2 using the AMI. This doesn't seem to be an issue of the
> master not seeing the workers, since they show up in the web ui.
>
> Also, we can see the job running on the cluster until it reaches the
> GroupBy transform step, which is when we get the SocketConnect error.
>
> Any ideas?
>
> -Suren
>
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v elos.io
W: www.velos.io


Re: trouble with "join" on large RDDs

2014-04-09 Thread Andrew Ash
A JVM can easily be limited in how much memory it uses with the -Xmx
parameter, but Python doesn't have memory limits built in in such a
first-class way.  Maybe the memory limits aren't making it to the python
executors.

What was your SPARK_MEM setting?  The JVM below seems to be using 603201
(pages?) and the 3 large python processes are each using ~1.8 million
(pages?).  I'm unsure what units the OOM killer's RSS column is in.  Could
be either pages (4kb each) or bytes.
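
One way to check the units against the log below: the kernel's "Killed
process 15198" line reports total-vm in kB, so it can be compared with the
total_vm column for the same pid. A small Python sketch of that arithmetic,
assuming 4 KiB pages (the usual x86-64 default, but an assumption here):

```python
PAGE_SIZE_KB = 4  # assumption: 4 KiB pages


def pages_to_kb(pages, page_size_kb=PAGE_SIZE_KB):
    """Convert a page count to kB."""
    return pages * page_size_kb


# total_vm column for pid 15198, and the kernel's own "total-vm" figure
# from the "Killed process 15198" line of the same log.
total_vm_pages = 1845471
reported_total_vm_kb = 7381884

print(pages_to_kb(total_vm_pages) == reported_total_vm_kb)  # True

# The same conversion applied to the JVM's figure (pid 12668), in GiB.
print(pages_to_kb(603201) / 1024 ** 2)
```

The exact match suggests the per-process columns are counted in pages rather
than bytes.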


Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  2348    12573     2102    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  2349    12573     2101    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  2350    12573     2101    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  5115    12571     2101    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  5116    12571     2101    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  5117    12571     2101    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  7725    12570     2098    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  7726    12570     2098    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  7727    12570     2098    22  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 10324    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 10325    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 10326    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201    47932   190  0  0 java
Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 13295    12570     2100    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 13296    12570     2100    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 13297    12570     2100    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 15192    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 15193    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 15195    12570     2098    23  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471  1818463  3573  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479  1686492  3316  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470  1762344  3463  0  0 python
Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process 15198 (python) score 221 or sacrifice child
Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB
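
The units question can be checked against the dump itself, because the final "Killed process" line reports the same process in kB. The sketch below is a small arithmetic check, not an authoritative reading of the kernel output. It assumes 4 kB pages and assumes the columns for pid 15198 read as total_vm = 1845471 and rss = 1818463 (the dump has no header row, so that column split is an assumption). The constant and function names are made up for the example.

```python
PAGE_SIZE_KB = 4  # assumption: 4 kB pages, the usual x86-64 default

# (total_vm, rss) for the three large python processes, read from the dump
# above under the assumed column split.
LARGE_PYTHON = {
    15198: (1845471, 1818463),
    15200: (1710479, 1686492),
    15201: (1788470, 1762344),
}

# Figures the kernel printed, in kB, when it killed pid 15198.
KILLED_TOTAL_VM_KB = 7381884
KILLED_ANON_RSS_KB = 7273852  # file-rss was 0 kB, so this is the whole RSS


def pages_to_kb(pages: int) -> int:
    """Convert a page count to kB under the assumed page size."""
    return pages * PAGE_SIZE_KB


def columns_are_pages(pid: int) -> bool:
    """True if the table columns, read as pages, match the kB figures."""
    total_vm, rss = LARGE_PYTHON[pid]
    return (pages_to_kb(total_vm) == KILLED_TOTAL_VM_KB
            and pages_to_kb(rss) == KILLED_ANON_RSS_KB)


def total_rss_kb() -> int:
    """Combined resident memory of the three large python processes, in kB."""
    return sum(pages_to_kb(rss) for _, rss in LARGE_PYTHON.values())


if __name__ == "__main__":
    print("columns are pages:", columns_are_pages(15198))
    print("combined python RSS in GiB:", total_rss_kb() / 1024 ** 2)
```

Under those assumptions the page counts for pid 15198 multiply out exactly to the kB figures on the "Killed process" line, which suggests the columns are in pages and that the three large python processes together held roughly 20 GiB resident.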


On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller wrote:

> Hi All,
>
> I poked around a bit more to (1) confirm my suspicions that the crash
> was related to memory consumption and (2) figure out why there is no
> error shown in 12_stderr, the spark executor log file from the
> executors on bennett.research.intel.research.net.
>
> The syslog file (from /var/log/syslog on bennett, attached) shows that
> the machine ran out of memory, that the memory was mostly consumed by 1
> java process and 3 python processes (I am running pyspark with 3 cores
> per machine), and that the kernel then began killing java and python
> processes to ease memory pressure.  It seems likely that these
> processes were the Spark processes, and there are no errors recorded in
> 12_stderr because the process was killed by the OS (rather than
> experiencing an unhandled "cannot allocate memory" exception).
>
> I'm a little confused about how Spark could consume so much memory
> during the reduce phase of the shuffle.  Shouldn't Spark remain within
> the SPARK_MEM limit on memory consumption, and spill to disk in the
> event that there isn't enough memory?
>
> -Brad
>
>
> On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller wrote:
> > Hi Patrick,
> >
> >> The shuffle data is written through the buffer cache of the operating
> >> system, so you would expect the files to show up there immediately and
> >> probably to show up as being their full size when you do "ls". In
> >> reality though these are likely residing in the OS cache and not on
> >> disk.
> >
> > I see.  Perhaps the memory consump

Re: Spark packaging

2014-04-09 Thread prabeesh k
Please refer to
http://prabstechblog.blogspot.in/2014/04/creating-single-jar-for-spark-project.html

Regards,
prabeesh


On Wed, Apr 9, 2014 at 1:04 PM, Pradeep baji wrote:

> Hi all,
>
> I am new to Spark and trying to learn it. Is there any document that
> describes how Spark is packaged (for example, the dependencies needed to
> build Spark, which jar contains what after the build, etc.)?
>
> Thanks for the help.
>
> Regards,
> Pradeep
>
>


Spark packaging

2014-04-09 Thread Pradeep baji
Hi all,

I am new to Spark and trying to learn it. Is there any document that
describes how Spark is packaged (for example, the dependencies needed to
build Spark, which jar contains what after the build, etc.)?

Thanks for the help.

Regards,
Pradeep


Spark operators on Objects

2014-04-09 Thread Flavio Pompermaier
Hi everybody,

In my current scenario I have complex objects stored as XML in an HBase
table.
What's the best strategy for working with them? My final goal is to
define operators on those objects (like filter, equals, append, join,
merge, etc.) and then work with multiple RDDs to perform some kind of
comparison between those objects. What would you suggest? Is this possible?
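
To make the question concrete, here is a minimal sketch of what "operators on objects" could look like once the XML has been parsed into plain values. Everything in it is hypothetical: the `Record` class, the `<obj id="..."><tag>...</tag></obj>` layout and the `merge` semantics are invented for illustration, and reading from HBase is not shown.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Hypothetical value object; equality and hashing come from the fields."""
    key: str
    tags: frozenset = frozenset()


def parse_record(xml_text: str) -> Record:
    """Parse one XML document of the assumed shape into a Record."""
    root = ET.fromstring(xml_text)
    tags = frozenset(t.text for t in root.findall("tag"))
    return Record(key=root.get("id"), tags=tags)


def has_tag(record: Record, tag: str) -> bool:
    """Example 'filter' operator."""
    return tag in record.tags


def merge(a: Record, b: Record) -> Record:
    """Example 'merge' operator: union the tags of two records with one key."""
    if a.key != b.key:
        raise ValueError("can only merge records with the same key")
    return Record(key=a.key, tags=a.tags | b.tags)


if __name__ == "__main__":
    docs = [
        '<obj id="1"><tag>x</tag></obj>',
        '<obj id="1"><tag>y</tag></obj>',
        '<obj id="2"><tag>x</tag></obj>',
    ]
    records = [parse_record(d) for d in docs]
    print([r for r in records if has_tag(r, "x")])
    print(merge(records[0], records[1]))
```

Because these are ordinary functions over immutable values, the same functions could in principle be passed to RDD operations such as map, filter, reduceByKey or join after keying each record by `key`.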

Best,
Flavio


Spark on YARN performance

2014-04-09 Thread Flavio Pompermaier
Hi everybody,

I'm new to Spark and I'd like to know whether running Spark on top of YARN
or Mesos affects its performance, and if so by how much. Is there any
documentation about this?

Best,
Flavio


KafkaReceiver Error when starting ssc (Actor name not unique)

2014-04-09 Thread gaganbm
Hi All,

I am getting this exception when calling ssc.start() to start the streaming
context.


ERROR KafkaReceiver - Error receiving data
akka.actor.InvalidActorNameException: actor name [NetworkReceiver-0] is not unique!
    at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
    at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
    at akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
    at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
    at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
    at org.apache.spark.streaming.dstream.NetworkReceiver.actor$lzycompute(NetworkInputDStream.scala:94)
    at org.apache.spark.streaming.dstream.NetworkReceiver.actor(NetworkInputDStream.scala:94)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:122)
    at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
    at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

I tried cleaning up the ZooKeeper and Kafka temp/cached files, but the error
is still the same.

Any help on this?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tp3978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.