Re: question about compiling SimpleApp

2014-02-18 Thread Andrew Ash
Dachuan,

Where did you find that faulty documentation?  I'd like to get it fixed.

Thanks!
Andrew


On Tue, Feb 18, 2014 at 4:15 PM, dachuan  wrote:

> Thanks for your reply.
>
> I have changed scalaVersion := "2.10" to scalaVersion := "2.10.3" then
> everything is good.
>
> So this is a documentation bug :)
>
> dachuan.
>
>
> On Tue, Feb 18, 2014 at 6:50 PM, Denny Lee  wrote:
>
>> What version of Scala are you using?  For example, if you’re using Spark
>> Incubating 0.9, its dependency is Scala 2.10.3.  If you can update the
>> simple.sbt to indicate 2.10.3 (as opposed to 2.10) then that might do the
>> trick.
>>
>> HTH!
>>
>> On February 18, 2014 at 11:21:46 AM, dachuan (hdc1...@gmail.com) wrote:
>>
>>Hello,
>>
>> A quick question about my problem in compiling SimpleApp,
>>
>> sbt package reports
>>
>>  [info] Resolving org.scala-lang#scala-library;2.10 ...
>> [warn]  module not found: org.scala-lang#scala-library;2.10
>>  ...
>>  [warn]  ::
>> [warn]  ::  UNRESOLVED DEPENDENCIES ::
>> [warn]  ::
>> [warn]  :: org.scala-lang#scala-library;2.10: not found
>> [warn]  :: org.scala-lang#scala-compiler;2.10: not found
>> [warn]  ::
>> sbt.ResolveException: unresolved dependency:
>> org.scala-lang#scala-library;2.10: not found
>> unresolved dependency: org.scala-lang#scala-compiler;2.10: not found
>>
>> This might be a common problem, please help me with this.
>>
>> thanks,
>> dachuan.
>>
>>
>
>
> --
> Dachuan Huang
> Cellphone: 614-390-7234
> 2015 Neil Avenue
> Ohio State University
> Columbus, Ohio
> U.S.A.
> 43210
>
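
For anyone else who lands here: a minimal simple.sbt that resolves cleanly
against the 0.9.0-incubating release discussed in this thread would look
roughly like the following (the name and version lines are placeholders; the
key point is the full three-part Scala version):

  name := "Simple Project"

  version := "1.0"

  scalaVersion := "2.10.3"

  libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"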


Re: Defining SparkShell Init?

2014-02-18 Thread Andrew Ash
Why would Scala 2.11 change things here? I'm not familiar with which
features you're referring to.

I would support a prelude file in ~/.sparkrc or similar that is
automatically imported on spark shell startup if it exists.

Sent from my mobile phone
On Feb 17, 2014 9:11 PM, "Prashant Sharma"  wrote:

> There is a way to :load in shell, where you can specify the path of your
> boilerplate.scala. These things would be streamlined "once" we have scala
> 2.11 (I hope.)
>
>
> On Tue, Feb 18, 2014 at 7:20 AM, Mayur Rustagi wrote:
>
>> That's actually not a bad idea: have a shell boilerplate.scala in the
>> same folder that is used to initialize the shell.
>> The shell is a script that at the end of the day starts a JVM with jars from
>> the spark project; mostly you'll have to modify the spark classes and
>> reassemble using sbt. It's messy, but there may be easier ways to feed some
>> data to the shell script/JVM and then connect with stdin.
>> Regards
>> Mayur
>>
>> On Monday, February 17, 2014, Kyle Ellrott  wrote:
>>
>>> Is there a way to define a set of commands to 'initialize' the
>>> environment in the SparkShell?
>>> I'd like to create a wrapper script that starts up the sparkshell and
>>> does some boiler plate initialization (imports and variable
>>> creation) before handing things over to me.
>>>
>>> Kyle
>>>
>>
>>
>> --
>> Sent from Gmail Mobile
>>
>
>
>
> --
> Prashant
>
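
Until something like a ~/.sparkrc prelude exists, the :load route Prashant
mentions can be scripted today. A rough sketch, where the file name and its
contents are only an example:

  // boilerplate.scala -- imports and common definitions to pull in at startup
  import org.apache.spark.SparkContext._
  import scala.collection.mutable
  val dataPath = "hdfs:///data/input"   // placeholder

  // then, inside the Spark shell:
  // scala> :load /path/to/boilerplate.scala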


Re: ADD_JARS not working on 0.9

2014-02-16 Thread Andrew Ash
// cc Patrick, who I think helps with the Amplab Jira

Amplab Jira admins, can we make sure that newly-created users have comment
permissions?  This has been standard in the open source Jira instances I've
worked with in the past (like Hadoop).

Thanks!
Andrew


On Sat, Feb 15, 2014 at 4:25 AM, Vyacheslav Baranov <
slavik.bara...@gmail.com> wrote:

>  Andrew,
>
> I've created account on Amplab Jira, but unfortunately I don't have
> permission to comment.
>
> Vyacheslav
>
>
> On 15/02/14 00:28, Andrew Ash wrote:
>
> Hi Vyacheslav,
>
>  If you could add that to the ticket directly that would be valuable
> because you're more familiar with the specific problem than me!
>
>  Andrew
>
>
> On Fri, Feb 14, 2014 at 8:10 AM, Vyacheslav Baranov <
> slavik.bara...@gmail.com> wrote:
>
>>  Hello Andrew,
>>
>> I'm running into the same problem when I try to import a jar using the ':cp'
>> repl command. This used to work on 0.8:
>>
>> scala> import org.msgpack
>> :10: error: msgpack is not a member of org
>>import org.msgpack
>>   ^
>>
>> scala> :cp /path/to/msgpack-0.6.8.jar
>> Added '/path/to/msgpack-0.6.8.jar'.  Your new classpath is:
>>
>> "/usr/share/lib/spark/*:/usr/lib/spark/conf:/usr/lib/spark/jars/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/path/to/msgpack-0.6.8.jar"
>> 14/02/14 20:04:00 INFO server.Server: jetty-7.x.y-SNAPSHOT
>> 14/02/14 20:04:00 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:64454
>>
>> scala> import org.msgpack
>> import org.msgpack
>>
>> And it's not working on 0.9:
>>
>> scala> import org.msgpack
>> :10: error: object msgpack is not a member of package org
>>import org.msgpack
>>   ^
>>
>> scala> :cp /path/to/msgpack-0.6.8.jar
>> Added '/path/to/msgpack-0.6.8.jar'.  Your new classpath is:
>>
>> "/usr/share/lib/spark/*:/usr/lib/spark/conf:/usr/lib/spark/jars/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar:/path/to/msgpack-0.6.8.jar"
>> Nothing to replay.
>>
>> scala> import org.msgpack
>> :7: error: object msgpack is not a member of package org
>>import org.msgpack
>>   ^
>>
>> Probably it's worth adding this to the issue's comments.
>>
>> Thank you,
>> Vyacheslav
>>
>>
>> On 14/02/14 02:26, Andrew Ash wrote:
>>
>> I filed a bug so we can track the fix:
>> https://spark-project.atlassian.net/browse/SPARK-1089
>>
>>
>>  On Thu, Feb 13, 2014 at 2:21 PM, Soumya Simanta <
>> soumya.sima...@gmail.com> wrote:
>>
>>>  Use
>>> SPARK_CLASSPATH along with ADD_JARS
>>>
>>>
>>>  On Thu, Feb 13, 2014 at 5:12 PM, Andre Kuhnen wrote:
>>>
>>>> Hello, my spark-shell tells me that the jars are added, but it cannot
>>>> import any of my stuff.
>>>>
>>>>
>>>>  When I used the same steps on 0.8  everything worked fine
>>>>
>>>>  Thanks
>>>>
>>>>
>>>
>>
>>
>
>
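
For anyone else hitting this on 0.9.0, the workaround discussed in this thread
is to list the jar in both environment variables when launching the shell; a
sketch using the jar path from Vyacheslav's example:

  ADD_JARS=/path/to/msgpack-0.6.8.jar \
  SPARK_CLASSPATH=/path/to/msgpack-0.6.8.jar \
  ./bin/spark-shell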


Re: ADD_JARS not working on 0.9

2014-02-14 Thread Andrew Ash
Hi Vyacheslav,

If you could add that to the ticket directly that would be valuable because
you're more familiar with the specific problem than me!

Andrew


On Fri, Feb 14, 2014 at 8:10 AM, Vyacheslav Baranov <
slavik.bara...@gmail.com> wrote:

>  Hello Andrew,
>
> I'm running into the same problem when I try to import a jar using the ':cp'
> repl command. This used to work on 0.8:
>
> scala> import org.msgpack
> :10: error: msgpack is not a member of org
>import org.msgpack
>   ^
>
> scala> :cp /path/to/msgpack-0.6.8.jar
> Added '/path/to/msgpack-0.6.8.jar'.  Your new classpath is:
>
> "/usr/share/lib/spark/*:/usr/lib/spark/conf:/usr/lib/spark/jars/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/path/to/msgpack-0.6.8.jar"
> 14/02/14 20:04:00 INFO server.Server: jetty-7.x.y-SNAPSHOT
> 14/02/14 20:04:00 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:64454
>
> scala> import org.msgpack
> import org.msgpack
>
> And it's not working on 0.9:
>
> scala> import org.msgpack
> :10: error: object msgpack is not a member of package org
>import org.msgpack
>   ^
>
> scala> :cp /path/to/msgpack-0.6.8.jar
> Added '/path/to/msgpack-0.6.8.jar'.  Your new classpath is:
>
> "/usr/share/lib/spark/*:/usr/lib/spark/conf:/usr/lib/spark/jars/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar:/path/to/msgpack-0.6.8.jar"
> Nothing to replay.
>
> scala> import org.msgpack
> :7: error: object msgpack is not a member of package org
>import org.msgpack
>   ^
>
> Probably it's worth adding this to the issue's comments.
>
> Thank you,
> Vyacheslav
>
>
> On 14/02/14 02:26, Andrew Ash wrote:
>
> I filed a bug so we can track the fix:
> https://spark-project.atlassian.net/browse/SPARK-1089
>
>
>  On Thu, Feb 13, 2014 at 2:21 PM, Soumya Simanta  > wrote:
>
>>  Use
>> SPARK_CLASSPATH along with ADD_JARS
>>
>>
>>  On Thu, Feb 13, 2014 at 5:12 PM, Andre Kuhnen wrote:
>>
>>> Hello, my spark-shell tells me that the jars are added, but it cannot
>>> import any of my stuff.
>>>
>>>
>>>  When I used the same steps on 0.8  everything worked fine
>>>
>>>  Thanks
>>>
>>>
>>
>
>


Re: ADD_JARS not working on 0.9

2014-02-13 Thread Andrew Ash
I filed a bug so we can track the fix:
https://spark-project.atlassian.net/browse/SPARK-1089


On Thu, Feb 13, 2014 at 2:21 PM, Soumya Simanta wrote:

> Use
> SPARK_CLASSPATH along with ADD_JARS
>
>
> On Thu, Feb 13, 2014 at 5:12 PM, Andre Kuhnen wrote:
>
>> Hello, my spark-shell tells me that the jars are added, but it cannot
>> import any of my stuff.
>>
>>
>> When I used the same steps on 0.8  everything worked fine
>>
>> Thanks
>>
>>
>


Re: ADD_JARS not working on 0.9

2014-02-13 Thread Andrew Ash
Hi Andre,

I've also noticed this.  The jar now needs to be added to SPARK_CLASSPATH as
well.

See
https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201402.mbox/%3ccajbo4nemlitrnm1xbyqomwmp0m+eucg4ye-txurgsvkob5k...@mail.gmail.com%3E


On Thu, Feb 13, 2014 at 2:12 PM, Andre Kuhnen  wrote:

> Hello, my spark-shell tells me that the jars are added, but it cannot
> import any of my stuff.
>
>
> When I used the same steps on 0.8  everything worked fine
>
> Thanks
>
>


Re: Best practice for retrieving big data from RDD to local machine

2014-02-12 Thread Andrew Ash
Hi Egor,

It sounds like you should vote for
https://spark-project.atlassian.net/browse/SPARK-914 which is to make an
RDD iterable from the driver.


On Wed, Feb 12, 2014 at 1:07 AM, Egor Pahomov wrote:

> Hello. I've got a big RDD (1 GB) in a YARN cluster. On the local machine that
> uses this cluster I have only 512 MB. I'd like to iterate over the values of
> the resulting RDD on my local machine. I can't use collect(), because it would
> create an array locally that is larger than my heap. I need some iterative way.
> There is a method iterator(), but it requires some additional information that
> I can't provide. (
> http://stackoverflow.com/questions/21698443/best-practice-for-retrieving-big-data-from-rdd-to-local-machine
> )
>
> --
>
>
>
> *Sincerely yours, Egor Pakhomov, Scala Developer, Yandex*
>
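
Until SPARK-914 lands, one workaround is to pull the RDD back one partition at
a time, assuming each individual partition fits in the driver's heap. A rough
sketch (the helper name is mine, and without caching the lineage is
re-evaluated once per partition):

  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  // Collect one partition at a time so only a single partition's worth of
  // data is ever materialized on the driver.
  def foreachOnDriver[T: ClassTag](rdd: RDD[T])(f: T => Unit): Unit =
    for (i <- 0 until rdd.partitions.length) {
      rdd.mapPartitionsWithIndex(
        (idx, it) => if (idx == i) it else Iterator.empty
      ).collect().foreach(f)
    }

Something like foreachOnDriver(bigRdd)(println) then streams the records
through the driver without ever holding the full result in memory at once.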


Re: GC issues

2014-02-12 Thread Andrew Ash
Alternatively, Spark's estimate of how much heap space you're using may be
lower than the true value, so it runs out of memory while it still thinks it
has breathing room.

Try lowering spark.storage.memoryFraction from its default (0.6) a bit to
something like 0.5 to make it more conservative with memory use within the
JVM if you don't have more physical memory to expand the Xmx setting.
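
For reference, one way to set that from a 0.9.0 driver program; only the
property name and suggested value come from this thread, the rest is a sketch:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("local[2]")                        // placeholder master URL
    .setAppName("gc-tuning-example")
    .set("spark.storage.memoryFraction", "0.5")   // down from the 0.6 default
  val sc = new SparkContext(conf)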


On Wed, Feb 12, 2014 at 12:20 AM, Sean Owen  wrote:

> This is just Java's way of saying 'out of memory'. Your workers need more
> heap.
>  On Feb 12, 2014 7:23 AM, "Livni, Dana"  wrote:
>
>>  Hi,
>>
>> When running a map task I got the following exception.
>>
>> This is new; I have run this code many times in the past, and this is the
>> first time it has happened.
>>
>> Any ideas why? Or how can I monitor when it happens?
>>
>>
>>
>> Thanks Dana.
>>
>>
>>
>> 14/02/11 16:15:56 ERROR executor.Executor: Exception in task ID 128
>>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> at java.lang.StringBuilder.toString(StringBuilder.java:430)
>>
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
>>
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
>>
>> at
>> java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
>>
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>>
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>>
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>>
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
>>
>> at
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
>>
>> at
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>
>> at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
>>
>> at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:26)
>>
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>
>> at
>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:40)
>>
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103)
>>
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
>>
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>
>> at
>> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:32)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>
>> -
>> Intel Electronics Ltd.
>>
>> This e-mail and any attachments may contain confidential material for
>> the sole use of the intended recipient(s). Any review or distribution
>> by others is strictly prohibited. If you are not the intended
>> recipient, please contact the sender and delete all copies.
>>
>


Re: how is fault tolerance achieved in spark

2014-02-11 Thread Andrew Ash
Here's the original paper on how the framework achieves fault tolerance.
 You shouldn't have to do anything special as a user of the framework.

https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf


On Tue, Feb 11, 2014 at 12:21 PM, Adrian Mocanu
wrote:

>  Anyone willing to link some resource on how to achieve fault tolerance?
>
>
>
> *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com]
> *Sent:* February-10-14 1:44 PM
> *To:* user@spark.incubator.apache.org
> *Subject:* how is fault tolerance achieved in spark
>
>
>
> Hi all,
>
> I am curious how fault tolerance is achieved in spark. Well, more like
> what do I need to do to make sure my aggregations which come from streams
> are fault tolerant and saved into cassandra. I will have nodes die and
> would not like to count “tuples” multiple times.
>
>
>
> For example, in trident you have to implement different interfaces. Is
> there a similar process for spark?
>
>
>
> Thanks
>
> -Adrian
>
>
>


Re: Query execution in spark

2014-02-11 Thread Andrew Ash
Hi Ravi,

Have you read through the docs?  I'm not sure there's a page that directly
answers your question but this one gives you an overview of the cluster.

http://spark.incubator.apache.org/docs/latest/cluster-overview.html

Andrew


On Tue, Feb 11, 2014 at 8:31 AM, Ravi Hemnani wrote:

> Hey,
>
> Can anyone explain how the job basically runs in spark?
>
> The number of mappers and reducers, the tmp files created, which tmp file
> contains what data, and how to set the number of reducer tasks as we do in
> Hadoop.
>
> This would prove to be a big help. Thank you.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Query-execution-in-spark-tp1390.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Task not serializable (java.io.NotSerializableException)

2014-02-11 Thread Andrew Ash
The full file on all the machines or just write the partitions that are
already on each machine to disk?

If the latter, try rdd.saveAsTextFile("file:///tmp/mydata")


On Tue, Feb 11, 2014 at 9:39 AM, David Thomas  wrote:

> I want it to be available on all machines in the cluster.
>
>
> On Tue, Feb 11, 2014 at 10:35 AM, Andrew Ash  wrote:
>
>> Do you want the files scattered across the local temp directories of all
>> your machines or just one of them?  If just one, I'd recommend having your
>> driver program execute hadoop fs -getmerge /path/to/files...  using Scala's
>> external process libraries.
>>
>>
>> On Tue, Feb 11, 2014 at 9:18 AM, David Thomas wrote:
>>
>>> I'm trying to copy a file from hdfs to a temp local directory within a
>>> map function using a static method of FileUtil and I get the below error. Is
>>> there a way to get around this?
>>>
>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>> java.io.NotSerializableException: org.apache.hadoop.fs.Path
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>
>>
>>
>
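
A more general way around the NotSerializableException itself, separate from
the file:/// suggestion above, is to construct the non-serializable Hadoop
objects inside the task instead of capturing them in the driver's closure. A
hedged sketch (the paths and the rdd variable are placeholders):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

  rdd.foreachPartition { records =>
    // Configuration, Path and FileSystem are built here, on the executor, so
    // nothing non-serializable has to be shipped from the driver.
    val conf = new Configuration()
    val src = new Path("hdfs:///path/to/file")
    val dst = new Path("/tmp/localcopy")
    FileUtil.copy(src.getFileSystem(conf), src, FileSystem.getLocal(conf), dst,
                  false, conf)
    records.foreach(_ => ())   // the per-record work would go here
  }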


Re: Task not serializable (java.io.NotSerializableException)

2014-02-11 Thread Andrew Ash
Do you want the files scattered across the local temp directories of all
your machines or just one of them?  If just one, I'd recommend having your
driver program execute hadoop fs -getmerge /path/to/files...  using Scala's
external process libraries.


On Tue, Feb 11, 2014 at 9:18 AM, David Thomas  wrote:

> I'm trying to copy a file from hdfs to a temp local directory within a map
> function using a static method of FileUtil and I get the below error. Is
> there a way to get around this?
>
> org.apache.spark.SparkException: Job aborted: Task not serializable:
> java.io.NotSerializableException: org.apache.hadoop.fs.Path
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>


Re: more complex analytics

2014-02-11 Thread Andrew Ash
I would key by those things that should be the same and then reduce by sum.

sc.parallelize(inputList)
  .map(x => (x._1, x._2.toLong, x._3.toLong)) // parse the numeric fields from String
  .map(x => ((x._1, x._3), x._2))             // key by the name and final number field
  .reduceByKey(_ + _)

Andrew


On Tue, Feb 11, 2014 at 7:07 AM, Adrian Mocanu wrote:

>  Hi
>
> Are there any examples on how to do any other operation apart from
> counting in spark via map then reduceByKey.
>
> It’s pretty straight forward to do counts but how do I add in my own
> function (say conditional sum based on tuple fields or moving average)?
>
>
>
> Here’s my count example so we have some code to work with
>
>
>
> val inputList= List(
> ("name","1","11134"),("name","2","11134"),("name","1","11130"),
> ("name2","1","11133") )
>
> sc.parallelize( inputList )
>
> .map(x => (x,1) )
>
> .reduceByKey(sumTuples)
>
> .foreach(x=>println(x))
>
>
>
> How would I add up field 2 from tuples whose “name” field and last field
> are the same?
>
> In my example the result I want is:
>
> "name","1+2","11134"
>
> “name","1","11130”
>
> “name2","1","11133”
>
>
>
> Thanks
>
> -A
>


Re: [0.9.0] MEMORY_AND_DISK_SER not falling back to disk

2014-02-10 Thread Andrew Ash
My understanding of off-heap storage was that you'd still need to get those
JVM objects on-heap in order to actually use them with map, filter, etc.
 Would we be trading CPU time to get memory efficiency if we went down the
off-heap storage route?  I'm not sure what discussions have already
happened here and what kind of implementation we're talking about.




On Mon, Feb 10, 2014 at 2:28 AM, Rafal Kwasny  wrote:

> Hi Everyone,
> Maybe it's a good time to reevaluate off-heap storage for RDD's with
> custom allocator?
>
> On a few occasions recently I had to lower both
> spark.storage.memoryFraction and spark.shuffle.memoryFraction
> spark.shuffle.spill helps a bit with large scale reduces
>
> Also it could be you're hitting:
> https://github.com/apache/incubator-spark/pull/180
>
> /Rafal
>
>
>
> Andrew Ash wrote:
>
> I dropped down to 0.5 but still OOM'd, so sent it all the way to 0.1 and
> didn't get an OOM.  I could tune this some more to find where the cliff is,
> but this is a one-off job so now that it's completed I don't want to spend
> any more time tuning it.
>
> Is there a reason that this value couldn't be dynamically adjusted in
> response to actual heap usage?
>
> I can imagine a scenario where spending too much time in GC (descending
> into GC hell) drops the value a little to keep from OOM, or directly
> measuring how much of the heap is spent on this scratch space and adjusting
> appropriately.
>
>
> On Sat, Feb 8, 2014 at 3:40 PM, Matei Zaharia wrote:
>
>> This probably means that there’s not enough free memory for the “scratch”
>> space used for computations, so we OOM before the Spark cache decides that
>> it’s full and starts to spill stuff. Try reducing
>> spark.storage.memoryFraction (default is 0.66, try 0.5).
>>
>> Matei
>>
>> On Feb 5, 2014, at 10:29 PM, Andrew Ash  wrote:
>>
>> // version 0.9.0
>>
>> Hi Spark users,
>>
>> My understanding of the MEMORY_AND_DISK_SER persistence level was that if
>> an RDD could fit into memory then it would be left there (same as
>> MEMORY_ONLY), and only if it was too big for memory would it spill to disk.
>>  Here's how the docs describe it:
>>
>> MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions
>> that don't fit in memory to disk instead of recomputing them on the fly
>> each time they're needed.
>>
>> https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
>>
>>
>>
>> What I'm observing though is that really large RDDs are actually causing
>> OOMs.  I'm not sure if this is a regression in 0.9.0 or if it has been this
>> way for some time.
>>
>> While I look through the source code, has anyone actually observed the
>> correct spill to disk behavior rather than an OOM?
>>
>> Thanks!
>> Andrew
>>
>>
>>
>
>


Re: [0.9.0] MEMORY_AND_DISK_SER not falling back to disk

2014-02-10 Thread Andrew Ash
I dropped down to 0.5 but still OOM'd, so sent it all the way to 0.1 and
didn't get an OOM.  I could tune this some more to find where the cliff is,
but this is a one-off job so now that it's completed I don't want to spend
any more time tuning it.

Is there a reason that this value couldn't be dynamically adjusted in
response to actual heap usage?

I can imagine a scenario where spending too much time in GC (descending
into GC hell) drops the value a little to keep from OOM, or directly
measuring how much of the heap is spent on this scratch space and adjusting
appropriately.


On Sat, Feb 8, 2014 at 3:40 PM, Matei Zaharia wrote:

> This probably means that there’s not enough free memory for the “scratch”
> space used for computations, so we OOM before the Spark cache decides that
> it’s full and starts to spill stuff. Try reducing
> spark.storage.memoryFraction (default is 0.66, try 0.5).
>
> Matei
>
> On Feb 5, 2014, at 10:29 PM, Andrew Ash  wrote:
>
> // version 0.9.0
>
> Hi Spark users,
>
> My understanding of the MEMORY_AND_DISK_SER persistence level was that if
> an RDD could fit into memory then it would be left there (same as
> MEMORY_ONLY), and only if it was too big for memory would it spill to disk.
>  Here's how the docs describe it:
>
> MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that
> don't fit in memory to disk instead of recomputing them on the fly each
> time they're needed.
> https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
>
>
>
> What I'm observing though is that really large RDDs are actually causing
> OOMs.  I'm not sure if this is a regression in 0.9.0 or if it has been this
> way for some time.
>
> While I look through the source code, has anyone actually observed the
> correct spill to disk behavior rather than an OOM?
>
> Thanks!
> Andrew
>
>
>


[0.9.0] MEMORY_AND_DISK_SER not falling back to disk

2014-02-05 Thread Andrew Ash
// version 0.9.0

Hi Spark users,

My understanding of the MEMORY_AND_DISK_SER persistence level was that if
an RDD could fit into memory then it would be left there (same as
MEMORY_ONLY), and only if it was too big for memory would it spill to disk.
 Here's how the docs describe it:

MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that
don't fit in memory to disk instead of recomputing them on the fly each
time they're needed.
https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html



What I'm observing though is that really large RDDs are actually causing
OOMs.  I'm not sure if this is a regression in 0.9.0 or if it has been this
way for some time.

While I look through the source code, has anyone actually observed the
correct spill to disk behavior rather than an OOM?

Thanks!
Andrew


Re: data locality in logs

2014-02-05 Thread Andrew Ash
If you have multiple executors running on a single node then you might have
data that's on the same server but in different JVMs.  Just on the same
server is NODE_LOCAL, but being in the same JVM is PROCESS_LOCAL.

Yes it was changed to be more specific than just preferred/non-preferred.
 The new options are PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY in
decreasing order of co-location.

Andrew


On Wed, Feb 5, 2014 at 10:16 PM, Tsai Li Ming  wrote:

> Hi,
>
> In older posts on Google Groups, there was mention of checking the logs on
> “preferred/non-preferred” for data locality.
>
> But I can’t seem to find this on 0.9.0 anymore? Has this been changed to
> “PROCESS_LOCAL” , like this:
> 14/02/06 13:51:45 INFO TaskSetManager: Starting task 9.0:50 as TID 568 on
> executor 0: xxx (PROCESS_LOCAL)
>
> What is the difference between process-local and node-local?
>
> Thanks,
> Liming
>
>
>
>
>


Re: Clean up app metadata on worker nodes

2014-02-05 Thread Andrew Ash
I'm observing this as well on 0.9.0, with several 10s of GB accumulating in
that directory but never being cleaned up.  I think this has gotten more
pronounced in 0.9.0 as well with large reducers spilling to disk.


On Wed, Feb 5, 2014 at 3:46 PM, Mingyu Kim  wrote:

> After creating a lot of Spark connections, work/app-* folders in Worker
> nodes keep getting created without any clean-up being done. This
> particularly becomes a problem when the Spark driver programs ship jars or
> files. Is there any way to garbage collect these without manually deleting
> them? Thanks!
>
> Mingyu
>


Re: Problem connecting to Spark Cluster from a standalone Scala program

2014-02-05 Thread Andrew Ash
When you look in the webui (port 8080) for the master does it list at least
one connected worker?


On Wed, Feb 5, 2014 at 7:19 AM, Soumya Simanta wrote:

> I'm running a Spark cluster. (Spark-0.9.0_SNAPSHOT).
>
> I connect to the Spark cluster from the spark-shell.  I can see the Spark
> web UI on n001:8080 and it shows that the master is running on
> spark://n001:7077
>
>
> However, when I try to connect to it using a standalone Scala program,
> I get an exception which says that I cannot connect to the
>
>
> This is how I'm creating the Spark context in my Scala program.
>
> val sc = new SparkContext(*"spark://n001:7077"*, "Simple
> Twitter Analysis",
> "/home/myuserid/incubator-spark",List("target/scala-2.10/simple-project_2.10-1.0.jar"))
> And this is exception I'm getting.
>
> 14/02/05 10:05:20 INFO scheduler.DAGScheduler: Submitting Stage 1
> (MapPartitionsRDD[4] at reduceByKey at SimpleApp.scala:14), which has no
> missing parents
>
> 14/02/05 10:05:20 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
> from Stage 1 (MapPartitionsRDD[4] at reduceByKey at SimpleApp.scala:14)
>
> 14/02/05 10:05:20 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
> with 2 tasks
>
> 14/02/05 10:05:35 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
> 14/02/05 10:05:39 INFO client.AppClient$ClientActor: Connecting to master
> spark://n001:7077...
>
> *14/02/05 10:05:50 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory*
>
> 14/02/05 10:05:59 INFO client.AppClient$ClientActor: Connecting to master
> spark://n001:7077...
>
> 14/02/05 10:06:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
> *14/02/05 10:06:19 ERROR client.AppClient$ClientActor: All masters are
> unresponsive! Giving up.*
>
> *14/02/05 10:06:19 ERROR cluster.SparkDeploySchedulerBackend: Spark
> cluster looks dead, giving up.*
>
> 14/02/05 10:06:19 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0
> from pool
>
> *14/02/05 10:06:19 INFO scheduler.DAGScheduler: Failed to run count at
> SimpleApp.scala:16*
>
> *[error] (run-main) org.apache.spark.SparkException: Job aborted: Spark
> cluster looks down*
>
> *org.apache.spark.SparkException: Job aborted: Spark cluster looks down*
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>
> at scala.Option.foreach(Option.scala:236)
>
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
>
> [trace] Stack trace suppressed: run last compile:run for the full output.
>
> 14/02/05 10:06:19 INFO network.ConnectionManager: Selector thread was
> interrupted!
>
> java.lang.RuntimeException: Nonzero exit code: 1
>
> at scala.sys.package$.error(package.scala:27)
>
> [trace] Stack trace suppressed: run last compile:run for the full output.
>
> [error] (compile:run) Nonzero exit code: 1
>
> [error] Total time: 63 s, completed Feb 5, 2014 10:06:19 AM
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Using Parquet from an interactive Spark shell

2014-02-05 Thread Andrew Ash
I'm assuming you checked all the jars in SPARK_CLASSPATH to confirm that
parquet/org/codehaus/jackson/JsonGenerationException.class exists in one of
them?
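
One quick way to run that check over everything in SPARK_CLASSPATH (a shell
sketch; it assumes the JDK's jar tool is on the PATH):

  for j in $(echo "$SPARK_CLASSPATH" | tr ':' ' '); do
    jar tf "$j" | grep -q 'parquet/org/codehaus/jackson/JsonGenerationException' \
      && echo "found in $j"
  done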


On Wed, Feb 5, 2014 at 12:02 PM, Uri Laserson  wrote:

> Has anyone tried this?  I'd like to read a bunch of Avro GenericRecords
> from a Parquet file. I'm having a bit of trouble with respect to
> dependencies.  My latest attempt looks like this:
>
> export
> SPARK_CLASSPATH="/Users/laserson/repos/parquet-mr/parquet-avro/target/parquet-avro-1.3.3-SNAPSHOT.jar:/Users/laserson/repos/parquet-mr/parquet-hadoop/target/parquet-hadoop-1.3.3-SNAPSHOT.jar:/Users/laserson/repos/parquet-mr/parquet-common/target/parquet-common-1.3.3-SNAPSHOT.jar:/Users/laserson/repos/parquet-mr/parquet-column/target/parquet-column-1.3.3-SNAPSHOT.jar:/Users/laserson/repos/parquet-format/target/parquet-format-2.0.1-SNAPSHOT.jar"
>
> MASTER=local ~/repos/incubator-spark/bin/spark-shell
>
> Then in the shell:
>
> val records1 =
> sc.newAPIHadoopFile("/Users/laserson/temp/test-parquet/alltypeuri",
> classOf[AvroParquetInputFormat], classOf[Void], classOf[IndexedRecord],
> sc.hadoopConfiguration)
> records1.collect
>
> At which point it barfs:
>
> 14/02/05 12:02:32 INFO FileInputFormat: Total input paths to process : 3
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> java.io.IOException: Could not read footer:
> java.lang.NoClassDefFoundError:
> parquet/org/codehaus/jackson/JsonGenerationException
> at
> parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:189)
>  at
> parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:145)
> at
> parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:354)
>  at
> parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:339)
> at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:246)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:85)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:863)
>  at org.apache.spark.rdd.RDD.collect(RDD.scala:602)
> at $iwC$$iwC$$iwC$$iwC.(:20)
>  at $iwC$$iwC$$iwC.(:25)
> at $iwC$$iwC.(:27)
> at $iwC.(:29)
>  at (:31)
> at .(:35)
> at .()
>  at .(:7)
> at .()
> at $print()
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
>  at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
>  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
>  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
>  at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
>  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:593)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:600)
>  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:603)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:926)
>  at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
>  at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
>  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:968)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
>  at org.apache.spark.repl.Main.main(Main.scala)
> Caused by: java.lang.NoClassDefFoundError:
> parquet/org/codehaus/jackson/JsonGenerationException
> at
> parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:359)
>  at
> parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:312)
> at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:295)
>  at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:179)
> at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:175)
>  at java.util.concurrent.FutureTask.run(FutureTask.

Re: What I am missing from configuration?

2014-02-05 Thread Andrew Ash
Try depending on spark-core_2.10 rather than 2.10.3 -- the third digit was
dropped in the maven artifact and I hit this just yesterday as well.
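
Concretely, the dependency line should name the artifact with only the Scala
binary version; a sketch using the 0.9.0-incubating release from the error
above:

  libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"

  // or equivalently, letting sbt append the Scala binary version:
  libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"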

Sent from my mobile phone
On Feb 5, 2014 10:41 AM, "Dana Tontea"  wrote:

>Hi Matei,
>
> Firstly, thank you a lot for the answer. You are right, I was missing the
> hadoop-client dependency locally.
> But on my cluster I deployed the latest version, spark-0.9.0, and now the
> same code gives the following error from sbt package:
>
> [warn]  ::
> [warn]  ::  UNRESOLVED DEPENDENCIES ::
> [warn]  ::
> [warn]  :: org.apache.spark#spark-core_2.10.3;0.9.0-incubating: not found
> [warn]  ::
> [error]
>
> {file:/root/workspace_Spark/scala%20standalone%20app/}default-2327b2/*:update:
> sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-core_2.10.3;0.9.0-incubating: not found
> [error] Total time: 12 s, completed Feb 5, 2014 8:12:25 PM
> I don't know what  I am missing again...
> My scala -version is:
> Scala code runner version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL
>
> Thanks in advanced!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-I-am-missing-from-configuration-tp878p1246.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Stream RDD to local disk

2014-01-30 Thread Andrew Ash
Hadn't thought of calling the hadoop process from within the scala code but
that is an improvement over my current process. Thanks for the suggestion
Chris!

It still requires saving to HDFS, dumping out to a file, and then cleaning
that temp directory out of HDFS, though, so it isn't quite my ideal process.

Sent from my mobile phone
On Jan 30, 2014 2:37 AM, "Christopher Nguyen"  wrote:

> Andrew, couldn't you do in the Scala code:
>
>   scala.sys.process.Process("hadoop fs -copyToLocal ...")!
>
> or is that still considered a second step?
>
> "hadoop fs" is almost certainly going to be better at copying these files
> than some memory-to-disk-to-memory serdes within Spark.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Thu, Jan 30, 2014 at 2:21 AM, Andrew Ash  wrote:
>
>> Hi Spark users,
>>
>> I'm often using Spark for ETL type tasks, where the input is a large file
>> on-disk and the output is another large file on-disk.  I've loaded
>> everything into HDFS, but still need to produce files out on the other side.
>>
>> Right now I produce these processed files in a 2-step process:
>>
>> 1) in a single spark job, read from HDFS location A, process, and write
>> to HDFS location B
>> 2) run hadoop fs -cat hdfs:///path/to/* > /path/tomyfile to get it onto
>> the local disk.
>>
>> It would be great to get this down to a 1-step process.
>>
>> If I run .saveAsTextFile("...") on my RDD, then the shards of the file
>> are scattered onto the local disk across the cluster.  But if I .collect()
>> on the driver and then save to disk using normal Scala disk IO utilities,
>> I'll certainly OOM the driver.
>>
>> *So the question*: is there a way to get an iterator for an RDD that I
>> can scan through the contents on the driver and flush to disk?
>>
>> I found the RDD.iterator() method but it looks to be intended for use by
>> RDD subclasses not end users (requires a Partition and TaskContext
>> parameter).  The .foreach() method executes on each worker also, rather
>> than on the driver, so would also scatter files across the cluster if I
>> saved from there.
>>
>> Any suggestions?
>>
>> Thanks!
>> Andrew
>>
>
>
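
Expanding Christopher's suggestion into a single driver-side program, roughly
(paths are placeholders, `processed` stands for the output RDD, and the hadoop
CLI is assumed to be on the driver's PATH):

  import scala.sys.process._

  // step 1: write the shards to HDFS
  processed.saveAsTextFile("hdfs:///tmp/myjob-output")
  // step 2: merge them onto the local disk
  "hadoop fs -getmerge /tmp/myjob-output /local/path/myfile".!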


Stream RDD to local disk

2014-01-30 Thread Andrew Ash
Hi Spark users,

I'm often using Spark for ETL type tasks, where the input is a large file
on-disk and the output is another large file on-disk.  I've loaded
everything into HDFS, but still need to produce files out on the other side.

Right now I produce these processed files in a 2-step process:

1) in a single spark job, read from HDFS location A, process, and write to
HDFS location B
2) run hadoop fs -cat hdfs:///path/to/* > /path/tomyfile to get it onto the
local disk.

It would be great to get this down to a 1-step process.

If I run .saveAsTextFile("...") on my RDD, then the shards of the file are
scattered onto the local disk across the cluster.  But if I .collect() on
the driver and then save to disk using normal Scala disk IO utilities, I'll
certainly OOM the driver.

*So the question*: is there a way to get an iterator for an RDD that I can
scan through the contents on the driver and flush to disk?

I found the RDD.iterator() method but it looks to be intended for use by
RDD subclasses not end users (requires a Partition and TaskContext
parameter).  The .foreach() method executes on each worker also, rather
than on the driver, so would also scatter files across the cluster if I
saved from there.

Any suggestions?

Thanks!
Andrew


Re: Please Help: Amplab Benchmark Performance

2014-01-29 Thread Andrew Ash
The biggest difference I see is that Shark stores data in a column-oriented
form a la C-Store and Vertica, whereas Spark keeps things in row-oriented
form.  Chris pointed this out in the RDD[TablePartition] vs
RDD[Array[String]] comparison.

I'd be interested in hearing how TablePartition compares to the Parquet
format, which has been getting a lot of attention recently.
https://github.com/Parquet/parquet-format

Personally, as far as performance goes, I remember once being surprised that
a Shark row-counting query completed much faster than the equivalent Spark
job, even after I had both sitting in memory.  This was a select count(*) from
TABLE on a cached table in Shark vs a val rdd = sc.textFile(...).cache;
rdd.count; in Spark.  I attributed it to the column-oriented format at the
time but didn't dig any deeper.


On Wed, Jan 29, 2014 at 11:22 PM, Christopher Nguyen  wrote:

> Hi Chen, it's certainly correct to say it is hard to make an
> apple-to-apple comparison in terms of being able to assume that there is an
> implementation-equivalent for any given Shark query, in "Spark only".
>
> That said, I think the results of your comparisons could still be a
> valuable reference. There are scenarios where perhaps someone wants to
> consider the trade-offs between implementing some ETL operation with Shark
> or with only Spark. Some sense of performance/cost difference would be
> helpful in making that decision.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao 
> linkedin.com/in/ctnguyen
>
>
>
> On Wed, Jan 29, 2014 at 11:10 PM, Chen Jin  wrote:
>
>> Hi Christopher,
>>
>> Thanks a lot for taking time to explain some details under Shark's
>> hood. It is probably very hard to make an apple-to-apple comparison
>> between Shark and Spark since they might be suitable for different
>> types of tasks. From what you have explained, is it OK to think Shark
>> is better off for SQL-like tasks, while Spark is more for iterative
>> machine learning algorithms?
>>
>> Cheers,
>>
>> -chen
>>
>> On Wed, Jan 29, 2014 at 8:59 PM, Christopher Nguyen 
>> wrote:
>> > Chen, interesting comparisons you're trying to make. It would be great
>> to
>> > share this somewhere when you're done.
>> >
>> > Some suggestions of non-obvious things to consider:
>> >
>> > In general there are any number of differences between Shark and some
>> > "equivalent" Spark implementation of the same query.
>> >
>> > Shark isn't necessarily what we may think of as "let's see which lines
>> of
>> > code accomplish the same thing in Spark". Its current implementation is
>> > based on Hive which has its own query planning, optimization, and
>> execution.
>> > Shark's code has some of its own tricks. You can use "EXPLAIN" to see
>> > Shark's execution plan, and compare to your Spark approach.
>> >
>> > Further Shark has its own memory storage format, e.g.,
>> typed-column-oriented
>> > RDD[TablePartition], that can make it more memory-efficient, and help
>> > execute many column aggregation queries a lot faster than the
>> row-oriented
>> > RDD[Array[String]] you may be using.
>> >
>> > In short, Shark does a number of things that are smarter and more
>> optimized
>> > for SQL queries than a straightforward Spark RDD implementation of the
>> same.
>> > --
>> > Christopher T. Nguyen
>> > Co-founder & CEO, Adatao
>> > linkedin.com/in/ctnguyen
>> >
>> >
>> >
>> > On Wed, Jan 29, 2014 at 8:10 PM, Chen Jin  wrote:
>> >>
>> >> Hi All,
>> >>
>> >> https://amplab.cs.berkeley.edu/benchmark/ has given a nice benchmark
>> >> report. I am trying to reproduce the same set of queries in the
>> >> spark-shell so that we can understand more about shark and spark and
>> >> their performance on EC2.
>> >>
>> >> As for the Aggregation Query when X=8,  Shark-disk takes 210 seconds
>> >> and Shark-mem takes 111 seconds. However, when I materialize the
>> >> results to the disk, spark-shell takes more than 5 minutes
>> >> (reduceByKey is used in the shell for aggregation) . Further, if I
>> >> cache uservisits RDD, since the dataset is way too big, the
>> >> performance deteriorates quite a lot.
>> >>
>> >> Can anybody shed some light on why there is a more than 2x difference
>> >> between shark-disk and spark-shell-disk and how to cache data in spark
>> >> correctly such that we can achieve comparable performance as
>> >> shark-mem?
>> >>
>> >> Thank you very much,
>> >>
>> >> -chen
>> >
>> >
>>
>
>


Re: Exception in serialization hangs saving-to-disk

2014-01-28 Thread Andrew Ash
Are you able to get a copy of the exception you refer to?


On Tue, Jan 28, 2014 at 2:26 AM, Ionized  wrote:

> I noticed that running the following code results in the process hanging
> forever waiting for the Job to complete.
> It seems the exception never propagates to the caller.
>
> Should a bug be filed on this?
>
> - Paul
>
>
>
> import java.io.IOException;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class SparkSerializationTest {
>
> public static void main(String[] args) {
> JavaSparkContext context = new JavaSparkContext("local[3]", "test");
> List<MyObject> list = new ArrayList<>();
> list.add(new MyObject());
> JavaRDD<MyObject> rdd = context.parallelize(list);
>  rdd.saveAsObjectFile("/tmp/sparkserializationtest");
> }
>
> private static final class MyObject implements Serializable {
>
> private static final long serialVersionUID = 1L;
>
> private void readObject(ObjectInputStream in) throws IOException,
>  ClassNotFoundException {
> }
>
> private void writeObject(ObjectOutputStream out) throws IOException {
>  throw new RuntimeException();
> }
>
> }
> }
>
>


Re: Running spark driver inside a servlet

2014-01-24 Thread Andrew Ash
Can you paste the exception you're seeing?

Sent from my mobile phone
On Jan 24, 2014 2:36 PM, "Kapil Malik"  wrote:

>  Hi all,
>
>
>
> Is it possible to create a Spark Context (i.e. the driver program) from a
> servlet deployed on some application server ?
>
> I am able to run spark Java driver successfully via maven / standalone
> (after specifying the classpath), but when I bundle spark libraries in a
> JAR, along with my servlet (using the maven shade plugin), it gives me a
> security exception. Any suggestions?
>
>
>
> Thanks and regards,
>
>
>
> Kapil Malik
>
>
>


Re: How to create RDD over hashmap?

2014-01-24 Thread Andrew Ash
By my reading of the code, it uses the partitioner to decide which worker
the key lands on, then does an O(N) scan of that partition.  I think we're
saying the same thing.

https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L549


On Fri, Jan 24, 2014 at 1:26 PM, Cheng Lian  wrote:

> PairRDDFunctions.lookup is good enough in Spark, it's just that its time
> complexity is O(N).  Of course, for RDDs equipped with a partitioner, N is
> the average size of a partition.
>
>
> On Sat, Jan 25, 2014 at 5:16 AM, Andrew Ash  wrote:
>
>> If you have a pair RDD (an RDD[A,B]) then you can use the .lookup()
>> method on it for faster access.
>>
>>
>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions
>>
>> Spark's strength is running computations across a large set of data.  If
>> you're trying to do fast lookup of a few individual keys, I'd recommend
>> something more like memcached or Elasticsearch.
>>
>>
>> On Fri, Jan 24, 2014 at 1:11 PM, Manoj Samel wrote:
>>
>>> Yes, that works.
>>>
>>> But then the hashmap functionality of the fast key lookup etc. is gone
>>> and the search will be linear using an iterator etc. Not sure if Spark
>>> internally creates additional optimizations for Seq but otherwise one has
>>> to assume this becomes a List/Array without a fast key lookup of a hashmap
>>> or b-tree
>>>
>>> Any thoughts ?
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 24, 2014 at 1:00 PM, Frank Austin Nothaft <
>>> fnoth...@berkeley.edu> wrote:
>>>
>>>> Manoj,
>>>>
>>>> I assume you’re trying to create an RDD[(String, Double)]? Couldn’t you
>>>> just do:
>>>>
>>>> val cr_rdd = sc.parallelize(cr.toSeq)
>>>>
>>>> The toSeq would convert the HashMap[String,Double] into a Seq[(String,
>>>> Double)] before calling the parallelize function.
>>>>
>>>> Regards,
>>>>
>>>> Frank Austin Nothaft
>>>> fnoth...@berkeley.edu
>>>> fnoth...@eecs.berkeley.edu
>>>> 202-340-0466
>>>>
>>>> On Jan 24, 2014, at 12:56 PM, Manoj Samel 
>>>> wrote:
>>>>
>>>> > Is there a way to create RDD over a hashmap ?
>>>> >
>>>> > If I have a hash map and try sc.parallelize, it gives
>>>> >
>>>> > :17: error: type mismatch;
>>>> >  found   : scala.collection.mutable.HashMap[String,Double]
>>>> >  required: Seq[?]
>>>> > Error occurred in an application involving default arguments.
>>>> >val cr_rdd = sc.parallelize(cr)
>>>> >^
>>>>
>>>>
>>>
>>
>
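
Putting the two suggestions from this thread together in the shell (names and
values are illustrative, and sc is the shell's SparkContext):

  import org.apache.spark.SparkContext._   // brings in PairRDDFunctions (lookup, etc.)

  val cr = scala.collection.mutable.HashMap("a" -> 1.0, "b" -> 2.0)
  val cr_rdd = sc.parallelize(cr.toSeq)    // RDD[(String, Double)]
  cr_rdd.lookup("a")                       // Seq(1.0); a linear scan unless a partitioner is set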


Re: How to create RDD over hashmap?

2014-01-24 Thread Andrew Ash
If you have a pair RDD (an RDD[A,B]) then you can use the .lookup() method
on it for faster access.

http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions

Spark's strength is running computations across a large set of data.  If
you're trying to do fast lookup of a few individual keys, I'd recommend
something more like memcached or Elasticsearch.


On Fri, Jan 24, 2014 at 1:11 PM, Manoj Samel wrote:

> Yes, that works.
>
> But then the hashmap functionality of the fast key lookup etc. is gone and
> the search will be linear using an iterator etc. Not sure if Spark
> internally creates additional optimizations for Seq but otherwise one has
> to assume this becomes a List/Array without a fast key lookup of a hashmap
> or b-tree
>
> Any thoughts ?
>
>
>
>
>
> On Fri, Jan 24, 2014 at 1:00 PM, Frank Austin Nothaft <
> fnoth...@berkeley.edu> wrote:
>
>> Manoj,
>>
>> I assume you’re trying to create an RDD[(String, Double)]? Couldn’t you
>> just do:
>>
>> val cr_rdd = sc.parallelize(cr.toSeq)
>>
>> The toSeq would convert the HashMap[String,Double] into a Seq[(String,
>> Double)] before calling the parallelize function.
>>
>> Regards,
>>
>> Frank Austin Nothaft
>> fnoth...@berkeley.edu
>> fnoth...@eecs.berkeley.edu
>> 202-340-0466
>>
>> On Jan 24, 2014, at 12:56 PM, Manoj Samel 
>> wrote:
>>
>> > Is there a way to create RDD over a hashmap ?
>> >
>> > If I have a hash map and try sc.parallelize, it gives
>> >
>> > :17: error: type mismatch;
>> >  found   : scala.collection.mutable.HashMap[String,Double]
>> >  required: Seq[?]
>> > Error occurred in an application involving default arguments.
>> >val cr_rdd = sc.parallelize(cr)
>> >^
>>
>>
>


Re: How to create RDD over hashmap?

2014-01-24 Thread Andrew Ash
In Java you'd want to convert it to an entry set, which is a set of (key,
value) pairs from the hashmap.  The closest I can see in scaladoc is the
.iterator method -- try that?


On Fri, Jan 24, 2014 at 12:56 PM, Manoj Samel wrote:

> Is there a way to create RDD over a hashmap ?
>
> If I have a hash map and try sc.parallelize, it gives
>
> :17: error: type mismatch;
>  found   : scala.collection.mutable.HashMap[String,Double]
>  required: Seq[?]
> Error occurred in an application involving default arguments.
>val cr_rdd = sc.parallelize(cr)
>^
>


Re: .intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
Probably makes sense to discuss this on GitHub next to the code instead,
but I agree that fanciness like Bloom filters would be appreciated.

I've got the cogroup proposal working with tests here:
https://github.com/apache/incubator-spark/pull/506


On Fri, Jan 24, 2014 at 12:09 AM, Josh Rosen  wrote:

> For cases where you expect the intersection to be small, we might be able
> to use Bloom filters to prune out tuples that can't possibly be in the
> intersection in order to reduce the amount of data that we need to shuffle.
>
> Implementation-wise, this would run into similar issues as sortByKey()
> with respect to laziness since it would involve an action to collect and
> broadcast the Bloom filters.
>
>
> On Thu, Jan 23, 2014 at 10:58 PM, Matei Zaharia 
> wrote:
>
>> I know some other places used null; haven’t seen None but it might exist.
>>
>> Join actually uses cogroup internally right now so it will be at least as
>> slow as that, but the problem is that it will generate lots of pairs of
>> objects if there are multiple items in both datasets with the same key
>> (unlikely if you really are using them as sets, but could happen).
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:50 PM, Evan Sparks  wrote:
>>
>> If the intersection is really big, would join be better?
>>
>> Agreed on "null" vs None -but how frequent is this in the current
>> codebase?
>>
>> On Jan 23, 2014, at 10:38 PM, Matei Zaharia 
>> wrote:
>>
>> You’d have to add a filter after the cogroup too. Cogroup gives you (key,
>> (list of values in RDD 1, list in RDD 2)).
>>
>> Also one small thing, instead of setting the value to None, it may be
>> cheaper to use null.
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:30 PM, Andrew Ash  wrote:
>>
>> You mean cogroup like this?
>>
>> A.map(v => (v,None)).cogroup(B.map(v => (v,None))).keys
>>
>> If so I might send a PR to start code review for getting this into master.
>>
>> Good to know about the strategy for sharding RDDs and for the core
>> operations.
>>
>> Thanks!
>> Andrew
>>
>>
>> On Thu, Jan 23, 2014 at 11:17 PM, Matei Zaharia 
>> wrote:
>>
>>> Using cogroup would probably be slightly more efficient than join
>>> because you don’t have to generate every pair of keys for elements that
>>> occur in each dataset multiple times.
>>>
>>> We haven’t tried to explicitly separate the API between “core” methods
>>> and others, but in practice, everything can be built on mapPartitions and
>>> cogroup for transformations, and SparkContext.runJob (internal method) for
>>> actions. What really matters is actually the level at which the code sees
>>> dependencies in the DAGScheduler, which is done through the Dependency
>>> class. There are only two types of dependencies (narrow and shuffle), which
>>> correspond to those operations above. So in a sense there is this
>>> separation at the lowest level. But for the levels above, the goal was
>>> first and foremost to make the API as usable as possible, which meant
>>> giving people quick access to all the operations that might be useful, and
>>> dealing with how we’ll implement those later. Over time it will be possible
>>> to divide things like RDD.scala into multiple traits if they become
>>> unwieldy.
>>>
>>> Matei
>>>
>>>
>>> On Jan 23, 2014, at 9:40 PM, Andrew Ash  wrote:
>>>
>>> And I think the followup to Ian's question:
>>>
>>> Is there a way to implement .intersect() in the core API that's more
>>> efficient than the .join() method Evan suggested?
>>>
>>> Andrew
>>>
>>>
>>> On Thu, Jan 23, 2014 at 10:26 PM, Ian O'Connell wrote:
>>>
>>>> Is there any separation in the API between functions that can be built
>>>> solely on the existing exposed public API and ones which require access to
>>>> internals?
>>>>
>>>> Just to maybe avoid bloat for composite functions like this that are
>>>> for user convenience?
>>>>
>>>> (Ala something like lua's aux api vs core api?)
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 8:33 PM, Matei Zaharia >>> > wrote:
>>>>
>>>>> I’d be happy to see this added to the core API.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Jan 23, 2014, at 5:39 PM, Andrew Ash  wrote:
>>>>&
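
Pulling the pieces of this exchange together, a sketch of the cogroup-based
intersection with the filter Matei points out is needed (A and B are assumed
to be RDD[String], as in the original question):

  // keeps only the values that appear in both RDDs; the result is already
  // deduplicated because cogroup emits one element per distinct key
  val intersection =
    A.map(v => (v, None)).cogroup(B.map(v => (v, None)))
      .filter { case (_, (inA, inB)) => inA.nonEmpty && inB.nonEmpty }
      .keys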

Re: .intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
You mean cogroup like this?

A.map(v => (v,None)).cogroup(B.map(v => (v,None))).keys

If so I might send a PR to start code review for getting this into master.

Good to know about the strategy for sharding RDDs and for the core
operations.

Thanks!
Andrew


On Thu, Jan 23, 2014 at 11:17 PM, Matei Zaharia wrote:

> Using cogroup would probably be slightly more efficient than join because
> you don’t have to generate every pair of keys for elements that occur in
> each dataset multiple times.
>
> We haven’t tried to explicitly separate the API between “core” methods and
> others, but in practice, everything can be built on mapPartitions and
> cogroup for transformations, and SparkContext.runJob (internal method) for
> actions. What really matters is actually the level at which the code sees
> dependencies in the DAGScheduler, which is done through the Dependency
> class. There are only two types of dependencies (narrow and shuffle), which
> correspond to those operations above. So in a sense there is this
> separation at the lowest level. But for the levels above, the goal was
> first and foremost to make the API as usable as possible, which meant
> giving people quick access to all the operations that might be useful, and
> dealing with how we’ll implement those later. Over time it will be possible
> to divide things like RDD.scala into multiple traits if they become
> unwieldy.
>
> Matei
>
>
> On Jan 23, 2014, at 9:40 PM, Andrew Ash  wrote:
>
> And I think the followup to Ian's question:
>
> Is there a way to implement .intersect() in the core API that's more
> efficient than the .join() method Evan suggested?
>
> Andrew
>
>
> On Thu, Jan 23, 2014 at 10:26 PM, Ian O'Connell wrote:
>
>> Is there any separation in the API between functions that can be built
>> solely on the existing exposed public API and ones which require access to
>> internals?
>>
>> Just to maybe avoid bloat for composite functions like this that are for
>> user convenience?
>>
>> (Ala something like lua's aux api vs core api?)
>>
>>
>> On Thu, Jan 23, 2014 at 8:33 PM, Matei Zaharia 
>> wrote:
>>
>>> I’d be happy to see this added to the core API.
>>>
>>> Matei
>>>
>>> On Jan 23, 2014, at 5:39 PM, Andrew Ash  wrote:
>>>
>>> Ah right of course -- perils of typing code without running it!
>>>
>>> It feels like this is a pretty core operation that should be added to
>>> the main RDD API.  Do other people not run into this often?
>>>
>>> When I'm validating a foreign key join in my cluster I often check to
>>> make sure that the foreign keys land on valid values on the referenced
>>> table, and the way I do that is checking to see what percentage of the
>>> references actually land.
>>>
>>>
>>> On Thu, Jan 23, 2014 at 6:36 PM, Evan R. Sparks 
>>> wrote:
>>>
>>>> Yup (well, with _._1 at the end!)
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 5:28 PM, Andrew Ash wrote:
>>>>
>>>>> You're thinking like this?
>>>>>
>>>>> A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)
>>>>>
>>>>>
>>>>>> On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks wrote:
>>>>>
>>>>>> You could map each to an RDD[(String,None)] and do a join.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash wrote:
>>>>>>
>>>>>>> Hi spark users,
>>>>>>>
>>>>>>> I recently wanted to calculate the set intersection of two RDDs of
>>>>>>> Strings.  I couldn't find a .intersection() method in the autocomplete 
>>>>>>> or
>>>>>>> in the Scala API docs, so used a little set theory to end up with this:
>>>>>>>
>>>>>>> lazy val A = ...
>>>>>>> lazy val B = ...
>>>>>>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>>>>>>
>>>>>>> Which feels very cumbersome.
>>>>>>>
>>>>>>> Does anyone have a more idiomatic way to calculate intersection?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Andrew
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>


Re: .intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
And I think the followup to Ian's question:

Is there a way to implement .intersect() in the core API that's more
efficient than the .join() method Evan suggested?

Andrew


On Thu, Jan 23, 2014 at 10:26 PM, Ian O'Connell  wrote:

> Is there any separation in the API between functions that can be built
> solely on the existing exposed public API and ones which require access to
> internals?
>
> Just to maybe avoid bloat for composite functions like this that are for
> user convenience?
>
> (Ala something like lua's aux api vs core api?)
>
>
> On Thu, Jan 23, 2014 at 8:33 PM, Matei Zaharia wrote:
>
>> I’d be happy to see this added to the core API.
>>
>> Matei
>>
>> On Jan 23, 2014, at 5:39 PM, Andrew Ash  wrote:
>>
>> Ah right of course -- perils of typing code without running it!
>>
>> It feels like this is a pretty core operation that should be added to the
>> main RDD API.  Do other people not run into this often?
>>
>> When I'm validating a foreign key join in my cluster I often check to
>> make sure that the foreign keys land on valid values on the referenced
>> table, and the way I do that is checking to see what percentage of the
>> references actually land.
>>
>>
>> On Thu, Jan 23, 2014 at 6:36 PM, Evan R. Sparks wrote:
>>
>>> Yup (well, with _._1 at the end!)
>>>
>>>
>>> On Thu, Jan 23, 2014 at 5:28 PM, Andrew Ash wrote:
>>>
>>>> You're thinking like this?
>>>>
>>>> A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks 
>>>> wrote:
>>>>
>>>>> You could map each to an RDD[(String,None)] and do a join.
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash wrote:
>>>>>
>>>>>> Hi spark users,
>>>>>>
>>>>>> I recently wanted to calculate the set intersection of two RDDs of
>>>>>> Strings.  I couldn't find a .intersection() method in the autocomplete or
>>>>>> in the Scala API docs, so used a little set theory to end up with this:
>>>>>>
>>>>>> lazy val A = ...
>>>>>> lazy val B = ...
>>>>>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>>>>>
>>>>>> Which feels very cumbersome.
>>>>>>
>>>>>> Does anyone have a more idiomatic way to calculate intersection?
>>>>>>
>>>>>> Thanks!
>>>>>> Andrew
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>


Re: .intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
Ah right of course -- perils of typing code without running it!

It feels like this is a pretty core operation that should be added to the
main RDD API.  Do other people not run into this often?

When I'm validating a foreign key join in my cluster I often check to make
sure that the foreign keys land on valid values on the referenced table,
and the way I do that is checking to see what percentage of the references
actually land.


On Thu, Jan 23, 2014 at 6:36 PM, Evan R. Sparks wrote:

> Yup (well, with _._1 at the end!)
>
>
> On Thu, Jan 23, 2014 at 5:28 PM, Andrew Ash  wrote:
>
>> You're thinking like this?
>>
>> A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)
>>
>>
>> On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks wrote:
>>
>>> You could map each to an RDD[(String,None)] and do a join.
>>>
>>>
>>> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash wrote:
>>>
>>>> Hi spark users,
>>>>
>>>> I recently wanted to calculate the set intersection of two RDDs of
>>>> Strings.  I couldn't find a .intersection() method in the autocomplete or
>>>> in the Scala API docs, so used a little set theory to end up with this:
>>>>
>>>> lazy val A = ...
>>>> lazy val B = ...
>>>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>>>
>>>> Which feels very cumbersome.
>>>>
>>>> Does anyone have a more idiomatic way to calculate intersection?
>>>>
>>>> Thanks!
>>>> Andrew
>>>>
>>>
>>>
>>
>


Re: .intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
You're thinking like this?

A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)


On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks wrote:

> You could map each to an RDD[(String,None)] and do a join.
>
>
> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash  wrote:
>
>> Hi spark users,
>>
>> I recently wanted to calculate the set intersection of two RDDs of
>> Strings.  I couldn't find a .intersection() method in the autocomplete or
>> in the Scala API docs, so used a little set theory to end up with this:
>>
>> lazy val A = ...
>> lazy val B = ...
>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>
>> Which feels very cumbersome.
>>
>> Does anyone have a more idiomatic way to calculate intersection?
>>
>> Thanks!
>> Andrew
>>
>
>


.intersection() method on RDDs?

2014-01-23 Thread Andrew Ash
Hi spark users,

I recently wanted to calculate the set intersection of two RDDs of Strings.
 I couldn't find a .intersection() method in the autocomplete or in the
Scala API docs, so used a little set theory to end up with this:

lazy val A = ...
lazy val B = ...
A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))

Which feels very cumbersome.

Does anyone have a more idiomatic way to calculate intersection?

Thanks!
Andrew


Re: Handling occasional bad data ...

2014-01-22 Thread Andrew Ash
Why can't you preprocess to filter out the bad rows?  I often do this on
CSV files by testing if the raw line is "parseable" before splitting on ","
or similar.  Just validate the line before attempting to apply BigDecimal
or anything like that.
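
For what it's worth, here's a rough sketch of that kind of pre-filter (the
delimiter, column count, column index, and input path are just placeholders
for your data layout):

import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "FilterBadRows")

// true only if the field parses cleanly as a decimal
def isDecimal(s: String): Boolean =
  try { BigDecimal(s); true } catch { case _: NumberFormatException => false }

// keep only lines with the expected column count and a parseable amount column
val good = sc.textFile("hdfs:///csv-input")
  .map(_.split(","))
  .filter(cols => cols.length >= 3 && isDecimal(cols(2)))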

Cheers,
Andrew


On Wed, Jan 22, 2014 at 9:04 PM, Manoj Samel wrote:

> Hi,
>
> How does spark handles following case?
>
> Thousands of CSV files (each with 50MB size) comes from external system.
> One RDD is defined on all of these. RDD defines some of the CSV fields as
> BigDecimal etc. When building the RDD, it errors out saying bad BigDecimal
> format after some time (error shows max retries 4).
>
> 1) It is very likely that massive dataset will have occasional bad rows.
> It is not possible to fix this data set or do a pre-processing on it to
> eliminate bad data. How does spark handles it? Is it possible to say ignore
> first N bad rows etc. ?
>
> 2) What was the max 4 retries in error message? Any way to control it?
>
> Thanks,
>
>
>


Re: spark.default.parallelism

2014-01-21 Thread Andrew Ash
https://github.com/apache/incubator-spark/pull/489


On Tue, Jan 21, 2014 at 3:41 PM, Ognen Duzlevski <
og...@plainvanillagames.com> wrote:

> On Tue, Jan 21, 2014 at 10:37 PM, Andrew Ash  wrote:
>
>> Documentation suggestion:
>>
>> Default number of tasks to use *across the cluster* for distributed
>> shuffle operations (groupByKey, reduceByKey,
>> etc) when not set by user.
>>
>> Ognen would that have clarified for you?
>>
>
> Of course :)
>
> Thanks!
> Ognen
>


Re: spark.default.parallelism

2014-01-21 Thread Andrew Ash
Documentation suggestion:

Default number of tasks to use *across the cluster* for distributed shuffle
operations (groupByKey, reduceByKey, etc) when
not set by user.
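
To make the "across the cluster" part concrete, a quick sketch (the master URL
and input path are placeholders; this is the 0.8.x property-style configuration):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD functions for reduceByKey

System.setProperty("spark.default.parallelism", "4")
val sc = new SparkContext("spark://master:7077", "ParallelismDemo")

val counts = sc.textFile("hdfs:///some/input")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // 4 reduce tasks in total across the cluster, not 4 per worker;
                      // reduceByKey(_ + _, n) would override the default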

Ognen would that have clarified for you?


On Tue, Jan 21, 2014 at 3:35 PM, Matei Zaharia wrote:

> It’s just 4 over the whole cluster.
>
> Matei
>
> On Jan 21, 2014, at 2:27 PM, Ognen Duzlevski 
> wrote:
>
> This is what docs/configuration.md says about the property:
> " Default number of tasks to use for distributed shuffle operations
> (groupByKey,
> reduceByKey, etc) when not set by user.
> "
>
> If I set this property to, let's say, 4 - what does this mean? 4 tasks per
> core, per worker, per...? :)
>
> Thanks!
> Ognen
>
>
>


Re: FileNotFoundException on distinct()?

2014-01-20 Thread Andrew Ash
Also you will need to bounce the spark services from a new ssh session to
make the ulimit changes take effect (if you changed the value in
/etc/limits)

Sent from my mobile phone
On Jan 20, 2014 5:32 PM, "Jey Kottalam"  wrote:

> Can you try ulimit -n to make sure the increased limit has taken effect?
>
> On Monday, January 20, 2014, Ryan Compton  wrote:
>
>> I've got
>>
>> System.setProperty("spark.shuffle.consolidate.files", "true");
>>
>> but I'm getting the same error.
>>
>> The output of the distinct count will be 101,230,940 (I did it in
>> pig). I've got 13 nodes and each node allows 13,069,279 open files. So
>> even with 1 record per file I think I've got enough. But what do the
>> rest of you have for /proc/sys/fs/file-max?
>>
>> On Sun, Jan 19, 2014 at 5:12 PM, Mark Hamstra 
>> wrote:
>> > You should try setting spark.shuffle.consolidate.files to true.
>> >
>> >
>> > On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton 
>> > wrote:
>> >>
>> >> I think I've shuffled this data before (I often join on it), and I
>> >> know I was using distinct() in 0.7.3 for the same computation.
>> >>
>> >> What do people usually have in  /proc/sys/fs/file-max? I'm real
>> >> surprised that 13M isn't enough.
>> >>
>> >> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <
>> m...@clearstorydata.com>
>> >> wrote:
>> >> > distinct() needs to do a shuffle -- which is resulting in the need to
>> >> > materialize the map outputs as files.  count() doesn't.
>> >> >
>> >> >
>> >> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <
>> compton.r...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> I'm able to open ~13M files. I expect the output of
>> >> >> .distinct().count() to be under 100M, why do I need so many files
>> >> >> open?
>> >> >>
>> >> >> rfcompton@node19 ~> cat /etc/redhat-release
>> >> >> CentOS release 5.7 (Final)
>> >> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
>> >> >> 13069279
>> >> >>
>> >> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam 
>> >> >> wrote:
>> >> >> > The "too many open files" error is due to running out of available
>> >> >> > FDs, usually due to a limit set in the OS.
>> >> >> >
>> >> >> > The fix will depend on your specific OS, but under Linux it
>> usually
>> >> >> > involves the "fs.file-max" syctl.
>> >> >> >
>> >> >> > On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton
>> >> >> > 
>> >> >> > wrote:
>> >> >> >> When I try .distinct() my jobs fail. Possibly related:
>> >> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo
>> >> >> >>
>> >> >> >> This works
>> >> >> >>
>> >> >> >> //get the node ids
>> >> >> >> val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
>> >> >> >> //count the nodes
>> >> >> >> val numNodes = nodes.count()
>> >> >> >> logWarning("numNodes:\t"+numNodes)
>> >> >> >>
>> >> >> >> this fails
>> >> >> >>
>> >> >> >> //get the node ids
>> >> >> >> val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
>> >> >> >> //count the nodes
>> >> >> >> val numNodes = nodes.distinct().count()
>> >> >> >> logWarning("numNodes:\t"+numNodes)
>> >> >> >>
>> >> >> >> with these stacktraces:
>> >> >> >>
>> >> >> >> 14/01/17 14:54:37 WARN scripts.ComputeNetworkStats: numEdges:
>> >> >> >> 915189977
>> >> >> >> 14/01/17 14:54:37 INFO rdd.MappedRDD: Removing RDD 1 from
>> >> >> >> persistence
>> >> >> >> list
>> >> >> >> --
>> >> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was
>> due
>> >> >> >> to
>> >> >> >> java.io.IOException
>> >> >> >> java.io.IOException: Filesystem closed
>> >> >> >> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:299)
>> >> >> >> at
>> org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:77)
>> >> >> >> at
>>
>


Re: Worker hangs with 100% CPU in Standalone cluster

2014-01-16 Thread Andrew Ash
It sounds like the takeaway is that if you're using custom classes, you
need to make sure that their hashCode() and equals() methods are
value-based?
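
For illustration (the class names here are made up), the difference looks like:

// value-based: a case class derives equals/hashCode from its fields
case class UserDay(user: String, day: Int)

// identity-based: a plain class defaults to reference equality, so two
// logically-equal keys land in different hash buckets and can corrupt
// hash tables, joins, or hash partitioning
class UserDayBad(val user: String, val day: Int)

new UserDay("a", 1) == new UserDay("a", 1)       // true
new UserDayBad("a", 1) == new UserDayBad("a", 1) // false until equals/hashCode are overridden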


On Thu, Jan 16, 2014 at 12:08 PM, Patrick Wendell wrote:

> Thanks for following up and explaining this one! Definitely something
> other users might run into...
>
>
> On Thu, Jan 16, 2014 at 5:58 AM, Grega Kešpret  wrote:
>
>> Just to follow up, we have since pinpointed the problem to be in
>> application code (not Spark). In some cases, there was an infinite loop in
>> Scala HashTable linear probing algorithm, where an element's next() pointed
>> at itself. It was probably caused by wrong hashCode() and equals() methods
>> on the object we were storing.
>>
>> Milos, we also have Master node separate from Worker nodes. Could someone
>> from Spark team comment about that?
>>
>> Grega
>> --
>> *Grega Kešpret*
>> Analytics engineer
>>
>> Celtra — Rich Media Mobile Advertising
>> celtra.com | @celtramobile
>>
>>
>> On Thu, Jan 16, 2014 at 2:46 PM, Milos Nikolic wrote:
>>
>>> Hello,
>>>
>>> I’m facing the same (or similar) problem. In my case, the last two tasks
>>> hang in a map function following sc.sequenceFile(…). It happens from time
>>> to time (more often with TorrentBroadcast than HttpBroadcast) and after
>>> restarting it works fine.
>>>
>>> The problem always happens on the same node — on the node that plays the
>>> roles of the master and one worker. Once this node becomes master-only
>>> (i.e., I removed this nodes from conf/slaves), the problem is gone.
>>>
>>> Does that mean that the master and workers have to be on separate nodes?
>>>
>>> Best,
>>> Milos
>>>
>>>
>>> On Jan 6, 2014, at 5:44 PM, Grega Kešpret  wrote:
>>>
>>> Hi,
>>>
>>> we are seeing several times a day one worker in a Standalone cluster
>>> hang up with 100% CPU at the last task and doesn't proceed. After we
>>> restart the job, it completes successfully.
>>>
>>> We are using Spark v0.8.1-incubating.
>>>
>>> Attached please find jstack logs of Worker
>>> and CoarseGrainedExecutorBackend JVM processes.
>>>
>>> Grega
>>> --
>>> 
>>> *Grega Kešpret*
>>> Analytics engineer
>>>
>>> Celtra — Rich Media Mobile Advertising
>>> celtra.com | @celtramobile
>>>
>>>
>>>
>>
>

Re: Help needed. Not sure how to reduceByKey works in spark

2014-01-10 Thread Andrew Ash
So for each (col2, col3) pair, you want the difference between the earliest
col1 value and the latest col1 value?

I'd suggest something like this:

val fmt = new java.text.SimpleDateFormat("M/d/yyyy hh:mm:ss a") // parse col1 so the difference is a real duration (pattern guessed from the sample rows)
val data = sc.textFile(...).map(l => l.split("\t"))
data.map(r => ((r(1), r(2)), fmt.parse(r(0)).getTime)) // produce an RDD of ((col2, col3), col1 as epoch millis)
  .groupByKey()                                        // now have ((col2, col3), [col1 values])
  .map(p => (p._1, (p._2.max - p._2.min) / 1000))      // now have ((col2, col3), diff in seconds)

The downside of this approach is that if you have a (col2, col3) pair with
tons of col1 values, you might OOM one of your executors in the groupByKey.
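
If that's a concern, a sketch of a more memory-friendly variant (same assumed
tab-separated layout; the input path and timestamp pattern are guesses from
your sample rows) keeps only a running (min, max) per key with reduceByKey:

import java.text.SimpleDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD functions (reduceByKey, mapValues)

val sc  = new SparkContext("local[2]", "DateDiff")
val fmt = new SimpleDateFormat("M/d/yyyy hh:mm:ss a")

val totalTime = sc.textFile("data.tsv")
  .map(_.split("\t"))
  .map { r =>
    val t = fmt.parse(r(0)).getTime // col1 as epoch millis
    ((r(1), r(2)), (t, t))          // seed (min, max) with the same value
  }
  .reduceByKey((a, b) => (math.min(a._1, b._1), math.max(a._2, b._2))) // constant memory per key
  .mapValues { case (lo, hi) => (hi - lo) / 1000 }                     // elapsed seconds per (col2, col3)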

Andrew


On Fri, Jan 10, 2014 at 11:01 AM, suman bharadwaj wrote:

> Hi,
>
> I'm new to spark. And i needed some help in understanding how reduceByKey
> works.
>
> I have the following data:
>
> col1                     col2    col3
> 1/11/2014 12:18:40 AM    123     143
> 1/11/2014 12:18:45 AM    123     143
> 1/11/2014 12:18:49 AM    123     143
>
> the output i need is
>
> col2    col3    totaltime (current value of col1 - prev value of col1)
> 123     143     9
>
> I'm doing the following:
>
> map((col2,col3),col1).reduceByKey( <subtraction of dates> )
>
> How to perform subtraction of dates ?
> How does reduceByKey work when my map emits as follows
> ((col2,col3),(col1,col4))?
>
>
> Thanks in advance.
>


Re: Why does sortByKey launch cluster job?

2014-01-09 Thread Andrew Ash
I filed it and submitted the PR that Josh suggested:

https://spark-project.atlassian.net/browse/SPARK-1021
https://github.com/apache/incubator-spark/pull/379


On Wed, Jan 8, 2014 at 9:56 AM, Andrew Ash  wrote:

> And at the moment we should use the atlassian.net Jira instance, not the
> apache.org one?  The apache one looks empty.
>
> https://spark-project.atlassian.net/browse/SPARK
> https://issues.apache.org/jira/browse/SPARK
>
>
> On Wed, Jan 8, 2014 at 9:04 AM, Aaron Davidson  wrote:
>
>> Feel free to always file official bugs in Jira, as long as it's not
>> already there!
>>
>>
>> On Tue, Jan 7, 2014 at 9:47 PM, Andrew Ash  wrote:
>>
>>> Hi Josh,
>>>
>>> I just ran into this again myself and noticed that the source hasn't
>>> changed since we discussed in December.  Should I file an official bug in
>>> Jira?
>>>
>>> Andrew
>>>
>>>
>>> On Tue, Dec 10, 2013 at 8:34 AM, Josh Rosen wrote:
>>>
>>>> I wonder whether making RangePartitoner .rangeBounds into a lazy val
>>>> would fix this (
>>>> https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
>>>>  We'd need to make sure that rangeBounds() is never called before an action
>>>> is performed.  This could be tricky because it's called in the
>>>> RangePartitioner.equals() method.  Maybe it's sufficient to just compare
>>>> the number of partitions, the ids of the RDDs used to create the
>>>> RangePartitioner, and the sort ordering.  This still supports the case
>>>> where I range-partition one RDD and pass the same partitioner to a
>>>> different RDD.  It breaks support for the case where two range partitioners
>>>> created on different RDDs happened to have the same rangeBounds(), but it
>>>> seems unlikely that this would really harm performance since it's probably
>>>> unlikely that the range partitioners are equal by chance.
>>>>
>>>>
>>>> On Tue, Dec 10, 2013 at 8:18 AM, Ryan Prenger wrote:
>>>>
>>>>> Thanks for the responses!  I agree that b seems like it would be
>>>>> better.  I could imagine optimizations that could be made if a filter call
>>>>> came after the sortByKey that would make the initial partitioning
>>>>> sub-optimal.  Plus this way, it's a pain to use in the REPL.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Ryan
>>>>>
>>>>>
>>>>> On Tue, Dec 10, 2013 at 7:06 AM, Andrew Ash wrote:
>>>>>
>>>>>> Since sortByKey() invokes those right now, we should either a) change
>>>>>> the documentation to note that it kicks off actions or b) change
>>>>>> the
>>>>>> method to execute those things lazily.
>>>>>>
>>>>>> Personally I'd prefer b but don't know how difficult that would be.
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 10, 2013 at 1:52 AM, Jason Lenderman <
>>>>>> jslender...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Ryan,
>>>>>>>
>>>>>>> The *sortByKey* method creates a *RangePartitioner* (see
>>>>>>> Partitioner.scala), and the initialization code of the
>>>>>>> *RangePartitioner* invokes actions *count* and *sample*.
>>>>>>>
>>>>>>>
>>>>>>> Jason
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 9, 2013 at 7:01 PM, Ryan Prenger 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> sortByKey is listed as a data transformation, not an action, yet it
>>>>>>>> launches a job.  This doesn't seem to square with the documentation.
>>>>>>>>
>>>>>>>> Ryan
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: PLEASE HELP: ./shark-withinfo not connecting to spark master

2014-01-09 Thread Andrew Ash
I haven't seen that particular one before, but Shark only works with its
bundled version of Hive-0.9.0, not any other version.  The reason is Shark
had to make some patches in Hive 0.9.0 so it's not vanilla 0.9.0, but
moving Shark to later versions of Hive takes some dev work that's not quite
landed yet.

Make sure you're using hive-0.9.0-shark-0.8.0 and don't have any other hive
jars on your classpath anywhere.


On Thu, Jan 9, 2014 at 5:24 PM, danoomistmatiste wrote:

> Andrew, Thank you very much.  That is exactly what I did,  I downloaded
> 0.8.0
> of spark, rebuilt it and now I am able to connect to spark successfully.  I
> am however running into another issue when trying to run commands from the
> shark shell,
>
> a simple show tables; command gives me this error.  I have configured
> hive-default.xml and placed it in the hive directory packaged with shark.
>
>
>  > show tables;
> java.lang.NoSuchFieldError: METASTORE_MODE
> at
>
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:110)
> at
>
> org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2092)
> at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2102)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1076)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1065)
> at
> org.apache.hadoop.hive.ql.exec.DDLTask.showTables(DDLTask.java:1992)
> at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:323)
> at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:134)
> at
> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1312)
> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1104)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:937)
> at shark.SharkCliDriver.processCmd(SharkCliDriver.scala:294)
> at
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:406)
> at
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:341)
> at shark.SharkCliDriver$.main(SharkCliDriver.scala:203)
> at shark.SharkCliDriver.main(SharkCliDriver.scala)
> FAILED: Execution Error, return code -101 from
> org.apache.hadoop.hive.ql.exec.DDLTask
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/PLEASE-HELP-shark-withinfo-not-connecting-to-spark-master-tp419p426.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: PLEASE HELP: ./shark-withinfo not connecting to spark master

2014-01-09 Thread Andrew Ash
Hello,

Shark doesn't have a matching version to the recent Spark 0.8.1 release
yet.  If you want to run Shark, you'll need to stick with Spark 0.8.0 for
the moment until Shark 0.8.1 is released.  I'd guess dropping back on that
version would fix your problems.

Andrew


On Thu, Jan 9, 2014 at 1:23 PM, danoomistmatiste wrote:

> Hi,  I have posted this query a couple of times but not received any
> responses.
>
> I have the following components installed and running for spark,
>
> scala-2.9.3
> spark-0.8.1-incubating-bin-cdh4
>
> I am able to start the spark master (running on port 7077) and one worker.
> I have also installed shark (shark-0.8.0-bin-cdh4).  I have set the
> following in my shark-env.sh
>
> export HADOOP_HOME=/Users/hadoop/hadoop-2.0.0-cdh4.2.0
> export
> HIVE_HOME=/Users/hadoop/shark-0.8.0-bin-cdh4/hive-0.9.0-shark-0.8.0-bin
> export MASTER=spark://localhost:7077
> export SPARK_HOME=/Users/hadoop/spark-0.8.1-incubating-bin-cdh4
> export SPARK_MEM=1g
> export SCALA_HOME=/Users/hadoop/scala-2.9.3
>
> However, when I try to run the shark shell with ./shark-withinfo, I get the
> following exception (buried within a lot of other info messages)
>
> 14/01/08 15:32:28 ERROR client.Client$ClientActor: Connection to master
> failed; stopping client
> 14/01/08 15:32:28 ERROR cluster.SparkDeploySchedulerBackend: Disconnected
> from Spark cluster!
> 14/01/08 15:32:28 ERROR cluster.ClusterScheduler: Exiting due to error from
> cluster scheduler: Disconnected from Spark cluster
>
> Anyone run into this issue before?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/PLEASE-HELP-shark-withinfo-not-connecting-to-spark-master-tp419.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: performance

2014-01-08 Thread Andrew Ash
My first thought on hearing that you're calling collect is that taking all
the data back to the driver is intensive on the network.  Try checking the
basic systems stuff on the machines to get a sense of what's being heavily
used:

disk IO
CPU
network

Any kind of distributed system monitoring framework should be able to
handle these sorts of things.

Cheers!
Andrew


On Wed, Jan 8, 2014 at 1:49 PM, Yann Luppo  wrote:

>  Hi,
>
>  I have what I hope is a simple question. What's a typical approach to
> diagnostic performance issues on a Spark cluster?
> We've followed all the pertinent parts of the following document already:
> http://spark.incubator.apache.org/docs/latest/tuning.html
> But we seem to still have issues. More specifically we have a
> leftouterjoin followed by a flatmap and then a collect running a bit long.
>
>  How would I go about determining the bottleneck operation(s) ?
> Is our leftouterjoin taking a long time?
> Is the function we send to the flatmap not optimized?
>
>  Thanks,
> Yann
>


Re: Dying workers since migration to 0.8.1

2014-01-08 Thread Andrew Ash
Any exceptions you see in the worker machine's logs would be particularly
useful too.


On Wed, Jan 8, 2014 at 6:00 AM, Prashant Sharma wrote:

> Hi,
>
> Can you give a little more details about the problem apart from a few
> hints that would be great !. I would like to exactly what you did and how
> did you end up getting those stuck up executors. This can be due to network
> too. Are you on ec2 ? in that case ec2 n/w is often unpredictable.
>
>
> On Wed, Jan 8, 2014 at 6:58 PM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>  Hi,
>>
>> We migrated from 0.8.0 to 0.8.1 on Monday, since then we have observed a
>> high rate of disappearing (they're not in the list anymore) or dying
>> (marked DEAD) workers.
>>
>> This is particularly strange since the processes often continue running.
>>
>> Any idea / advice about this particular problem ?
>>
>> Thanks
>>
>> Guillaume
>> --
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80
>>
>> eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>
>
> --
> Prashant
>

Re: native-lzo / gpl lib

2014-01-08 Thread Andrew Ash
To get shark on LZO files working (I have it up and running with CDH4.4.0)
you first need the hadoop-lzo jar on the classpath for shark (and spark).
 Hadoop-lzo seems to require its native code component, unlike Hadoop which
can run non-native if it can't find native.  So you'll need to add
hadoop-lzo's native component to the library path too.

Here's an excerpt from my puppet module that does these things.  Edit
accordingly and put these two rows into your shark-env.sh

export SPARK_LIBRARY_PATH="<%= scope['common::masterBaseDir'] %>/hadoop-current/lib/native/"
export SPARK_CLASSPATH="<%= scope['common::masterBaseDir'] %>/hadoop-current/lib/hadoop-lzo.jar"

And here's what I have in hadoop-current/lib/native:

[user@machine hadoop-current]$ ls
bin   hadoop-ant-2.0.0-mr1-cdh4.4.0.jar
hadoop-examples-2.0.0-mr1-cdh4.4.0.jar  hadoop-tools-2.0.0-mr1-cdh4.4.0.jar
 lib  logs  webapps
conf  hadoop-core-2.0.0-mr1-cdh4.4.0.jar
 hadoop-test-2.0.0-mr1-cdh4.4.0.jar  include
   libexec  sbin
[user@machine hadoop-current]$ ls lib/native/
libgplcompression.a  libgplcompression.la  libgplcompression.so
 libgplcompression.so.0  libgplcompression.so.0.0.0  Linux-amd64-64
[user@machine hadoop-current]$


Does that help?

Andrew


On Wed, Jan 8, 2014 at 7:02 AM, leosand...@gmail.com
wrote:

>  HI,
> I ran a query from Shark that reads compressed data from HDFS, but
> Spark couldn't find the native-lzo lib.
>
>  14/01/08 22:58:21 ERROR executor.Executor: Exception in task ID 286
> java.lang.RuntimeException: native-lzo library not available
> at
> com.hadoop.compression.lzo.LzoCodec.getDecompressorType(LzoCodec.java:175)
> at
> org.apache.hadoop.hive.ql.io.CodecPool.getDecompressor(CodecPool.java:122)
> at org.apache.hadoop.hive.ql.io.RCFile$Reader.init(RCFile.java:1299)
> at org.apache.hadoop.hive.ql.io.RCFile$Reader.(RCFile.java:1139)
> at org.apache.hadoop.hive.ql.io.RCFile$Reader.(RCFile.java:1118)
> at
> org.apache.hadoop.hive.ql.io.RCFileRecordReader.(RCFileRecordReader.java:52)
> at
> org.apache.hadoop.hive.ql.io.RCFileInputFormat.getRecordReader(RCFileInputFormat.java:57)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:93)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:83)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:29)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:69)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at
> org.apache.spark.rdd.MapPartitionsWithIndexRDD.compute(MapPartitionsWithIndexRDD.scala:40)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at
> org.apache.spark.rdd.MapPartitionsWithIndexRDD.compute(MapPartitionsWithIndexRDD.scala:40)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:662)
>
>  Can anyone give me a hint?
>
> thank you !
>
> --
>  leosand...@gmail.com
>


Re: Why does sortByKey launch cluster job?

2014-01-08 Thread Andrew Ash
And at the moment we should use the atlassian.net Jira instance, not the
apache.org one?  The apache one looks empty.

https://spark-project.atlassian.net/browse/SPARK
https://issues.apache.org/jira/browse/SPARK


On Wed, Jan 8, 2014 at 9:04 AM, Aaron Davidson  wrote:

> Feel free to always file official bugs in Jira, as long as it's not
> already there!
>
>
> On Tue, Jan 7, 2014 at 9:47 PM, Andrew Ash  wrote:
>
>> Hi Josh,
>>
>> I just ran into this again myself and noticed that the source hasn't
>> changed since we discussed in December.  Should I file an official bug in
>> Jira?
>>
>> Andrew
>>
>>
>> On Tue, Dec 10, 2013 at 8:34 AM, Josh Rosen  wrote:
>>
>>> I wonder whether making RangePartitoner .rangeBounds into a lazy val
>>> would fix this (
>>> https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
>>>  We'd need to make sure that rangeBounds() is never called before an action
>>> is performed.  This could be tricky because it's called in the
>>> RangePartitioner.equals() method.  Maybe it's sufficient to just compare
>>> the number of partitions, the ids of the RDDs used to create the
>>> RangePartitioner, and the sort ordering.  This still supports the case
>>> where I range-partition one RDD and pass the same partitioner to a
>>> different RDD.  It breaks support for the case where two range partitioners
>>> created on different RDDs happened to have the same rangeBounds(), but it
>>> seems unlikely that this would really harm performance since it's probably
>>> unlikely that the range partitioners are equal by chance.
>>>
>>>
>>> On Tue, Dec 10, 2013 at 8:18 AM, Ryan Prenger wrote:
>>>
>>>> Thanks for the responses!  I agree that b seems like it would be
>>>> better.  I could imagine optimizations that could be made if a filter call
>>>> came after the sortByKey that would make the initial partitioning
>>>> sub-optimal.  Plus this way, it's a pain to use in the REPL.
>>>>
>>>> Cheers,
>>>>
>>>> Ryan
>>>>
>>>>
>>>> On Tue, Dec 10, 2013 at 7:06 AM, Andrew Ash wrote:
>>>>
>>>>> Since sortByKey() invokes those right now, we should either a) change
>>>>> the documentation to note that it kicks off actions or b) change the
>>>>> method to execute those things lazily.
>>>>>
>>>>> Personally I'd prefer b but don't know how difficult that would be.
>>>>>
>>>>>
>>>>> On Tue, Dec 10, 2013 at 1:52 AM, Jason Lenderman <
>>>>> jslender...@gmail.com> wrote:
>>>>>
>>>>>> Hey Ryan,
>>>>>>
>>>>>> The *sortByKey* method creates a *RangePartitioner* (see
>>>>>> Partitioner.scala), and the initialization code of the
>>>>>> *RangePartitioner* invokes actions *count* and *sample*.
>>>>>>
>>>>>>
>>>>>> Jason
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 9, 2013 at 7:01 PM, Ryan Prenger wrote:
>>>>>>
>>>>>>> sortByKey is listed as a data transformation, not an action, yet it
>>>>>>> launches a job.  This doesn't seem to square with the documentation.
>>>>>>>
>>>>>>> Ryan
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark SequenceFile Java API Repeat Key Values

2014-01-07 Thread Andrew Ash
Agreed on the clone by default approach -- this reused object gotcha has
hit several people I know when using Avro.

We should be careful not to ignore the performance impact that made Hadoop
reuse objects in the first place, though. I'm not sure what that means in
practice: you either clone the objects in Spark or you don't.
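
For reference, a small sketch of the workaround on the reader's side (using
the sequence file path from this thread; extracting primitive longs side-steps
the reuse entirely, and cloning the Writables in the map would work too):

import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "SeqFileRead")

// copy the values out of the reused Writables before caching, so every
// cached record owns its data instead of pointing at the shared object
val cube = sc
  .sequenceFile("/local_filesystem/output.seq", classOf[LongWritable], classOf[LongWritable])
  .map { case (k, v) => (k.get, v.get) }
  .cache()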


On Tue, Jan 7, 2014 at 9:47 PM, Matei Zaharia wrote:

> Yup, a) would make it work.
>
> I’d actually prefer that we change it so it clones the objects by default,
> and add a boolean flag (default false) for people who want to reuse
> objects. We’d have to do the same in hadoopRDD and the various versions of
> that as well.
>
> Matei
>
> On Jan 8, 2014, at 12:38 AM, Andrew Ash  wrote:
>
> Matei, do you mean something like A rather than B below?
>
> A) rdd.map(_.clone).cache
> B) rdd.cache
>
> I'd be happy to add documentation if there's a good place for it, but I'm
> not sure there's an obvious place for it.
>
>
> On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia wrote:
>
>> Yeah, unfortunately sequenceFile() reuses the Writable object across
>> records. If you plan to use each record repeatedly (e.g. cache it), you
>> should clone them using a map function. It was originally designed assuming
>> you only look at each record once, but it’s poorly documented.
>>
>> Matei
>>
>> On Jan 7, 2014, at 11:32 PM, Michael Quinlan  wrote:
>>
>> > I've spent some time trying to import data into an RDD using the Spark
>> Java
>> > API, but am not able to properly load data stored in a Hadoop v1.1.1
>> > sequence file with key and value types both LongWritable. I've attached
>> a
>> > copy of the sequence file to this posting. It contains 3000 key, value
>> > pairs. I'm attempting to read using the following code snip:
>> >
>> > System.setProperty("spark.serializer",
>> > "org.apache.spark.serializer.KryoSerializer");
>> >
>> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
>> >
>> > JavaSparkContext ctx = new JavaSparkContext("local[2]",
>> >"AppName",
>> >"/Users/mquinlan/spark-0.8.0-incubating","jar.name");
>> >
>> > //Load DataCube via Spark sequenceFile
>> > JavaPairRDD DataCube =
>> > ctx.sequenceFile("/local_filesystem/output.seq",
>> >LongWritable.class, LongWritable.class);
>> >
>> > The code above produces a DataCube filled with duplicate entries
>> relating in
>> > some way to the number of splits. For example, the last 1500 or so
>> entries
>> > all have the same key and value: (2999,22483). The previous 1500 entries
>> > appear to represent the last key value from first split of the file.
>> I've
>> > confirmed that changing the number of threads (local[3]) does change
>> the RDD
>> > representation, maintaining this general last key value pattern.
>> >
>> > Using the Hadoop (only) API methods, I am able to correctly read the
>> file
>> > even from within the same Jar:
>> >
>> > Configuration conf = new Configuration();
>> > FileSystem fs = FileSystem.get(conf);
>> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
>> > Path("/local_filesystem/output.seq"), conf);
>> > LongWritable key = new LongWritable();
>> > LongWritable value = new LongWritable();
>> > while(reader.next(key, value)) {
>> > System.out.println(key + ":" + value);
>> > }
>> >
>> > I've also confirmed that an RDD populated by the ctx.parallelize()
>> method:
>> >
>> > int n=100;
>> > List tl = new ArrayList(n);
>> > for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
>> > JavaRDD preCube = ctx.parallelize(tl, 1);
>> > DataCube = preCube.map(
>> >new PairFunction
>> ()
>> > {
>> >@Override
>> >public Tuple2
>> >call(LongWritable in) throws Exception {
>> >return (new Tuple2(in, in));
>> >}
>> >});
>> >
>> > can be written to a sequence file using the RDD method:
>> >
>> > DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
>> > LongWritable.class, SequenceFileOutputFormat.class);
>> >
>> > and correctly read using the Hadoop (only) API copied above.
>> >
>> > It seems like there only a problem when I'm attempting to read the
>> sequence
>> > file directly into the RDD. All other operations are performing as
>> expected.
>> >
>> > I'd greatly appreciate any advice someone could provide.
>> >
>> > Regards,
>> >
>> > Michael
>> >
>> > output.seq
>> > <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>>
>
>


Re: Spark SequenceFile Java API Repeat Key Values

2014-01-07 Thread Andrew Ash
Matei, do you mean something like A rather than B below?

A) rdd.map(_.clone).cache
B) rdd.cache

I'd be happy to add documentation if there's a good place for it, but I'm
not sure there's an obvious place for it.


On Tue, Jan 7, 2014 at 9:35 PM, Matei Zaharia wrote:

> Yeah, unfortunately sequenceFile() reuses the Writable object across
> records. If you plan to use each record repeatedly (e.g. cache it), you
> should clone them using a map function. It was originally designed assuming
> you only look at each record once, but it’s poorly documented.
>
> Matei
>
> On Jan 7, 2014, at 11:32 PM, Michael Quinlan  wrote:
>
> > I've spent some time trying to import data into an RDD using the Spark
> Java
> > API, but am not able to properly load data stored in a Hadoop v1.1.1
> > sequence file with key and value types both LongWritable. I've attached a
> > copy of the sequence file to this posting. It contains 3000 key, value
> > pairs. I'm attempting to read using the following code snip:
> >
> > System.setProperty("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer");
> >
> System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");
> >
> > JavaSparkContext ctx = new JavaSparkContext("local[2]",
> >"AppName",
> >"/Users/mquinlan/spark-0.8.0-incubating","jar.name");
> >
> > //Load DataCube via Spark sequenceFile
> > JavaPairRDD DataCube =
> > ctx.sequenceFile("/local_filesystem/output.seq",
> >LongWritable.class, LongWritable.class);
> >
> > The code above produces a DataCube filled with duplicate entries
> relating in
> > some way to the number of splits. For example, the last 1500 or so
> entries
> > all have the same key and value: (2999,22483). The previous 1500 entries
> > appear to represent the last key value from first split of the file. I've
> > confirmed that changing the number of threads (local[3]) does change the
> RDD
> > representation, maintaining this general last key value pattern.
> >
> > Using the Hadoop (only) API methods, I am able to correctly read the file
> > even from within the same Jar:
> >
> > Configuration conf = new Configuration();
> > FileSystem fs = FileSystem.get(conf);
> > SequenceFile.Reader reader = new SequenceFile.Reader(fs, new
> > Path("/local_filesystem/output.seq"), conf);
> > LongWritable key = new LongWritable();
> > LongWritable value = new LongWritable();
> > while(reader.next(key, value)) {
> > System.out.println(key + ":" + value);
> > }
> >
> > I've also confirmed that an RDD populated by the ctx.parallelize()
> method:
> >
> > int n=100;
> > List tl = new ArrayList(n);
> > for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
> > JavaRDD preCube = ctx.parallelize(tl, 1);
> > DataCube = preCube.map(
> >new PairFunction
> ()
> > {
> >@Override
> >public Tuple2
> >call(LongWritable in) throws Exception {
> >return (new Tuple2(in, in));
> >}
> >});
> >
> > can be written to a sequence file using the RDD method:
> >
> > DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class,
> > LongWritable.class, SequenceFileOutputFormat.class);
> >
> > and correctly read using the Hadoop (only) API copied above.
> >
> > It seems like there only a problem when I'm attempting to read the
> sequence
> > file directly into the RDD. All other operations are performing as
> expected.
> >
> > I'd greatly appreciate any advice someone could provide.
> >
> > Regards,
> >
> > Michael
> >
> > output.seq
> > <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n353/output.seq>
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: Why does sortByKey launch cluster job?

2014-01-07 Thread Andrew Ash
Hi Josh,

I just ran into this again myself and noticed that the source hasn't
changed since we discussed in December.  Should I file an official bug in
Jira?

Andrew


On Tue, Dec 10, 2013 at 8:34 AM, Josh Rosen  wrote:

> I wonder whether making RangePartitoner .rangeBounds into a lazy val would
> fix this (
> https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
>  We'd need to make sure that rangeBounds() is never called before an action
> is performed.  This could be tricky because it's called in the
> RangePartitioner.equals() method.  Maybe it's sufficient to just compare
> the number of partitions, the ids of the RDDs used to create the
> RangePartitioner, and the sort ordering.  This still supports the case
> where I range-partition one RDD and pass the same partitioner to a
> different RDD.  It breaks support for the case where two range partitioners
> created on different RDDs happened to have the same rangeBounds(), but it
> seems unlikely that this would really harm performance since it's probably
> unlikely that the range partitioners are equal by chance.
>
>
> On Tue, Dec 10, 2013 at 8:18 AM, Ryan Prenger wrote:
>
>> Thanks for the responses!  I agree that b seems like it would be better.
>>  I could imagine optimizations that could be made if a filter call came
>> after the sortByKey that would make the initial partitioning sub-optimal.
>>  Plus this way, it's a pain to use in the REPL.
>>
>> Cheers,
>>
>> Ryan
>>
>>
>> On Tue, Dec 10, 2013 at 7:06 AM, Andrew Ash  wrote:
>>
>>> Since sortByKey() invokes those right now, we should either a) change
>>> the documentation to note that it kicks off actions or b) change the
>>> method to execute those things lazily.
>>>
>>> Personally I'd prefer b but don't know how difficult that would be.
>>>
>>>
>>> On Tue, Dec 10, 2013 at 1:52 AM, Jason Lenderman 
>>> wrote:
>>>
>>>> Hey Ryan,
>>>>
>>>> The *sortByKey* method creates a *RangePartitioner* (see
>>>> Partitioner.scala), and the initialization code of the
>>>> *RangePartitioner* invokes actions *count* and *sample*.
>>>>
>>>>
>>>> Jason
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 9, 2013 at 7:01 PM, Ryan Prenger wrote:
>>>>
>>>>> sortByKey is listed as a data transformation, not an action, yet it
>>>>> launches a job.  This doesn't seem to square with the documentation.
>>>>>
>>>>> Ryan
>>>>>
>>>>
>>>>
>>>
>>
>


Re: the spark worker assignment Question?

2014-01-07 Thread Andrew Ash
I think that would do what you want.  I'm guessing in "..." you have an rdd
and then call .collect on it -- normally this would be a bad idea because
of large data sizes, but if you KNOW that it's small then you can force it
through just that one machine.


On Tue, Jan 7, 2014 at 9:20 AM, Aureliano Buendia wrote:

>
>
>
> On Tue, Jan 7, 2014 at 5:13 PM, Andrew Ash  wrote:
>
>> If small-file is hosted in HDFS I think the default is one partition per
>> HDFS block. If it's in one block, which are 64MB each by default, that
>> might be one partition.
>>
> So if I want to parallelize processing that small file (which only fits in
> one block) over 100 machines, instead of calling:
>
> sc.parallelize(..., smallInput.partitions.length)
>
> should I call?:
>
> sc.parallelize(..., System.getProperty("spark.cores.max").toInt)
>
>
>> Sent from my mobile phone
>> On Jan 7, 2014 8:46 AM, "Aureliano Buendia"  wrote:
>>
>>>
>>>
>>>
>>> On Thu, Jan 2, 2014 at 5:52 PM, Andrew Ash  wrote:
>>>
>>>> That sounds right Mayur.
>>>>
>>>> Also in 0.8.1 I hear there's a new repartition method that you might be
>>>> able to use to further distribute the data.  But if your data is so small
>>>> that it fits in just a couple blocks, why are you using 20 machines just to
>>>> process a quarter GB of data?
>>>>
>>>
>>> Here is a use case: We could start from an extremely small file which
>>> could be transformed into a huge in-memory dataset, then reduced to a very
>>> small dataset.
>>>
>>> In a more concrete form, assume we have 100 worker machines and start
>>> from a small input file:
>>>
>>> val smallInput = sc.textFile("small-input")
>>>
>>> In this case, would smallInput.partitions.length be a small number, or
>>> would it be 100?
>>>
>>> If we do expect the next transformation to make the data significantly
>>> bigger, how to force it to be processed over the 100 machines?
>>>
>>>
>>>> Is the computation on each bit extremely intensive?
>>>>
>>>>
>>>> On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi wrote:
>>>>
>>>>> I have experienced a similar issue. The easiest fix I found was to
>>>>> increase the replication of the data being used in the worker to the 
>>>>> number
>>>>> of workers you want to use for processing. The RDD seem to created on all
>>>>> the machines where the blocks are replicated. Please correct me if I am
>>>>> wrong.
>>>>>
>>>>> Regards
>>>>> Mayur
>>>>>
>>>>> Mayur Rustagi
>>>>> Ph: +919632149971
>>>>> http://www.sigmoidanalytics.com
>>>>> https://twitter.com/mayur_rustagi
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash wrote:
>>>>>
>>>>>> Hi lihu,
>>>>>>
>>>>>> Maybe the data you're accessing is in in HDFS and only resides on 4
>>>>>> of your 20 machines because it's only about 4 blocks (at default 64MB /
>>>>>> block that's around a quarter GB).  Where is your source data located and
>>>>>> how is it stored?
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 2, 2014 at 7:53 AM, lihu  wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>I run  spark on a cluster with 20 machine, but when I start an
>>>>>>> application use the spark-shell, there only 4 machine is working , the
>>>>>>> other with just idle, without memery and cpu used, I watch this through
>>>>>>> webui.
>>>>>>>
>>>>>>>I wonder the other machine maybe  busy, so i watch the machines
>>>>>>> using  "top" and "free" command, but this is not。
>>>>>>>
>>>>>>>   * So I just wonder why not spark assignment work to all all the
>>>>>>> 20 machine? this is not a good resource usage.*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


Re: the spark worker assignment Question?

2014-01-07 Thread Andrew Ash
If small-file is hosted in HDFS, I think the default is one partition per
HDFS block. If it all fits in one block (blocks are 64MB each by default),
that might be just one partition.
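
A sketch of both options (repartition needs 0.8.1+, and the minSplits argument
to textFile is only a hint, so it may not help for a single unsplittable block):

val smallInput = sc.textFile("small-input", 100) // ask for ~100 partitions up front
val spread     = smallInput.repartition(100)     // or force a shuffle across ~100 partitions after loading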

Sent from my mobile phone
On Jan 7, 2014 8:46 AM, "Aureliano Buendia"  wrote:

>
>
>
> On Thu, Jan 2, 2014 at 5:52 PM, Andrew Ash  wrote:
>
>> That sounds right Mayur.
>>
>> Also in 0.8.1 I hear there's a new repartition method that you might be
>> able to use to further distribute the data.  But if your data is so small
>> that it fits in just a couple blocks, why are you using 20 machines just to
>> process a quarter GB of data?
>>
>
> Here is a use case: We could start from an extremely small file which
> could be transformed into a huge in-memory dataset, then reduced to a very
> small dataset.
>
> In a more concrete form, assume we have 100 worker machines and start from
> a small input file:
>
> val smallInput = sc.textFile("small-input")
>
> In this case, would smallInput.partitions.length be a small number, or
> would it be 100?
>
> If we do expect the next transformation to make the data significantly
> bigger, how to force it to be processed over the 100 machines?
>
>
>> Is the computation on each bit extremely intensive?
>>
>>
>> On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi 
>> wrote:
>>
>>> I have experienced a similar issue. The easiest fix I found was to
>>> increase the replication of the data being used in the worker to the number
>>> of workers you want to use for processing. The RDD seem to created on all
>>> the machines where the blocks are replicated. Please correct me if I am
>>> wrong.
>>>
>>> Regards
>>> Mayur
>>>
>>> Mayur Rustagi
>>> Ph: +919632149971
>>> http://www.sigmoidanalytics.com
>>> https://twitter.com/mayur_rustagi
>>>
>>>
>>>
>>> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash wrote:
>>>
>>>> Hi lihu,
>>>>
>>>> Maybe the data you're accessing is in in HDFS and only resides on 4 of
>>>> your 20 machines because it's only about 4 blocks (at default 64MB / block
>>>> that's around a quarter GB).  Where is your source data located and how is
>>>> it stored?
>>>>
>>>> Andrew
>>>>
>>>>
>>>> On Thu, Jan 2, 2014 at 7:53 AM, lihu  wrote:
>>>>
>>>>> Hi,
>>>>>I run  spark on a cluster with 20 machine, but when I start an
>>>>> application use the spark-shell, there only 4 machine is working , the
>>>>> other with just idle, without memery and cpu used, I watch this through
>>>>> webui.
>>>>>
>>>>>I wonder the other machine maybe  busy, so i watch the machines
>>>>> using  "top" and "free" command, but this is not。
>>>>>
>>>>>   * So I just wonder why not spark assignment work to all all the 20
>>>>> machine? this is not a good resource usage.*
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: the spark worker assignment Question?

2014-01-06 Thread Andrew Ash
Hi Li,

I've also found this setting confusing in the past.  Take a look at this
change -- do you think it makes the setting more clear?

https://github.com/apache/incubator-spark/pull/341/files
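
In code terms (the master URL is a placeholder), the setting behaves like this:

// spark.cores.max caps the total cores granted to the app across the whole
// cluster, not the cores used on each worker
System.setProperty("spark.cores.max", "20")
val sc = new SparkContext("spark://master:7077", "MyApp")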

Andrew


On Mon, Jan 6, 2014 at 8:19 AM, lihu  wrote:

> Sorry for my late reply; Gmail did not notify me.
>
> It was my fault that caused this problem.
> I took the config parameter *spark.cores.max* to be the maximum number per
> machine, but it is in fact the total across the cluster.
>
> Thanks very much, Andrew and Mayur; your answers helped me understand more
> about the Spark system.
>
>
>
> On Fri, Jan 3, 2014 at 2:28 AM, Mayur Rustagi wrote:
>
>> Andrew that a good point. I have done that for handling a large number of
>> queries. Typically to get good response time on large number of queries in
>> parallel, you would want them replicated on a lot of systems.
>> Regards
>> Mayur Rustagi
>> Ph: +919632149971
>> http://www.sigmoidanalytics.com
>> https://twitter.com/mayur_rustagi
>>
>>
>>
>> On Thu, Jan 2, 2014 at 11:22 PM, Andrew Ash  wrote:
>>
>>> That sounds right Mayur.
>>>
>>> Also in 0.8.1 I hear there's a new repartition method that you might be
>>> able to use to further distribute the data.  But if your data is so small
>>> that it fits in just a couple blocks, why are you using 20 machines just to
>>> process a quarter GB of data?  Is the computation on each bit extremely
>>> intensive?
>>>
>>>
>>> On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi 
>>> wrote:
>>>
>>>> I have experienced a similar issue. The easiest fix I found was to
>>>> increase the replication of the data being used in the worker to the number
>>>> of workers you want to use for processing. The RDD seem to created on all
>>>> the machines where the blocks are replicated. Please correct me if I am
>>>> wrong.
>>>>
>>>> Regards
>>>> Mayur
>>>>
>>>> Mayur Rustagi
>>>> Ph: +919632149971
>>>> http://www.sigmoidanalytics.com
>>>> https://twitter.com/mayur_rustagi
>>>>
>>>>
>>>>
>>>> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash wrote:
>>>>
>>>>> Hi lihu,
>>>>>
>>>>> Maybe the data you're accessing is in in HDFS and only resides on 4 of
>>>>> your 20 machines because it's only about 4 blocks (at default 64MB / block
>>>>> that's around a quarter GB).  Where is your source data located and how is
>>>>> it stored?
>>>>>
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Thu, Jan 2, 2014 at 7:53 AM, lihu  wrote:
>>>>>
>>>>>> Hi,
>>>>>>I run  spark on a cluster with 20 machine, but when I start an
>>>>>> application use the spark-shell, there only 4 machine is working , the
>>>>>> other with just idle, without memery and cpu used, I watch this through
>>>>>> webui.
>>>>>>
>>>>>>I wonder the other machine maybe  busy, so i watch the machines
>>>>>> using  "top" and "free" command, but this is not。
>>>>>>
>>>>>>   * So I just wonder why not spark assignment work to all all the 20
>>>>>> machine? this is not a good resource usage.*
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> *Best Wishes!*
>
> *Li Hu(李浒) | Graduate Student*
>
> *Institute for Interdisciplinary Information Sciences (IIIS,
> http://iiis.tsinghua.edu.cn/)*
> *Tsinghua University, China*
>
> *Email: lihu...@gmail.com *
> *Tel  : +86 15120081920 <%2B86%2015120081920>*
> *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
> <http://iiis.tsinghua.edu.cn/zh/lihu/>*
>
>
>


Re: Data locality during Spark RDD creation

2014-01-03 Thread Andrew Ash
I definitely think so.  Network transfer is often a bottleneck for
distributed jobs, especially if you're using groupBys or re-keying things
often.

What network speed do you have between each HDFS node?  1 Gbps?


On Fri, Jan 3, 2014 at 2:34 PM, Debasish Das wrote:

> Hi,
>
> I have HDFS and MapReduce running on 20 nodes and a experimental spark
> cluster running on subset of the HDFS node (say 8 of them).
>
> If some ETL is done using MR most likely the data will be replicated
> across all 20 nodes (assuming I used all the nodes).
>
> Is it a good idea to run spark cluster on all 20 nodes where HDFS is
> running so that all the RDDs are data local and the bulk data transfer is
> minimized ?
>
> Thanks.
> Deb
>


Re: Turning kryo on does not decrease binary output

2014-01-03 Thread Andrew Ash
saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions, which
uses some Scala magic (an implicit conversion) to become available when you
have an RDD[Key, Value].

https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648

Agreed, something like Chill would make this much easier for the default
cases.
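
A quick sketch of how that implicit kicks in (toy data, a made-up output path,
and assuming `sc` is a live SparkContext such as the one in spark-shell):

import org.apache.spark.SparkContext._  // the RDD[(K, V)] -> PairRDDFunctions implicit

val nums = sc.parallelize(1 to 100)           // RDD[Int]: no saveAsHadoopFile here
val pairs = nums.map(i => ("key-" + i, i))    // RDD[(String, Int)]: pair methods appear

pairs.reduceByKey(_ + _)                      // a PairRDDFunctions method
pairs.saveAsSequenceFile("/tmp/pairs-seq")    // via a similar implicit (SequenceFileRDDFunctions)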


On Fri, Jan 3, 2014 at 2:04 PM, Aureliano Buendia wrote:

> RDD only defines saveAsTextFile and saveAsObjectFile. I think
> saveAsHadoopFile and saveAsNewAPIHadoopFile belong to the older versions.
>
> saveAsObjectFile definitely outputs hadoop format.
>
> I'm not trying to save big objects by saveAsObjectFile, I'm just trying to
> minimize the java serialization overhead when saving to a binary file.
>
> I can see spark can benefit from something like
> https://github.com/twitter/chill in this matter.
>
>
> On Fri, Jan 3, 2014 at 6:42 PM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>  Hi,
>>
>> After a little bit of thinking, I'm not sure anymore if saveAsObjectFile
>> uses the spark.hadoop.*
>>
>> Also, I made a mistake. The use of *.mapred.* or *.mapreduce.* does
>> not depend on the hadoop version you use, but on the API version you use.
>>
>> So, I can assure you that if you use the saveAsNewAPIHadoopFile, with the
>> spark.hadoop.mapreduce.* properties, the compression will be used.
>>
>> If you use the saveAsHadoopFile, it should be used with mapred.*
>>
>> If you use the saveAsObjectFile to a hdfs path, I'm not sure if the
>> output is compressed.
>>
>> Anyway, saveAsObjectFile should be used for small objects, in my opinion.
>>
>> Guillaume
>>
>>   Even
>>
>> someMap.saveAsTextFile("out", classOf[GzipCodec])
>>
>>  has no effect.
>>
>>  Also, I notices that saving sequence files has no compression option (my
>> original question was about compressing binary output).
>>
>>  Having said this, I still do not understand why kryo cannot be helpful
>> when saving binary output. Binary output uses java serialization, which has
>> a pretty hefty overhead.
>>
>>  How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
>>
>>
>> --
>>[image: eXenSa]
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>

Re: Turning kryo on does not decrease binary output

2014-01-03 Thread Andrew Ash
For hadoop properties I find the most reliable way to be to set them on a
Configuration object and use a method on SparkContext that accepts that
conf object.

From working code:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def nlLZOfile(path: String) = {
  val conf = new Configuration
  conf.set("textinputformat.record.delimiter", "\n")
  sc.newAPIHadoopFile(path,
      classOf[com.hadoop.mapreduce.LzoTextInputFormat],
      classOf[LongWritable], classOf[Text], conf)
    .map(_._2.toString)
}


On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia wrote:

> Thanks for clarifying this.
>
> I tried setting hadoop properties before constructing SparkContext, but it
> had no effect.
>
> Where is the right place to set these properties?
>
>
> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>  Hi,
>>
>> I believe Kryo is only use during RDD serialization (i.e. communication
>> between nodes), not for saving. If you want to compress output, you can use
>> GZip or snappy codec like that :
>>
>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>> val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip
>>
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress",
>> "true")
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
>> codec)
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type",
>> "BLOCK")
>>
>> (That's for HDP2, for HDP1, the keys are different)
>> Regards
>> Guillaume
>>
>>   Hi,
>>
>>  I'm trying to call saveAsObjectFile() on an RDD[*(Int, Int, Double,
>> Double)*], expecting the output binary to be smaller, but it is exactly
>> the same size of when kryo is not on.
>>
>>  I've checked the log, and there is no trace of kryo related errors.
>>
>>  The code looks something like:
>>
>> class MyRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>> kryo.setRegistrationRequired(true)
>> kryo.register(classOf[*(Int, Int, Double, Double)*])
>>   }
>> }
>>  System.setProperty("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>
>>  At the end, I tried to call:
>>
>> kryo.setRegistrationRequired(*true*)
>>
>>  to make sure my class gets registered. But I found errors like:
>>
>> Exception in thread "DAGScheduler"
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> *scala.math.Numeric$IntIsIntegral$*
>> Note: To register this class use:
>> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>
>>  It appears many scala internal types have to be registered in order to
>> have full kryo support.
>>
>>  Any idea why my simple tuple type should not get kryo benefits?
>>
>>
>>
>> --
>>[image: eXenSa]
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>

Re: Issue with sortByKey.

2014-01-03 Thread Andrew Ash
It probably uses hashCode too, so make sure those two methods (equals and
hashCode) stay in sync.
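
For example (a minimal sketch, not the poster's actual class -- the name and
fields here are invented): groupByKey routes records to partitions by hashCode
and then compares keys with equals, so both must be built from the same fields.

class SessionKey(val host: String, val port: Int) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: SessionKey => host == that.host && port == that.port
    case _ => false
  }
  // Derived from exactly the same fields as equals; otherwise two "equal" keys
  // can hash into different partitions and never be grouped together.
  override def hashCode: Int = 41 * host.hashCode + port
}

A case class gives you consistent equals and hashCode for free, which is
usually the easiest fix.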

Sent from my mobile phone
On Jan 3, 2014 3:26 AM, "Archit Thakur"  wrote:

> I realized my mistake as soon as I posted it. I actually meant groupByKey
> not sortedByKey. And Yeah you are right, it is consuming 6 Hdfs blocks.
>
> The issue I am facing is When I do a groupBy, it reduces the no. of unique
> keys in the Rdd and modify them also.
>
> For eg:
>
> I have a custom DS.
>
> Below is the set of unique keys in the baseRdd
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^71.68.211.98:62510][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838476]6[2[18^71.68.211.98:62498][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (40^0^0[2^1380^1383839119]6[2[19^128.211.178.8:33448][2^WP]]
> (40^0^0[2^1380^1383839294]6[2[19^71.75.156.224:36652][2^WP]]
> (40^0^0[2^1380^1383839651]6[2[18^69.133.71.57:58320][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
>
> and when I do a groupBy on the Rdd, it gives me:
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
>
> Not only it has reduced the no. of keys but also have modified it.
>
> groupBy operation only uses equals method of the Key class (to check the
> equality of the key), right?
>
>
> On Fri, Jan 3, 2014 at 4:02 PM, Andrew Ash  wrote:
>
>> Hi Archit,
>>
>> A partition is a chunk of data about the size of an HDFS block, not that
>> of a single key.  Because every partition is tracked individually and each
>> is processed in a task on one CPU core, having massive numbers of them
>> causes slowdowns in the scheduler and elsewhere in the system.  About how
>> much data are you looking at here?  If the source of your RDDs are in HDFS,
>> then how many HDFS blocks are required to hold the 6 RDDs?
>>
>> Andrew
>>
>>
>> On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur 
>> wrote:
>>
>>> I saw Code of sortByKey:
>>>
>>> def sortByKey(ascending: Boolean = true, numPartitions: Int =
>>> self.partitions.size): RDD[P] = {
>>>
>>> It makes numPartitions = self.partitions.size which comes from
>>> getPartitions method of RDD, if you dont specify it explicitly.
>>>
>>> In this case it will be rdd which will be created by step (3rd). Isn't
>>> it wrong?
>>>
>>>
>>> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur >> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I have 6 sequence files as input to spark code.
>>>> What I am doing is:
>>>> 1. Create 6 individual RDD's out of them.
>>>> 2. Union them.
>>>> 3. Then Some Mapping.
>>>> 4. Count no of ele in RDD.
>>>> 5. Then SortByKey.
>>>>
>>>> Now, If I see logging:
>>>>
>>>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at
>>>> PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>>>
>>>> This is count step (4th)
>>>>
>>>> Doubt 1: Why 6 output partitions?
>>>>
>>>> It then prints progress for each of them
>>>>
>>>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>>>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>>>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)

Re: Issue with sortByKey.

2014-01-03 Thread Andrew Ash
Hi Archit,

A partition is a chunk of data about the size of an HDFS block, not that of
a single key.  Because every partition is tracked individually and each is
processed in a task on one CPU core, having massive numbers of them causes
slowdowns in the scheduler and elsewhere in the system.  About how much
data are you looking at here?  If the source of your RDDs are in HDFS, then
how many HDFS blocks are required to hold the 6 RDDs?
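
For what it's worth, a tiny sketch (toy data, assuming `sc` from the shell) of
how the partition count is controlled -- it defaults to the parent RDD's
partition count and is never one partition per key:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3), ("a", 4)))
val sortedDefault = pairs.sortByKey()          // keeps pairs.partitions.size partitions
val sortedWide    = pairs.sortByKey(true, 12)  // ascending, explicitly 12 partitions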

Andrew


On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur wrote:

> I saw Code of sortByKey:
>
> def sortByKey(ascending: Boolean = true, numPartitions: Int =
> self.partitions.size): RDD[P] = {
>
> It makes numPartitions = self.partitions.size which comes from
> getPartitions method of RDD, if you dont specify it explicitly.
>
> In this case it will be rdd which will be created by step (3rd). Isn't it
> wrong?
>
>
> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur 
> wrote:
>
>> Hi,
>>
>> I have 6 sequence files as input to spark code.
>> What I am doing is:
>> 1. Create 6 individual RDD's out of them.
>> 2. Union them.
>> 3. Then Some Mapping.
>> 4. Count no of ele in RDD.
>> 5. Then SortByKey.
>>
>> Now, If I see logging:
>>
>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at
>> PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>
>> This is count step (4th)
>>
>> Doubt 1: Why 6 output partitions?
>>
>> It then prints progress for each of them
>>
>>
>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
>> 14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
>> 14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
>> 14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
>> 14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
>> 14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
>> 14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
>> 14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count
>>
>> After that when it goes to sortByKey:
>>
>> *14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at
>> PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)*
>>
>> However, It should have been n output partitions, where n = unique no. of
>> keys in RDD. Isn't it?
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>
>


Is spark-env.sh supposed to be stateless?

2014-01-02 Thread Andrew Ash
In my spark-env.sh I append to the SPARK_CLASSPATH variable rather than
overriding it, because I want to support both adding a jar to all instances
of a shell (in spark-env.sh) and adding a jar to a single shell
instance (SPARK_CLASSPATH=/path/to/my.jar
/path/to/spark-shell)

That looks like this:

# spark-env.sh
export SPARK_CLASSPATH+=":/path/to/hadoop-lzo.jar"

However when my Master and workers run, they have duplicates of the
SPARK_CLASSPATH jars.  There are 3 copies of hadoop-lzo on the classpath, 2
of which are unnecessary.

The resulting command line in ps looks like this:
/path/to/java -cp
:/path/to/hadoop-lzo.jar:/path/to/hadoop-lzo.jar:/path/to/hadoop-lzo.jar:[core
spark jars] ... -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
spark://my-host:7077

I tracked it down and the problem is that spark-env.sh is sourced 3 times:
in spark-daemon.sh, in compute-classpath.sh, and in spark-class.  Each of
those adds to the SPARK_CLASSPATH until its contents are in triplicate.

Are all of those calls necessary?  Is it possible to edit the daemon
scripts to only call spark-env.sh once?

FYI I'm starting the daemons with ./bin/start-master.sh and
./bin/start-slave.sh 1 $SPARK_URL

Thanks,
Andrew


Re: How to deal with multidimensional keys?

2014-01-02 Thread Andrew Ash
If you had an RDD of ((i, j, k), value) pairs then you could reduce by j by
essentially mapping j into the key slot, doing the reduce, and then mapping it
back:

rdd.map { case ((i, j, k), v) => (j, (i, k, v)) }
   .reduceByKey( ... )
   .map { case (j, (i, k, v)) => ((i, j, k), v) }

It's not pretty, but I've had to use this pattern before too.
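
Here is a self-contained sketch of that pattern (toy data, assuming `sc` from
the shell), summing the values over j:

val cells = sc.parallelize(Seq(
  ((0, 0, 0), 1.0), ((1, 0, 2), 2.0), ((0, 1, 1), 3.0)))     // ((i, j, k), value)

val summedOverJ = cells
  .map { case ((i, j, k), v) => (j, (i, k, v)) }             // j becomes the key
  .reduceByKey { case ((i1, k1, v1), (_, _, v2)) => (i1, k1, v1 + v2) }
  .map { case (j, (i, k, v)) => ((i, j, k), v) }             // restore the key shape

The i and k kept through the reduce are just those of the first element seen,
only so the key shape can be restored afterwards.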


On Thu, Jan 2, 2014 at 6:23 PM, Aureliano Buendia wrote:

> Hi,
>
> How is it possible to reduce by multidimensional keys?
>
> For example, if every line is a tuple like:
>
> (i, j, k, value)
>
> or, alternatively:
>
> ((I, j, k), value)
>
> how can spark handle reducing over j, or k?
>


Re: Standalone spark cluster dead nodes

2014-01-02 Thread Andrew Ash
Do you have stacktraces or other errors for the workers' deaths?

This is a great case for using the Tanuki service wrapper that can be
configured to automatically restart JVMs that die.

Andrew


On Thu, Jan 2, 2014 at 4:38 PM, Debasish Das wrote:

> Hi,
>
> I have been running standalone spark cluster but sometimes I do see dead
> nodes. The physical machines are not dead but the JVM worker dies.
>
> Is there a methodology which automatically restart worker JVM if it dies ?
>
> Thanks.
> Deb
>
>


Re: rdd.saveAsTextFile problem

2014-01-02 Thread Andrew Ash
I'm guessing it's a documentation issue, but certainly something could have
broken.

- what version of Spark?  -- 0.8.1
- what mode are you running with? (local, standalone, mesos, YARN) -- local
on Windows
- are you using the shell or an application? -- shell
- what language (Scala / Java / Python)? -- Scala

Can you provide a deeper error stacktrace from the executor?  Look in the
webui (port 4040) and in the stdout/stderr files.

Also, give it a shot on the linux box to see if that works.

Cheers!
Andrew


On Thu, Jan 2, 2014 at 1:31 PM, Philip Ogren wrote:

>  Yep - that works great and is what I normally do.
>
> I perhaps should have framed my email as a bug report.  The documentation
> for saveAsTextFile says you can write results out to a local file but it
> doesn't work for me per the described behavior.  It also worked before and
> now it doesn't.  So, it seems like a bug.  Should I file a Jira issue?  I
> haven't done that yet for this project but would be happy to.
>
> Thanks,
> Philip
>
>
> On 1/2/2014 11:23 AM, Andrew Ash wrote:
>
> For testing, maybe try using .collect and doing the comparison between
> expected and actual in memory rather than on disk?
>
>
> On Thu, Jan 2, 2014 at 12:54 PM, Philip Ogren wrote:
>
>>  I just tried your suggestion and get the same results with the
>> _temporary directory.  Thanks though.
>>
>>
>> On 1/2/2014 10:28 AM, Andrew Ash wrote:
>>
>> You want to write it to a local file on the machine?  Try using
>> "file:///path/to/target/mydir/" instead
>>
>>  I'm not sure what behavior would be if you did this on a multi-machine
>> cluster though -- you may get a bit of data on each machine in that local
>> directory.
>>
>>
>> On Thu, Jan 2, 2014 at 12:22 PM, Philip Ogren wrote:
>>
>>> I have a very simple Spark application that looks like the following:
>>>
>>>
>>> var myRdd: RDD[Array[String]] = initMyRdd()
>>> println(myRdd.first.mkString(", "))
>>> println(myRdd.count)
>>>
>>> myRdd.saveAsTextFile("hdfs://myserver:8020/mydir")
>>> myRdd.saveAsTextFile("target/mydir/")
>>>
>>>
>>> The println statements work as expected.  The first saveAsTextFile
>>> statement also works as expected.  The second saveAsTextFile statement does
>>> not (even if the first is commented out.)  I get the exception pasted
>>> below.  If I inspect "target/mydir" I see that there is a directory called
>>> _temporary/0/_temporary/attempt_201401020953__m_00_1 which contains
>>> an empty part-0 file.  It's curious because this code worked before
>>> with Spark 0.8.0 and now I am running on Spark 0.8.1. I happen to be
>>> running this on Windows in "local" mode at the moment.  Perhaps I should
>>> try running it on my linux box.
>>>
>>> Thanks,
>>> Philip
>>>
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted:
>>> Task 2.0:0 failed more than 0 times; aborting job
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:827)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:825)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:825)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:440)
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:502)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:157)
>>>
>>>
>>>
>>
>>
>
>


Re: rdd.saveAsTextFile problem

2014-01-02 Thread Andrew Ash
For testing, maybe try using .collect and doing the comparison between
expected and actual in memory rather than on disk?
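
Something like this (a toy sketch; the data and expected set are made up):

val myRdd = sc.parallelize(Seq("a", "b", "c"))
val expected = Set("a", "b", "c")
val actual = myRdd.collect().toSet  // only for small test RDDs; collect() pulls everything to the driver
assert(actual == expected)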


On Thu, Jan 2, 2014 at 12:54 PM, Philip Ogren wrote:

>  I just tried your suggestion and get the same results with the _temporary
> directory.  Thanks though.
>
>
> On 1/2/2014 10:28 AM, Andrew Ash wrote:
>
> You want to write it to a local file on the machine?  Try using
> "file:///path/to/target/mydir/" instead
>
>  I'm not sure what behavior would be if you did this on a multi-machine
> cluster though -- you may get a bit of data on each machine in that local
> directory.
>
>
> On Thu, Jan 2, 2014 at 12:22 PM, Philip Ogren wrote:
>
>> I have a very simple Spark application that looks like the following:
>>
>>
>> var myRdd: RDD[Array[String]] = initMyRdd()
>> println(myRdd.first.mkString(", "))
>> println(myRdd.count)
>>
>> myRdd.saveAsTextFile("hdfs://myserver:8020/mydir")
>> myRdd.saveAsTextFile("target/mydir/")
>>
>>
>> The println statements work as expected.  The first saveAsTextFile
>> statement also works as expected.  The second saveAsTextFile statement does
>> not (even if the first is commented out.)  I get the exception pasted
>> below.  If I inspect "target/mydir" I see that there is a directory called
>> _temporary/0/_temporary/attempt_201401020953__m_00_1 which contains
>> an empty part-0 file.  It's curious because this code worked before
>> with Spark 0.8.0 and now I am running on Spark 0.8.1. I happen to be
>> running this on Windows in "local" mode at the moment.  Perhaps I should
>> try running it on my linux box.
>>
>> Thanks,
>> Philip
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted:
>> Task 2.0:0 failed more than 0 times; aborting job
>> java.lang.NullPointerException
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:827)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:825)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:825)
>> at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:440)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:502)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:157)
>>
>>
>>
>
>


Re: the spark worker assignment Question?

2014-01-02 Thread Andrew Ash
That sounds right Mayur.

Also in 0.8.1 I hear there's a new repartition method that you might be
able to use to further distribute the data.  But if your data is so small
that it fits in just a couple blocks, why are you using 20 machines just to
process a quarter GB of data?  Is the computation on each bit extremely
intensive?


On Thu, Jan 2, 2014 at 12:39 PM, Mayur Rustagi wrote:

> I have experienced a similar issue. The easiest fix I found was to
> increase the replication of the data being used in the worker to the number
> of workers you want to use for processing. The RDD seem to created on all
> the machines where the blocks are replicated. Please correct me if I am
> wrong.
>
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +919632149971
> http://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Thu, Jan 2, 2014 at 10:46 PM, Andrew Ash  wrote:
>
>> Hi lihu,
>>
>> Maybe the data you're accessing is in HDFS and only resides on 4 of
>> your 20 machines because it's only about 4 blocks (at default 64MB / block
>> that's around a quarter GB).  Where is your source data located and how is
>> it stored?
>>
>> Andrew
>>
>>
>> On Thu, Jan 2, 2014 at 7:53 AM, lihu  wrote:
>>
>>> Hi,
>>>I run  spark on a cluster with 20 machine, but when I start an
>>> application use the spark-shell, there only 4 machine is working , the
>>> other with just idle, without memery and cpu used, I watch this through
>>> webui.
>>>
>>>I wonder the other machine maybe  busy, so i watch the machines using
>>>  "top" and "free" command, but this is not。
>>>
>>>   * So I just wonder why not spark assignment work to all all the 20
>>> machine? this is not a good resource usage.*
>>>
>>>
>>>
>>>
>>>
>>
>


Re: rdd.saveAsTextFile problem

2014-01-02 Thread Andrew Ash
You want to write it to a local file on the machine?  Try using
"file:///path/to/target/mydir/" instead

I'm not sure what behavior would be if you did this on a multi-machine
cluster though -- you may get a bit of data on each machine in that local
directory.


On Thu, Jan 2, 2014 at 12:22 PM, Philip Ogren wrote:

> I have a very simple Spark application that looks like the following:
>
>
> var myRdd: RDD[Array[String]] = initMyRdd()
> println(myRdd.first.mkString(", "))
> println(myRdd.count)
>
> myRdd.saveAsTextFile("hdfs://myserver:8020/mydir")
> myRdd.saveAsTextFile("target/mydir/")
>
>
> The println statements work as expected.  The first saveAsTextFile
> statement also works as expected.  The second saveAsTextFile statement does
> not (even if the first is commented out.)  I get the exception pasted
> below.  If I inspect "target/mydir" I see that there is a directory called
> _temporary/0/_temporary/attempt_201401020953__m_00_1 which
> contains an empty part-0 file.  It's curious because this code worked
> before with Spark 0.8.0 and now I am running on Spark 0.8.1. I happen to be
> running this on Windows in "local" mode at the moment.  Perhaps I should
> try running it on my linux box.
>
> Thanks,
> Philip
>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted:
> Task 2.0:0 failed more than 0 times; aborting job
> java.lang.NullPointerException
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:827)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:825)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:825)
> at org.apache.spark.scheduler.DAGScheduler.processEvent(
> DAGScheduler.scala:440)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$run(DAGScheduler.scala:502)
> at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(
> DAGScheduler.scala:157)
>
>
>


Re: the spark worker assignment Question?

2014-01-02 Thread Andrew Ash
Hi lihu,

Maybe the data you're accessing is in HDFS and only resides on 4 of your
20 machines because it's only about 4 blocks (at default 64MB / block
that's around a quarter GB).  Where is your source data located and how is
it stored?

Andrew


On Thu, Jan 2, 2014 at 7:53 AM, lihu  wrote:

> Hi,
>I run  spark on a cluster with 20 machine, but when I start an
> application use the spark-shell, there only 4 machine is working , the
> other with just idle, without memery and cpu used, I watch this through
> webui.
>
>I wonder the other machine maybe  busy, so i watch the machines using
>  "top" and "free" command, but this is not。
>
>   * So I just wonder why not spark assignment work to all all the 20
> machine? this is not a good resource usage.*
>
>
>
>
>


Re: Re: Where to put "local" data files?

2014-01-01 Thread Andrew Ash
Yes it will.  This is called data locality and it works by matching the
hostname in Spark with the one in HDFS.


On Wed, Jan 1, 2014 at 2:40 AM, guxiaobo1982  wrote:

> Hi Andrew,
>
>
> Thanks for your reply, I have another question about using HDFS, when running 
> hdfs and the standalone mode on the same cluster, will the spark workers only 
> read data on the same server to avoid transfering data over network.
>
> Xiaobo gu
>
> On 2014-01-01 05:37:36,
> "andrew" wrote:
>
> Hi Xiaobo,
>
> I would recommend putting the files into an HDFS cluster on the same
> machines instead if possible.  If you're concerned about duplicating the
> data, you can set the replication factor to 1 so you don't use more space
> than before.
>
> In my experience of Spark around 0.7.0 or so, when reading from a local
> file with sc.textFile("file:///...") you had to have that file in that
> exact path on every Spark worker machine.
>
> Cheers,
> Andrew
>
>
> On Tue, Dec 31, 2013 at 5:34 AM, guxiaobo1982  wrote:
>
>> Hi,
>>
>> We are going to deploy a standalone mode cluster, we know Spark can read
>> local data files into RDDs, but the question is where should we put the
>> data file, on the server where commit our application, or the server where
>> the master service runs?
>>
>> Regards,
>>
>> Xiaobo Gu
>>
>
>


Re: Where to put "local" data files?

2013-12-31 Thread Andrew Ash
Hi Xiaobo,

I would recommend putting the files into an HDFS cluster on the same
machines instead if possible.  If you're concerned about duplicating the
data, you can set the replication factor to 1 so you don't use more space
than before.

In my experience of Spark around 0.7.0 or so, when reading from a local
file with sc.textFile("file:///...") you had to have that file in that
exact path on every Spark worker machine.

Cheers,
Andrew


On Tue, Dec 31, 2013 at 5:34 AM, guxiaobo1982  wrote:

> Hi,
>
> We are going to deploy a standalone mode cluster, we know Spark can read
> local data files into RDDs, but the question is where should we put the
> data file, on the server where commit our application, or the server where
> the master service runs?
>
> Regards,
>
> Xiaobo Gu
>


Re: How to map each line to (line number, line)?

2013-12-30 Thread Andrew Ash
Hi Aureliano,

It's very easy to get lines into (start byte number, line) using Hadoop's
TextInputFormat.  See how SparkContext's textFile() method does it here:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291

What is the use case where you must have the global line number in the
file, vs a global ordered unique identifier (my suggestion above) or a
partition-local line number (discussed extensively below)?

Also if you have any way to do this in plain Hadoop, Spark can use that as
well.

The fundamental difficulty is that knowing global line number breaks the
assumption Hadoop makes everywhere that each record is independent of all
the others.  Maybe you should consider adding a line number to the
beginning of every line on import time into HDFS instead of doing it
afterwards in Spark.
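
For completeness, the partition-local approach discussed below can be made
exact without assuming equal partition sizes by counting lines per partition
first -- a rough sketch (made-up input path, assuming `sc` from the shell):

val lines = sc.textFile("hdfs:///path/to/input")

// Pass 1: line count per partition (a small array collected to the driver).
val counts = lines
  .mapPartitionsWithIndex { (pi, it) => Iterator((pi, it.size)) }
  .collect()
  .sortBy(_._1)
  .map(_._2)

// Prefix sums: the global number of the first line in each partition.
val offsets = counts.scanLeft(0L)(_ + _)

// Pass 2: number lines within each partition starting at that offset.
val numbered = lines.mapPartitionsWithIndex { (pi, it) =>
  it.zipWithIndex.map { case (line, i) => (offsets(pi) + i, line) }
}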

Cheers!
Andrew


On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia wrote:

> I assumed that number of lines in each partition, except the last
> partition, is equal. Isn't this the case? In that case Guillaume's approach
> makes sense.
>
> All of these methods are inefficient. Spark needs to support this feature
> at lower level, as Michael suggested.
>
>
> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>  You're assuming each partition has the same line count. I don't think
>> it's true (actually, I'm almost certain it's false). And anyway your code
>> also require two maps.
>>
>> In my code, the sorting as well as the other operations are performed on
>> a very small dataset : one element per partition
>>
>> Guillaume
>>
>>
>>
>>
>>> Did you try the code I sent ? I think the sortBy is probably in the
>>> wrong direction, so change it with -i instead of i
>>>
>>
>>  I'm confused why would need in memory sorting. We just use a loop like
>> any other loops in spark. Why shouldn't this solve the problem?:
>>
>> val count = lines.count() // lines is the rdd
>> val partitionLinesCount = count / rdd.partitions.length
>> linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>   var i = pi * partitionLinesCount
>>   it.map {
>> *line => (i, line)*
>>  i += 1
>>}
>> }
>>
>>
>>
>> --
>>[image: eXenSa]
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>>  eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>

Re: reading LZO compressed file in spark

2013-12-26 Thread Andrew Ash
Rajeev,

You should have something like this in in your core-site.xml file in Hadoop:


<property>
  <name>io.compression.codecs</name>
  <value>com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
</property>


I also had to add the LZO jar into Spark with SPARK_CLASSPATH in
spark-env.sh so you may need to do that too.

Cheers,
Andrew



On Thu, Dec 26, 2013 at 3:48 PM, Rajeev Srivastava  wrote:

> Hi Andrew,
>  Thanks for your example
> I used your command and i get the following errors from worker  ( missing
> codec from worker i guess)
> How do i get codecs over to worker machines
> regards
> Rajeev
> ***
> 13/12/26 12:34:42 INFO TaskSetManager: Loss was due to
> java.io.IOException: Codec for file
> hdfs://hadoop00/tmp/ldpc_dec_top_245_to_275.vcd.sstv3.lzo not
> found, cannot
> run
> at
> com.hadoop.mapreduce.LzoLineRecordReader.initialize(LzoLineRecordReader.java:97)
> at
> spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:68)
> at
> spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:57)
> at
> spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
> at
> spark.RDD.iterator(RDD.scala:196)
> at
> spark.scheduler.ResultTask.run(ResultTask.scala:77)
> at
> spark.executor.Executor$TaskRunner.run(Executor.scala:98)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at
> java.lang.Thread.run(Thread.java:724)
> 13/12/26 12:34:42 INFO TaskSetManager: Starting task 0.0:15 as TID 28 on
> executor 4: hadoop02
> (preferred)
> 13/12/26 12:34:42 INFO TaskSetManager: Serialized task 0.0:15 as 1358 bytes
> in 0 ms  13/12/26 12:34:42 INFO
> TaskSetManager: Lost TID 22 (task
> 0.0:20) 13/12/26
> 12:34:42 INFO TaskSetManager: Loss was due to java.io.IOException: Codec
> for file hdfs://hadoop00/tmp/ldpc_dec_top_245_to_275.vcd.sstv3.lzo
> not found, cannot run [duplicate 1]
>
> Rajeev Srivastava
> Silverline Design Inc
> 2118 Walsh ave, suite 204
> Santa Clara, CA, 95050
> cell : 408-409-0940
>
>
> On Tue, Dec 24, 2013 at 5:20 PM, Andrew Ash  wrote:
>
>> Hi Berkeley,
>>
>> By RF=3 I mean replication factor of 3 on the files in HDFS, so each
>> block is stored 3 times across the cluster.  It's a pretty standard choice
>> for the replication factor in order to give a hardware team time to replace
>> bad hardware in the case of failure.  With RF=3 the cluster can sustain
>> failure on any two nodes without data loss, but the loss of the third node
>> may cause loss.
>>
>> When reading the LZO files with the newAPIHadoopFile() call I showed
>> below, the data in the RDD is already decompressed -- it transparently
>> looks the same to my Spark program as if I was operating on an uncompressed
>> file.
>>
>> Cheers,
>> Andrew
>>
>>
>> On Tue, Dec 24, 2013 at 12:29 PM, Berkeley Malagon <
>> berke...@firestickgames.com> wrote:
>>
>>> Andrew, This is great.
>>>
>>> Excuse my ignorance, but what do you mean by RF=3? Also, after reading
>>> the LZO files, are you able to access the contents directly, or do you have
>>> to decompress them after reading them?
>>>
>>> Sent from my iPhone
>>>
>>> On Dec 24, 2013, at 12:03 AM, Andrew Ash  wrote:
>>>
>>> Hi Rajeev,
>>>
>>> I'm not sure if you ever got it working, but I just got mine up and
>>> going.  If you just use sc.textFile(...) the file will be read but the LZO
>>> index won't be used so a .count() on my 1B+ row file took 2483s.  When I
>>> ran it like this though:
>>>
>>> sc.newAPIHadoopFile("hdfs:///path/to/myfile.lzo",
>>> classOf[com.hadoop.mapreduce.LzoTextInputFormat],
>>> classOf[org.apache.hadoop.io.LongWritable],
>>> classOf[org.apache.hadoop.io.Text]).count
>>>
>>> the LZO index file was used and the .count() took just 101s.  For
>>> reference this file is 43GB when .gz compressed and 78.4GB when .lzo
>>> compressed.  I have RF=3 and this is across 4 pretty beefy machines with
>>> Hadoop DataNodes and Spark both running on each machine.
>>>
>>> Cheers!
>>> Andrew
>>>
>>>
>>> On Mon, Dec 16, 2013 at 2:34 PM, Rajeev Srivastava <
>>> raj...@silverline-da.com> wrote:
>>>
>>>> Thanks for your suggestion. I will try this and u

Re: reading LZO compressed file in spark

2013-12-24 Thread Andrew Ash
Hi Berkeley,

By RF=3 I mean replication factor of 3 on the files in HDFS, so each block
is stored 3 times across the cluster.  It's a pretty standard choice for
the replication factor in order to give a hardware team time to replace bad
hardware in the case of failure.  With RF=3 the cluster can sustain failure
on any two nodes without data loss, but the loss of the third node may
cause loss.

When reading the LZO files with the newAPIHadoopFile() call I showed below,
the data in the RDD is already decompressed -- it transparently looks the
same to my Spark program as if I was operating on an uncompressed file.

Cheers,
Andrew


On Tue, Dec 24, 2013 at 12:29 PM, Berkeley Malagon <
berke...@firestickgames.com> wrote:

> Andrew, This is great.
>
> Excuse my ignorance, but what do you mean by RF=3? Also, after reading the
> LZO files, are you able to access the contents directly, or do you have to
> decompress them after reading them?
>
> Sent from my iPhone
>
> On Dec 24, 2013, at 12:03 AM, Andrew Ash  wrote:
>
> Hi Rajeev,
>
> I'm not sure if you ever got it working, but I just got mine up and going.
>  If you just use sc.textFile(...) the file will be read but the LZO index
> won't be used so a .count() on my 1B+ row file took 2483s.  When I ran it
> like this though:
>
> sc.newAPIHadoopFile("hdfs:///path/to/myfile.lzo",
> classOf[com.hadoop.mapreduce.LzoTextInputFormat],
> classOf[org.apache.hadoop.io.LongWritable],
> classOf[org.apache.hadoop.io.Text]).count
>
> the LZO index file was used and the .count() took just 101s.  For
> reference this file is 43GB when .gz compressed and 78.4GB when .lzo
> compressed.  I have RF=3 and this is across 4 pretty beefy machines with
> Hadoop DataNodes and Spark both running on each machine.
>
> Cheers!
> Andrew
>
>
> On Mon, Dec 16, 2013 at 2:34 PM, Rajeev Srivastava <
> raj...@silverline-da.com> wrote:
>
>> Thanks for your suggestion. I will try this and update by late evening.
>>
>> regards
>> Rajeev
>>
>> Rajeev Srivastava
>> Silverline Design Inc
>> 2118 Walsh ave, suite 204
>> Santa Clara, CA, 95050
>> cell : 408-409-0940
>>
>>
>> On Mon, Dec 16, 2013 at 11:24 AM, Andrew Ash wrote:
>>
>>> Hi Rajeev,
>>>
>>> It looks like you're using the 
>>> com.hadoop.mapred.DeprecatedLzoTextInputFormat
>>> input format above, while Stephen referred to com.hadoop.mapreduce.
>>> LzoTextInputFormat
>>>
>>> I think the way to use this in Spark would be to use the
>>> SparkContext.hadoopFile() or SparkContext.newAPIHadoopFile() methods with
>>> the path and the InputFormat as parameters.  Can you give those a shot?
>>>
>>> Andrew
>>>
>>>
>>> On Wed, Dec 11, 2013 at 8:59 PM, Rajeev Srivastava <
>>> raj...@silverline-da.com> wrote:
>>>
>>>> Hi Stephen,
>>>>  I tried the same lzo file with a simple hadoop script
>>>> this seems to work fine
>>>>
>>>> HADOOP_HOME=/usr/lib/hadoop
>>>> /usr/bin/hadoop  jar
>>>> /opt/cloudera/parcels/CDH-4.4.0-1.cdh4.4.0.p0.39/lib/hadoop-mapreduce/hadoop-streaming.jar
>>>> \
>>>> -libjars
>>>> /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
>>>> \
>>>> -input /tmp/ldpc.sstv3.lzo \
>>>> -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
>>>> -output wc_test \
>>>> -mapper 'cat' \
>>>> -reducer 'wc -l'
>>>>
>>>> This means hadoop is able to handle the lzo file correctly
>>>>
>>>> Can you suggest me what i should do in spark for it to work
>>>>
>>>> regards
>>>> Rajeev
>>>>
>>>>
>>>> Rajeev Srivastava
>>>> Silverline Design Inc
>>>> 2118 Walsh ave, suite 204
>>>> Santa Clara, CA, 95050
>>>> cell : 408-409-0940
>>>>
>>>>
>>>> On Tue, Dec 10, 2013 at 1:20 PM, Stephen Haberman <
>>>> stephen.haber...@gmail.com> wrote:
>>>>
>>>>>
>>>>> > System.setProperty("spark.io.compression.codec",
>>>>> > "com.hadoop.compression.lzo.LzopCodec")
>>>>>
>>>>> This spark.io.compression.codec is a completely different setting than
>>>>> the
>>>>> codecs that are used for reading/writing from HDFS. (It is for
>>>>> compressing
>>>>> Spark's internal/non-HDFS intermediate output.)
>>>>>
>>>>> > Hope this helps and someone can help read a LZO file
>>>>>
>>>>> Spark just uses the regular Hadoop File System API, so any issues with
>>>>> reading
>>>>> LZO files would be Hadoop issues. I would search in the Hadoop issue
>>>>> tracker,
>>>>> and look for information on using LZO files with Hadoop/Hive, and
>>>>> whatever works
>>>>> for them, should magically work for Spark as well.
>>>>>
>>>>> This looks like a good place to start:
>>>>>
>>>>> https://github.com/twitter/hadoop-lzo
>>>>>
>>>>> IANAE, but I would try passing one of these:
>>>>>
>>>>>
>>>>> https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java
>>>>>
>>>>> To the SparkContext.hadoopFile method.
>>>>>
>>>>> - Stephen
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: reading LZO compressed file in spark

2013-12-24 Thread Andrew Ash
Hi Rajeev,

I'm not sure if you ever got it working, but I just got mine up and going.
 If you just use sc.textFile(...) the file will be read but the LZO index
won't be used so a .count() on my 1B+ row file took 2483s.  When I ran it
like this though:

sc.newAPIHadoopFile("hdfs:///path/to/myfile.lzo",
classOf[com.hadoop.mapreduce.LzoTextInputFormat],
classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text]).count

the LZO index file was used and the .count() took just 101s.  For reference
this file is 43GB when .gz compressed and 78.4GB when .lzo compressed.  I
have RF=3 and this is across 4 pretty beefy machines with Hadoop DataNodes
and Spark both running on each machine.

Cheers!
Andrew


On Mon, Dec 16, 2013 at 2:34 PM, Rajeev Srivastava  wrote:

> Thanks for your suggestion. I will try this and update by late evening.
>
> regards
> Rajeev
>
> Rajeev Srivastava
> Silverline Design Inc
> 2118 Walsh ave, suite 204
> Santa Clara, CA, 95050
> cell : 408-409-0940
>
>
> On Mon, Dec 16, 2013 at 11:24 AM, Andrew Ash  wrote:
>
>> Hi Rajeev,
>>
>> It looks like you're using the com.hadoop.mapred.DeprecatedLzoTextInputFormat
>> input format above, while Stephen referred to com.hadoop.mapreduce.
>> LzoTextInputFormat
>>
>> I think the way to use this in Spark would be to use the
>> SparkContext.hadoopFile() or SparkContext.newAPIHadoopFile() methods with
>> the path and the InputFormat as parameters.  Can you give those a shot?
>>
>> Andrew
>>
>>
>> On Wed, Dec 11, 2013 at 8:59 PM, Rajeev Srivastava <
>> raj...@silverline-da.com> wrote:
>>
>>> Hi Stephen,
>>>  I tried the same lzo file with a simple hadoop script
>>> this seems to work fine
>>>
>>> HADOOP_HOME=/usr/lib/hadoop
>>> /usr/bin/hadoop  jar
>>> /opt/cloudera/parcels/CDH-4.4.0-1.cdh4.4.0.p0.39/lib/hadoop-mapreduce/hadoop-streaming.jar
>>> \
>>> -libjars
>>> /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
>>> \
>>> -input /tmp/ldpc.sstv3.lzo \
>>> -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
>>> -output wc_test \
>>> -mapper 'cat' \
>>> -reducer 'wc -l'
>>>
>>> This means hadoop is able to handle the lzo file correctly
>>>
>>> Can you suggest me what i should do in spark for it to work
>>>
>>> regards
>>> Rajeev
>>>
>>>
>>> Rajeev Srivastava
>>> Silverline Design Inc
>>> 2118 Walsh ave, suite 204
>>> Santa Clara, CA, 95050
>>> cell : 408-409-0940
>>>
>>>
>>> On Tue, Dec 10, 2013 at 1:20 PM, Stephen Haberman <
>>> stephen.haber...@gmail.com> wrote:
>>>
>>>>
>>>> > System.setProperty("spark.io.compression.codec",
>>>> > "com.hadoop.compression.lzo.LzopCodec")
>>>>
>>>> This spark.io.compression.codec is a completely different setting than
>>>> the
>>>> codecs that are used for reading/writing from HDFS. (It is for
>>>> compressing
>>>> Spark's internal/non-HDFS intermediate output.)
>>>>
>>>> > Hope this helps and someone can help read a LZO file
>>>>
>>>> Spark just uses the regular Hadoop File System API, so any issues with
>>>> reading
>>>> LZO files would be Hadoop issues. I would search in the Hadoop issue
>>>> tracker,
>>>> and look for information on using LZO files with Hadoop/Hive, and
>>>> whatever works
>>>> for them, should magically work for Spark as well.
>>>>
>>>> This looks like a good place to start:
>>>>
>>>> https://github.com/twitter/hadoop-lzo
>>>>
>>>> IANAE, but I would try passing one of these:
>>>>
>>>>
>>>> https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java
>>>>
>>>> To the SparkContext.hadoopFile method.
>>>>
>>>> - Stephen
>>>>
>>>>
>>>
>>
>


Re: Task not running in standalone cluster

2013-12-17 Thread Andrew Ash
Glad you got it figured out!


On Tue, Dec 17, 2013 at 8:43 AM, Jie Deng  wrote:

> don't bother...My problem is using spark-0.9 instead 0.8...because 0.9
> fixed bug which can run from eclipse.
>
>
> 2013/12/17 Jie Deng 
>
>> When I start a task on master, I can see there is a
>> CoarseGralinedExcutorBackend java process running on worker, is that saying
>> something?
>>
>>
>> 2013/12/17 Jie Deng 
>>
>>> Hi Andrew,
>>>
>>> Thanks for helping!
>>> Sorry I did not make my self clear, here is the output from iptables
>>> (both master and worker):
>>>
>>> jie@jie-OptiPlex-7010:~/spark$ sudo ufw status
>>> Status: inactive
>>> jie@jie-OptiPlex-7010:~/spark$ sudo iptables -L
>>> Chain INPUT (policy ACCEPT)
>>> target prot opt source   destination
>>>
>>> Chain FORWARD (policy ACCEPT)
>>> target prot opt source   destination
>>>
>>> Chain OUTPUT (policy ACCEPT)
>>> target prot opt source   destination
>>>
>>>
>>>
>>>
>>> 2013/12/17 Andrew Ash 
>>>
>>>> Hi Jie,
>>>>
>>>> When you say firewall is closed does that mean ports are blocked
>>>> between the worker nodes?  I believe workers start up on a random port and
>>>> send data directly between each other during shuffles.  Your firewall may
>>>> be blocking those connections.  Can you try with the firewall temporarily
>>>> disabled?
>>>>
>>>> Andrew
>>>>
>>>>
>>>> On Mon, Dec 16, 2013 at 9:58 AM, Jie Deng  wrote:
>>>>
>>>>> Hi,
>>>>> Thanks for reading,
>>>>>
>>>>> I am trying to running a spark program on cluster. The program can
>>>>> successfully running on local;
>>>>> The standalone topology is working, I can see workers from master
>>>>> webUI; Master and worker are different machine, and worker status is 
>>>>> ALIVE;
>>>>> The thing is no matter I start a program from eclipse or
>>>>> ./run-example, they both stop at some point like:
>>>>> Stage Id: 0
>>>>> Description: count at SparkExample.java:31
>>>>> <http://jie-optiplex-7010.local:4040/stages/stage?id=0>
>>>>> Submitted: 2013/12/16 14:50:36
>>>>> Duration: 7 m
>>>>> Tasks: Succeeded/Total: 0/2
>>>>>  And after a while, the worker's state become DEAD.
>>>>>
>>>>> Spark directory on worker is copy from master by ./make-distribution,
>>>>> firewall is all closed.
>>>>>
>>>>> Has anyone has the same issue before?
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Task not running in standalone cluster

2013-12-16 Thread Andrew Ash
Hi Jie,

When you say firewall is closed does that mean ports are blocked between
the worker nodes?  I believe workers start up on a random port and send
data directly between each other during shuffles.  Your firewall may be
blocking those connections.  Can you try with the firewall temporarily
disabled?

Andrew


On Mon, Dec 16, 2013 at 9:58 AM, Jie Deng  wrote:

> Hi,
> Thanks for reading,
>
> I am trying to running a spark program on cluster. The program can
> successfully running on local;
> The standalone topology is working, I can see workers from master webUI;
> Master and worker are different machine, and worker status is ALIVE;
> The thing is no matter I start a program from eclipse or ./run-example,
> they both stop at some point like:
> Stage Id: 0
> Description: count at SparkExample.java:31
> Submitted: 2013/12/16 14:50:36
> Duration: 7 m
> Tasks: Succeeded/Total: 0/2
>  And after a while, the worker's state become DEAD.
>
> Spark directory on worker is copy from master by ./make-distribution,
> firewall is all closed.
>
> Has anyone has the same issue before?
>


Re: reading LZO compressed file in spark

2013-12-16 Thread Andrew Ash
Hi Rajeev,

It looks like you're using the com.hadoop.mapred.DeprecatedLzoTextInputFormat
input format above, while Stephen referred to com.hadoop.mapreduce.
LzoTextInputFormat

I think the way to use this in Spark would be to use the
SparkContext.hadoopFile() or SparkContext.newAPIHadoopFile() methods with
the path and the InputFormat as parameters.  Can you give those a shot?

Andrew


On Wed, Dec 11, 2013 at 8:59 PM, Rajeev Srivastava  wrote:

> Hi Stephen,
>  I tried the same lzo file with a simple hadoop script
> this seems to work fine
>
> HADOOP_HOME=/usr/lib/hadoop
> /usr/bin/hadoop  jar
> /opt/cloudera/parcels/CDH-4.4.0-1.cdh4.4.0.p0.39/lib/hadoop-mapreduce/hadoop-streaming.jar
> \
> -libjars
> /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo-cdh4-0.4.15-gplextras.jar
> \
> -input /tmp/ldpc.sstv3.lzo \
> -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
> -output wc_test \
> -mapper 'cat' \
> -reducer 'wc -l'
>
> This means hadoop is able to handle the lzo file correctly
>
> Can you suggest me what i should do in spark for it to work
>
> regards
> Rajeev
>
>
> Rajeev Srivastava
> Silverline Design Inc
> 2118 Walsh ave, suite 204
> Santa Clara, CA, 95050
> cell : 408-409-0940
>
>
> On Tue, Dec 10, 2013 at 1:20 PM, Stephen Haberman <
> stephen.haber...@gmail.com> wrote:
>
>>
>> > System.setProperty("spark.io.compression.codec",
>> > "com.hadoop.compression.lzo.LzopCodec")
>>
>> This spark.io.compression.codec is a completely different setting than the
>> codecs that are used for reading/writing from HDFS. (It is for compressing
>> Spark's internal/non-HDFS intermediate output.)
>>
>> > Hope this helps and someone can help read a LZO file
>>
>> Spark just uses the regular Hadoop File System API, so any issues with
>> reading
>> LZO files would be Hadoop issues. I would search in the Hadoop issue
>> tracker,
>> and look for information on using LZO files with Hadoop/Hive, and
>> whatever works
>> for them, should magically work for Spark as well.
>>
>> This looks like a good place to start:
>>
>> https://github.com/twitter/hadoop-lzo
>>
>> IANAE, but I would try passing one of these:
>>
>>
>> https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java
>>
>> To the SparkContext.hadoopFile method.
>>
>> - Stephen
>>
>>
>


Retry instead of die on workers connect failure

2013-12-11 Thread Andrew Ash
Hi Spark users,

I'm observing behavior where if a master node goes down for a restart, all
the worker JVMs die (in standalone cluster mode).  In other cluster
computing setups with master-worker relationships (namely Hadoop), if a
worker can't connect to the master or its connection drops it retries a few
times and then does exponential backoff or similar.

In Spark though the worker just dies.  Is the die-on-disconnect behavior
intentional or would people be ok with 3 retries 5sec apart and then
exponential backoff?
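
Roughly what I have in mind, as a sketch (not actual Spark code; the function
name and constants are made up):

def connectWithRetry(tryConnect: () => Boolean) {
  var attempt = 0
  var connected = tryConnect()
  while (!connected) {
    attempt += 1
    val sleepMs =
      if (attempt <= 3) 5000L                          // first three retries: 5 seconds apart
      else math.min(5000L << (attempt - 3), 60000L)    // then exponential backoff, capped at 60s
    Thread.sleep(sleepMs)
    connected = tryConnect()
  }
}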

Andrew


Re: reading LZO compressed file in spark

2013-12-10 Thread Andrew Ash
I'm interested in doing this too Rajeev. Did you make any progress?


On Mon, Dec 9, 2013 at 1:57 PM, Rajeev Srivastava
wrote:

> Hello experts,
>  I would like to read a LZO splittable compressed file into spark.
> I have followed available material on the web on working with LZO
> compressed data.
> I am able to create the index file needed by hadoop.
>
> But i am unable to read the LZO file in spark. I use spark 0.7.2
>
> I would like to know if someone has had success reading a large LZO
> compressed file.
>
> regards
> Rajeev Srivastava
> Silverline Design Inc
> 2118 Walsh ave, suite 204
> Santa Clara, CA, 95050
> cell : 408-409-0940
>


Re: Why does sortByKey launch cluster job?

2013-12-10 Thread Andrew Ash
Since sortByKey() invokes those right now, we should either a) change the
documentation to note that it kicks off actions, or b) change the method to
execute those things lazily.

Personally I'd prefer b but don't know how difficult that would be.


On Tue, Dec 10, 2013 at 1:52 AM, Jason Lenderman wrote:

> Hey Ryan,
>
> The *sortByKey* method creates a *RangePartitioner* (see
> Partitioner.scala), and the initialization code of the 
> *RangePartitioner*invokes actions
> *count* and *sample*.
>
>
> Jason
>
>
>
>
> On Mon, Dec 9, 2013 at 7:01 PM, Ryan Prenger  wrote:
>
>> sortByKey is listed as a data transformation, not an action, yet it
>> launches a job.  This doesn't seem to square with the documentation.
>>
>> Ryan
>>
>
>


Re: Spark Import Issue

2013-12-08 Thread Andrew Ash
Also note that when you add parameters to the -cp flag on the JVM and want
to include multiple jars, the only way to do that is by including an entire
directory with "dir/*" -- you can't use "dir/*jar" or "dir/spark*jar" or
anything else like that.

http://stackoverflow.com/questions/219585/setting-multiple-jars-in-java-classpath


On Sun, Dec 8, 2013 at 12:25 AM, Matei Zaharia wrote:

> I’m not sure you can have a star inside that quoted classpath argument
> (the double quotes may cancel the *). Try using the JAR through its full
> name, or link to Spark through Maven (
> http://spark.incubator.apache.org/docs/latest/quick-start.html#a-standalone-app-in-java
> ).
>
> Matei
>
> On Dec 6, 2013, at 9:50 AM, Garrett Hamers  wrote:
>
> Hello,
>
> I am new to the spark system, and I am trying to write a simple program to
> get myself familiar with how spark works. I am currently having problem
> with importing the spark package. I am getting the following compiler
> error: package org.apache.spark.api.java does not exist.
>
> I have spark-0.8.0-incubating install. I ran the commands: sbt/sbt
> compile, sbt/sbt assembly, and sbt/sbt publish-local without any errors. My
> sql.java file is located in the spark-0.8.0-incubating root directory. I
> tried to compile the code using “javac sql.java” and “javac -cp
> "assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating*.jar"
> sql.java”.
>
> Here is the code for sql.java:
>
> package shark;
>
> import java.io.Serializable;
>
> import java.util.List;
>
> import java.io.*;
>
> import org.apache.spark.api.java.*; //Issue is here
>
> public class sql implements Serializable {
>
>   public static void main( String[] args) {
>
> System.out.println("Hello World");
>
>   }
>
> }
>
>
>  What do I need to do in order for java to import the spark code properly?
> Any advice would be greatly appreciated.
>
> Thank you,
> Garrett Hamers
>
>
>


Re: Bump: on disk storage formats

2013-12-08 Thread Andrew Ash
LZO compression at a minimum, and using Parquet as a second step,
seems like the way to go though I haven't tried either personally yet.

Sent from my mobile phone

On Dec 8, 2013, at 16:54, Ankur Chauhan  wrote:

> Hi all,
>
> Sorry for posting this again but I am interested in finding out what 
> different on disk data formats for storing timeline event and analytics 
> aggregate data.
>
> Currently I am just using newline delimited json gzipped files. I was 
> wondering if there were any recommendations.
>
> -- Ankur


Re: Pre-build Spark for Windows 8.1

2013-12-07 Thread Andrew Ash
Thanks for the info Matei.  Seems like lots of other users of Akka have
similar issues -- maybe at some point in the future it'll be worth making
this a bit more flexible, but there are more important places to spend time
right now.


On Fri, Dec 6, 2013 at 12:06 PM, Matei Zaharia wrote:

> Hey Andrew, unfortunately I don’t know how easy this is. Maybe future
> versions of Akka have it. We can certainly ask them to do it in general but
> I imagine there are some use cases where they wanted this behavior.
>
> Matei
>
>
> On Dec 5, 2013, at 2:49 PM, Andrew Ash  wrote:
>
> Speaking of akka and host sensitivity...  How much have you hacked on akka
> to get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1?
>  It's kind of a pain to get the Spark URL to exactly match.  I'm wondering
> if there are usability gains that could be made here or if we're pretty
> stuck.
>
>
> On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia wrote:
>
>> Hi,
>>
>> When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the
>> URL (uppercase instead of lowercase). Unfortunately Akka is very specific
>> about seeing hostnames written in the same way on each node, or else it
>> thinks the message is for another machine!
>>
>> Matei
>>
>> On Dec 5, 2013, at 8:27 AM, Adrian Bonar 
>> wrote:
>>
>>  The master starts up now as expected but the workers are unable to
>> connect to the master. It looks like the master is refusing the connection
>> messages but I’m not sure why. The first two error lines below are from
>> trying to connect a worker from a separate machine and the last two error
>> lines are from trying to connect a worker on the same machine as the
>> master. I verified that the workers do not show up in the master’s web ui.
>>
>> MASTER:
>> D:\spark>spark-class org.apache.spark.deploy.master.Master
>> 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
>> 13/12/05 08:08:34 INFO master.Master: Starting Spark master at
>> spark://ADRIBONA-DEV-1:7077
>> 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/metrics/master/json,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/static,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/app/json,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/app,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{/json,null}
>> 13/12/05 08:08:34 INFO handler.ContextHandler: started
>> o.e.j.s.h.ContextHandler{*,null}
>> 13/12/05 08:08:34 INFO server.AbstractConnector: Started
>> SelectChannelConnector@0.0.0.0:8088
>> 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at
>> http://ADRIBONA-DEV-1:8088 <http://adribona-dev-1:8088/>
>> 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
>> RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
>> for non-local recipient
>> akka://sparkMaster@adribona-dev-1:7077/user/Master at
>> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
>> akka://sparkMaster@ADRIBONA-DEV-1:7077
>> 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
>> DaemonMsgWatch(Actor[
>> akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
>> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
>> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
>> akka://sparkMaster@ADRIBONA-DEV-1:7077
>> 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
>> RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
>> for non-local recipient
>> akka://sparkMaster@adribona-dev-1:7077/user/Master at
>> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
>> akka://sparkMaster@ADRIBONA-DEV-1:7077
>> 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
>> DaemonMsgWatch(Actor[
>> akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
>> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
>> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
>> akka://sparkMaster@ADRIBONA-DEV-1:70

Re: Pre-build Spark for Windows 8.1

2013-12-05 Thread Andrew Ash
Speaking of akka and host sensitivity...  How much have you hacked on akka
to get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1?
 It's kind of a pain to get the Spark URL to exactly match.  I'm wondering
if there are usability gains that could be made here or if we're pretty
stuck.


On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia wrote:

> Hi,
>
> When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the
> URL (uppercase instead of lowercase). Unfortunately Akka is very specific
> about seeing hostnames written in the same way on each node, or else it
> thinks the message is for another machine!
>
> Matei
>
> On Dec 5, 2013, at 8:27 AM, Adrian Bonar 
> wrote:
>
>  The master starts up now as expected but the workers are unable to
> connect to the master. It looks like the master is refusing the connection
> messages but I’m not sure why. The first two error lines below are from
> trying to connect a worker from a separate machine and the last two error
> lines are from trying to connect a worker on the same machine as the
> master. I verified that the workers do not show up in the master’s web ui.
>
> MASTER:
> D:\spark>spark-class org.apache.spark.deploy.master.Master
> 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
> 13/12/05 08:08:34 INFO master.Master: Starting Spark master at
> spark://ADRIBONA-DEV-1:7077
> 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/metrics/master/json,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/static,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/app/json,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/app,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/json,null}
> 13/12/05 08:08:34 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{*,null}
> 13/12/05 08:08:34 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:8088
> 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at
> http://ADRIBONA-DEV-1:8088 
> 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
> RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master
>  at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
> akka://sparkMaster@ADRIBONA-DEV-1:7077
> 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
> DaemonMsgWatch(Actor[
> akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
> akka://sparkMaster@ADRIBONA-DEV-1:7077
> 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
> RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master
>  atakka://sparkMaster@ADRIBONA-DEV-1:7077 local is
> akka://sparkMaster@ADRIBONA-DEV-1:7077
> 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
> DaemonMsgWatch(Actor[
> akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
> for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
> akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
> akka://sparkMaster@ADRIBONA-DEV-1:7077
>
> WORKER:
> D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker
> spark://adribona-dev-1:7077
> 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
> 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker
> ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
> 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
> 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/metrics/json,null}
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/static,null}
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/log,null}
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/logPage,null}
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{/json,null}
> 13/12/05 08:18:46 INFO handler.ContextHandler: started
> o.e.j.s.h.ContextHandler{*,null}
> 13/12/05 08:18:46 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:8081
> 13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker we

Re: How to balance task load

2013-12-05 Thread Andrew Ash
Hi Hao,

Where tasks go is influenced by where the data they operate on resides.  If
the data is on one executor, it may make more sense to do all the
computation on that node rather than ship data across the network.  How was
the data distributed across your cluster?
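
If it turns out the input collapsed into a single partition on one node (common
with one small file), one workaround is to spread it out before the expensive map.
A rough sketch, assuming 0.8's coalesce-with-shuffle variant and placeholder names
(rawData, expensiveTransform):

  // Reshuffle into more partitions so every executor has local work to pick up;
  // 12 is an arbitrary example (roughly 2-4x the total core count is a common start).
  val spread = rawData.coalesce(12, shuffle = true)
  val result = spread.map(expensiveTransform)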

Andrew


On Mon, Dec 2, 2013 at 7:52 AM, Hao REN  wrote:

> Sorry for spam.
>
> To complete my previous post:
>
> The map action sometimes creates 4 tasks which are all executed by the
> same executor.
>
> I believe that a task dispatch like:
> executor_0 : 1 task;
> executor_1 : 1 task;
> executor_2 : 2 tasks;
> would give better performance.
>
> Can we force this kind of schedule in Spark ?
>
> Thank you.
>
>
>
> 2013/12/2 Hao REN 
>
>> Hi,
>>
>> When running some tests on EC2 with spark, I notice that: the tasks are
>> not fairly distributed to executor.
>>
>> For example, a map action produces 4 tasks, but they all go to the same
>> executor:
>>
>> Executors (3)
>>
>> - *Memory:* 0.0 B Used (19.0 GB Total)
>> - *Disk:* 0.0 B Used
>>
>> Executor ID  Address                              RDD blocks  Memory used     Disk used  Active tasks  Failed tasks  Complete tasks  Total tasks
>> 0            ip-10-10-141-143.ec2.internal:52816  0           0.0 B / 6.3 GB  0.0 B      4             0             0               4
>> 1            ip-10-40-38-190.ec2.internal:60314   0           0.0 B / 6.3 GB  0.0 B      0             0             0               0
>> 2            ip-10-62-35-223.ec2.internal:405...  0           0.0 B / 6.3 GB  0.0 B
>>
>
>
> --
> REN Hao
>
> Data Engineer @ ClaraVista
>
> Paris, France
>
> Tel:  +33 06 14 54 57 24
>


Memory configuration of standalone clusters

2013-12-04 Thread Andrew Ash
Hello,

I have a few questions about configuring memory usage on standalone
clusters.  Can someone help me out?

1) The terms "slave" in ./bin/start-slaves.sh and "worker" in the docs seem
to be used interchangeably.  Are they the same?

2) On a worker/slave, is there only one JVM running that has all the data
in it, or is there a separate JVM spun up for each application (Hadoop
style)?  Ignoring the SPARK_WORKER_INSTANCES setting.

3) There are lots of configuration options for defining memory usage.  What
do they all mean?  Is my below summary correct?
a) SPARK_WORKER_MEMORY -- maximum amount of memory that the Spark worker
will ever use, regardless of applications started.  This sets the maximum
heap size of the Spark worker JVM
b) ./bin/start-worker.sh --memory -- same as SPARK_WORKER_MEMORY?  If both
are set, which takes priority?
c) SPARK_JAVA_OPTS="-Xmx512m -Xms512m" -- same as (a) and (b)?
d) -Dspark.executor.memory -- maximum amount of memory this particular
application will use on a single worker

If that's correct I'll send a PR to the docs that would have clarified
these for me.

http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties
http://spark.incubator.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts


4) If I set SPARK_WORKER_MEMORY really high, I think I then also have to
set spark.executor.memory really high to take advantage of it (since
spark.executor.memory is 512m by default).  Is there a way to optimize a
cluster for the "one big application" scenario better than manually keeping
these in sync?  I'd like spark.executor.memory to match SPARK_WORKER_MEMORY
if it's not set, I think.


Thanks!
Andrew


Re: Splitting into partitions and sorting the partitions ... how to do that?

2013-12-04 Thread Andrew Ash
How important is it that they're partitioned on hashcode() % 32 rather than
Spark's default partitioning?

In Scala, you should be able to do this with
rdd.distinct().coalesce(32).mapPartitions(p => p.toSeq.sorted.iterator)

I'm not sure what your end goal is here, but if it's just to sort a bunch of
data and remove duplicates, then that should be
rdd.distinct().keyBy(s => s).sortByKey().map(_._1)
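
If the hashCode % 32 layout does matter, here is a rough sketch of the whole
pipeline (hash into 32 partitions, de-duplicate, and sort each partition) without
spawning a thread per partition, assuming a Scala RDD[String] named strings:

  import org.apache.spark.HashPartitioner
  import org.apache.spark.SparkContext._   // brings in the pair-RDD operations

  val partitionedSorted = strings
    .map(s => (s, 1))                          // dummy value; we only care about the key
    .partitionBy(new HashPartitioner(32))      // same effect as hashCode % 32
    .mapPartitions(iter =>
      // de-duplicate and sort within each partition; note this buffers one
      // partition in memory, just like any in-partition sort would
      iter.map(_._1).toArray.distinct.sorted.iterator)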


On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs  wrote:

> Thanks for your answer. But the problem is that I only want to sort the 32
> partitions, individually,
> not the complete input. So yes, the output has to consist of 32
> partitions, each sorted.
>
> Ceriel Jacobs
>
>
>
> On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>
>> I am not sure if 32 partitions is a hard limit that you have.
>>
>> Unless you have a strong reason to use only 32 partitions, please try
>> providing the second optional
>> argument (numPartitions) to reduceByKey and sortByKey methods which will
>> parallelize these Reduce operations.
>> A number 3x the number of total cores on the cluster would be a good
>> value to try for numPartitions.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>>
>> In case you have to have 32 partitions in the final output, you can use
>> coalesce(32) method on your
>> RDD at the time of final output.
>>
>
>  On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs > c.j.h.jac...@vu.nl>> wrote:
>>
>> Hi,
>>
>> I am a novice to SPARK, and need some help with the following problem:
>> I have a
>>  JavaRDD<String> strings;
>> which is potentially large, hundreds of GBs, and I need to split them
>> into 32 partitions, by means of hashcode()%32, and then sort these
>> partitions,
>> and also remove duplicates. I am having trouble finding an efficient
>> way of
>> expressing this in SPARK. I think I need an RDD to be able to sort,
>> so in
>> this case, I need 32 of them. So I first created an RDD with pairs
>> ,
>> like this:
>>
>>  JavaPairRDD<Integer, String> hashStrings = strings
>>      .keyBy(new Function<String, Integer>() {
>>          @Override
>>          public Integer call(String s) {
>>              return new Integer(s.hashCode() % 32);
>>          }
>>      });
>>
>> And then I launch 32 threads that do the following (each thread has
>> its own partition):
>>
>>  // Filter for my own partition
>>  JavaPairRDD<Integer, String> filtered = hashStrings
>>      .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>>          @Override
>>          public Boolean call(Tuple2<Integer, String> tpl) {
>>              return tpl._1() == partition;
>>          }
>>      });
>>  JavaRDD<String> values = filtered.values();
>>
>>  // Pair with a boolean, so that we can use sortByKey().
>>  JavaPairRDD<String, Boolean> values1 =
>>      values.map(new PairFunction<String, String, Boolean>() {
>>          @Override
>>          public Tuple2<String, Boolean> call(String s) {
>>              return new Tuple2<String, Boolean>(s, true);
>>          }
>>      });
>>
>>  // Reduce by key to remove duplicates.
>>  JavaPairRDD<String, Boolean> reduced =
>>      values1.reduceByKey(new Function2<Boolean, Boolean, Boolean>() {
>>          @Override
>>          public Boolean call(Boolean i1, Boolean i2) {
>>              // return i1 + i2;
>>              return true;
>>          }
>>      });
>>
>>  // Sort and extract keys.
>>  JavaRDD<String> result = reduced.sortByKey().keys();
>>
>> This works for not so large input, but for larger I get all kinds of
>> out-of-memory
>> exceptions. I'm running on 8 nodes, each with 8 cores, and am using
>> SPARK_MEM=16G.
>> I also tried  StorageLevel.MEMORY_AND_DISK() for all the RDDs, but
>> that just seems to
>> make things much slower, and still gives out-of-memory exceptions.
>>
>> Now I'm pretty sure that the way I obtain the partitions is really
>> inefficient, and I also
>> have my doubts about starting the RDDs in separate threads. So, what
>> would be the best way
>> to deal with this?
>>
>> Thanks in advance for any hints that you can give me.
>>
>> Ceriel Jacobs
>>
>>
>>
>


Re: Serializable incompatible with Externalizable error

2013-12-02 Thread Andrew Ash
At least from
http://stackoverflow.com/questions/817853/what-is-the-difference-between-serializable-and-externalizable-in-java
it looks like Externalizable is roughly an old-java version of
Serializable.  Does that class implement both interfaces?  Can you take
away the Externalizable interface if it's being used?
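
For reference, here is a rough Scala sketch of what Externalizable obliges a class
to provide (the class name is invented to echo the one in the stack trace). This
particular InvalidClassException usually means the class bytes doing the
deserialization disagree with the ones that wrote the stream -- e.g. one copy was
compiled as plain Serializable -- so it is worth double-checking that the exact
same jar is shipped to the EC2 workers.

  import java.io.{Externalizable, ObjectInput, ObjectOutput}

  // Hypothetical stand-in for WritableDataRow: Externalizable needs a public
  // no-arg constructor plus explicit write/read methods.
  class WritableDataRowSketch(var id: Long, var payload: String) extends Externalizable {

    def this() = this(0L, "")   // required no-arg constructor

    override def writeExternal(out: ObjectOutput) {
      out.writeLong(id)
      out.writeUTF(payload)
    }

    override def readExternal(in: ObjectInput) {
      id = in.readLong()
      payload = in.readUTF()
    }
  }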


On Mon, Dec 2, 2013 at 7:15 PM, Matt Cheah  wrote:

>  Hi everyone,
>
>  I'm running into a case where I'm creating a Java RDD of an
> Externalizable class, and getting this stack trace:
>
>  java.io.InvalidClassException (java.io.InvalidClassException:
> com.palantir.finance.datatable.server.spark.WritableDataRow; Serializable
> incompatible with Externalizable)
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:634)
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483) <... other Java stuff ...>
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
> I'm running on a spark cluster generated by the EC2 Scripts. This doesn't
> happen if I'm running things with local[N]. Any ideas?
> Thanks,
> -Matt Cheah
>


Re: forcing node local processing

2013-11-26 Thread Andrew Ash
I've seen issues where no task is node local because the hostname on Spark
and Hadoop is close but not quite the same -- e.g. myhost.andrewash.com vs
myhost. One is FQDN, the other is not.  Can you confirm that the hostname
present in the master's webui (:8080) in the address column of the workers
section matches what's listed in the namenode's webui? (:50070)

Example:
13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID
75 on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242
(non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)

The line you pasted is PROCESS_LOCAL, which means that the output of one
transformation is the input of the next one in the same JVM.  If you were
reading out of HBase, I would expect you could only achieve NODE_LOCALITY
on the initial data read since the data isn't yet loaded into the Spark
process.  Can you run a simple query that doesn't involve any
transformation?  I would imagine a simple "select * from table limit 10" or
equivalent would do it.

Andrew



On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed wrote:

> hmmm - I see 'spark.deploy.spreadOut' which defaults to true - but that
> does not seem to be impacting me positively...
>
>
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed wrote:
>
>> Thanks - I have pondered that piece of code long and hard trying
>> different combinations of each of those - e.g. setting spark.
>> locality.wait and spark.locality.wait.node very high and the others very
>> low  -- nothing. It worked on 0.7 but at that point we were using lots of
>> hbase regions/spark partitions/tasks and spark.spreadout = true (I think
>> that is no longer supported)
>>
>> The one clue is that it sometimes uses node local sporadically. I would
>> be more than happy to provide tons of logs but there isn't as far as I can
>> see any logging of this part of the code other than many lines all saying
>> something like:
>>
>> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4]
>> Starting task 2.0:0 as TID 104 on executor 0:  (PROCESS_LOCAL)
>>
>> and the fact that the UI shows the RDD not partitioning across the
>> appropriate hbase region nodes. I was thinking this was some sort of DNS
>> short vs full name but changing that didn't seem to do anything.
>>
>>
>> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash  wrote:
>>
>>> Do you also set any of spark.locality.wait.{process,node,rack} ?  Those
>>> override spark.locality.wait for specific locality levels.
>>>
>>>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>>>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>>>     level match {
>>>       case TaskLocality.PROCESS_LOCAL =>
>>>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>>>       case TaskLocality.NODE_LOCAL =>
>>>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>>>       case TaskLocality.RACK_LOCAL =>
>>>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>>>       case TaskLocality.ANY =>
>>>         0L
>>>     }
>>>   }
>>>
>>> The other option I'm thinking is maybe these tasks are jumping straight
>>> to TaskLocality.ANY with no locality preference.  Do you have any logs you
>>> can share that include this fallback to less-preferred localities?
>>>
>>> Did you have this working properly on 0.7.x ?
>>>
>>>
>>>
>>> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed 
>>> wrote:
>>>
>>>> Hi Andrew  - thanks - that's a good thought - unfortunately, I have
>>>> those set in the same pre context creation place as all the other variables
>>>> that I have been using for months quite happily and that seem to impact
>>>> Spark nicely. I have it set to Int.MaxValue.toString which I am guessing is
>>>> large enough.
>>>>
>>>> It very occasionally will use all data local nodes, and sometimes a
>>>> mix, but mostly all process-local...
>>>>
>>>>
>>>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash wrote:
>>>>
>>>>> Hi Erik,
>>>>>
>>>>> I would guess that if you set spark.locality.wait to an absurdly large
>>>>> value then you would have essentially that effect.
>>>>>
>>>>> Maybe you aren't setting the system property before creating your Spark context?

Re: forcing node local processing

2013-11-26 Thread Andrew Ash
Do you also set any of spark.locality.wait.{process,node,rack} ?  Those
override spark.locality.wait for specific locality levels.

  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = System.getProperty("spark.locality.wait", "3000")
    level match {
      case TaskLocality.PROCESS_LOCAL =>
        System.getProperty("spark.locality.wait.process", defaultWait).toLong
      case TaskLocality.NODE_LOCAL =>
        System.getProperty("spark.locality.wait.node", defaultWait).toLong
      case TaskLocality.RACK_LOCAL =>
        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
      case TaskLocality.ANY =>
        0L
    }
  }

The other option I'm thinking is maybe these tasks are jumping straight to
TaskLocality.ANY with no locality preference.  Do you have any logs you can
share that include this fallback to less-preferred localities?

Did you have this working properly on 0.7.x ?
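
If the goal really is "never fall back from node-local", a minimal sketch of what
I'd try -- all four properties pinned high, set before the SparkContext is
constructed (the master URL and app name below are placeholders; values are in
milliseconds):

  // An absurdly large wait means the scheduler essentially never downgrades
  // a task from its preferred locality level.
  System.setProperty("spark.locality.wait", "86400000")           // 1 day
  System.setProperty("spark.locality.wait.process", "86400000")
  System.setProperty("spark.locality.wait.node", "86400000")
  System.setProperty("spark.locality.wait.rack", "86400000")

  val sc = new org.apache.spark.SparkContext("spark://master:7077", "hbase-locality-test")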



On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed wrote:

> Hi Andrew  - thanks - that's a good thought - unfortunately, I have those
> set in the same pre context creation place as all the other variables that
> I have been using for months quite happily and that seem to impact Spark
> nicely. I have it set to Int.MaxValue.toString which I am guessing is large
> enough.
>
> It very occasionally will use all data local nodes, and sometimes a mix,
> but mostly all process-local...
>
>
> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash  wrote:
>
>> Hi Erik,
>>
>> I would guess that if you set spark.locality.wait to an absurdly large
>> value then you would have essentially that effect.
>>
>> Maybe you aren't setting the system property before creating your Spark
>> context?
>>
>> http://spark.incubator.apache.org/docs/latest/configuration.html
>>
>> Andrew
>>
>>
>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed wrote:
>>
>>> Hi All,
>>> After switching to 0.8, and reducing the number of partitions/tasks for
>>> a large scale computation, I have been unable to force Spark to use only
>>> executors on nodes where hbase data is local. I have not been able to find
>>> a setting for spark.locality.wait that makes any difference. It is not an
>>> option for us to let Spark choose non-data-local nodes. Is there some
>>> example code of how to get this to work the way we want? We have our own
>>> input RDD that mimics the NewHadoopRdd and it seems to be doing the correct
>>> thing in all regards with respect to preferred locations.
>>>
>>> Do I have to write my own compute Tasks and schedule them myself?
>>>
>>> Anyone have any suggestions? I am stumped.
>>>
>>> cheers,
>>> Erik
>>>
>>>
>>>
>>
>
>
> --
> Erik James Freed
> CoDecision Software
> 510.859.3360
> erikjfr...@codecision.com
>
> 1480 Olympus Avenue
> Berkeley, CA
> 94708
>
> 179 Maria Lane
> Orcas, WA
> 98245
>


Re: forcing node local processing

2013-11-26 Thread Andrew Ash
Hi Erik,

I would guess that if you set spark.locality.wait to an absurdly large
value then you would have essentially that effect.

Maybe you aren't setting the system property before creating your Spark
context?

http://spark.incubator.apache.org/docs/latest/configuration.html

Andrew


On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed wrote:

> Hi All,
> After switching to 0.8, and reducing the number of partitions/tasks for a
> large scale computation, I have been unable to force Spark to use only
> executors on nodes where hbase data is local. I have not been able to find
> a setting for spark.locality.wait that makes any difference. It is not an
> option for us to let Spark choose non-data-local nodes. Is there some
> example code of how to get this to work the way we want? We have our own
> input RDD that mimics the NewHadoopRdd and it seems to be doing the correct
> thing in all regards with respect to preferred locations.
>
> Do I have to write my own compute Tasks and schedule them myself?
>
> Anyone have any suggestions? I am stumped.
>
> cheers,
> Erik
>
>
>


Re: Kryo serialization for shuffles

2013-11-25 Thread Andrew Ash
Hi Matei, I've clarified the documentation to include this information in
this pull request.  Can you take a look?

https://github.com/apache/incubator-spark/pull/206
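
For completeness, the whole setup is only a few lines; a sketch along the lines of
the tuning guide (MyClass and MyRegistrator are placeholder names):

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[MyClass])   // register your own types here
    }
  }

  // Set before the SparkContext is created; shuffle data and serialized
  // caching then go through Kryo.
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "MyRegistrator")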


On Mon, Nov 25, 2013 at 5:03 PM, Matei Zaharia wrote:

> Yeah, if you just say spark.serializer to Kryo, it will use it for all
> these things.
>
> Matei
>
> On Nov 25, 2013, at 4:59 PM, Andrew Ash  wrote:
>
> How do you know Spark doesn't also use Kryo for shuffled files?  Are there
> metrics or logs somewhere that make you believe it's normal Java
> serialization?
>
>
> On Mon, Nov 25, 2013 at 4:46 PM, Mayuresh Kunjir <
> mayuresh.kun...@gmail.com> wrote:
>
>> This shows how to serialize user classes. I wanted Spark to serialize all
>> shuffle files and object files using Kryo. How can I specify that? Or would
>> that be done by default if I just set spark.serializer to kryo?
>>
>>
>>
>>
>> On Mon, Nov 25, 2013 at 7:42 PM, Matei Zaharia 
>> wrote:
>>
>>> Did you look through
>>> http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization?
>>> It shows an example of how to register classes with Kryo. In particular, in
>>> your Registrator, you can use kryo.register(yourClass, new YourSerializer)
>>> to pass a custom serializer too.
>>>
>>> Matei
>>>
>>> On Nov 25, 2013, at 4:25 PM, Mayuresh Kunjir 
>>> wrote:
>>>
>>> Hi Spark users,
>>>
>>> This has probably been answered before, but I could not locate it. I
>>> understand from the tuning guide that using Kryo serialization for shuffles
>>> improves the performance. I would like to know how to register the Kryo
>>> serializer. Apart from the shuffles, my standalone application needs to
>>> store and retrieve a few object files as well. I would really appreciate
>>> any pointers on registering Kryo serializer for both these serialization
>>> tasks.
>>>
>>> Thanks and regards,
>>> ~Mayuresh
>>>
>>>
>>>
>>>
>>
>
>


Re: step-by-step recipe for running spark 0.8 on ec2

2013-11-25 Thread Andrew Ash
I believe this page has what you're looking for:
http://spark.incubator.apache.org/docs/latest/ec2-scripts.html


On Mon, Nov 25, 2013 at 4:57 PM, Walrus theCat wrote:

> Hi,
>
> I just updated my imports and tried to run my app using Spark 0.8, but it
> breaks.  The AMI's spark-shell says it's 0.7.3 or thereabouts, which is
> what my app previously used.  What is the official, step-by-step solution
> to using Spark 0.8 on EC2?
>
> Thanks
>


Re: Kryo serialization for shuffles

2013-11-25 Thread Andrew Ash
How do you know Spark doesn't also use Kryo for shuffled files?  Are there
metrics or logs somewhere that make you believe it's normal Java
serialization?


On Mon, Nov 25, 2013 at 4:46 PM, Mayuresh Kunjir
wrote:

> This shows how to serialize user classes. I wanted Spark to serialize all
> shuffle files and object files using Kryo. How can I specify that? Or would
> that be done by default if I just set spark.serializer to kryo?
>
>
>
>
> On Mon, Nov 25, 2013 at 7:42 PM, Matei Zaharia wrote:
>
>> Did you look through
>> http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization?
>> It shows an example of how to register classes with Kryo. In particular, in
>> your Registrator, you can use kryo.register(yourClass, new YourSerializer)
>> to pass a custom serializer too.
>>
>> Matei
>>
>> On Nov 25, 2013, at 4:25 PM, Mayuresh Kunjir 
>> wrote:
>>
>> Hi Spark users,
>>
>> This has probably been answered before, but I could not locate it. I
>> understand from the tuning guide that using Kryo serialization for shuffles
>> improves the performance. I would like to know how to register the Kryo
>> serializer. Apart from the shuffles, my standalone application needs to
>> store and retrieve a few object files as well. I would really appreciate
>> any pointers on registering Kryo serializer for both these serialization
>> tasks.
>>
>> Thanks and regards,
>> ~Mayuresh
>>
>>
>>
>>
>


Re: Setting spark.akka.frameSize to the max

2013-11-25 Thread Andrew Ash
Thanks for the quick response Patrick!

The downsides of always allocating an overly large buffer make sense.  I'll
keep that in mind as I tune that setting for my workload.

Also I observed the error this past weekend on 0.8.0, though I don't
remember if it was during fetching results specifically or some other
stage.  I'll try to get you a copy of that stacktrace so we have something
tangible to discuss.
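
In the meantime, the knob itself is just a system property set before the context
is created; a minimal sketch (the value is in MB as of 0.8, and 64 is an arbitrary
example, not a recommendation):

  // Per Patrick's note below, buffers of roughly this size can be allocated per
  // message, so raise it only as far as the largest task/result actually needs.
  System.setProperty("spark.akka.frameSize", "64")   // default is 10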

Andrew


On Mon, Nov 25, 2013 at 10:10 AM, Patrick Wendell wrote:

> Good question, I think inside of akka they will allocate a buffer of
> this size for every message. So if you set it super high you'll waste
> some memory temporarily allocating these buffers.
>
> The main issue with this IIRC was for fetching results, which we fixed
> in 0.8.0 to use a different communication library.
>
> - Patrick
>
> On Mon, Nov 25, 2013 at 9:29 AM, Andrew Ash  wrote:
> > There have been a number of threads on this list about needing to set
> > spark.akka.frameSize to something higher than the default.  The issue
> seems
> > to come up most when one key in a groupByKey has particularly large
> amounts
> > of data.
> >
> > What is the downside to setting this configuration parameter to the
> maximum
> > value by default?
> >
> > Andrew
>


Setting spark.akka.frameSize to the max

2013-11-25 Thread Andrew Ash
There have been a number of threads on this list about needing to set
spark.akka.frameSize to something higher than the default.  The issue seems
to come up most when one key in a groupByKey has particularly large amounts
of data.

What is the downside to setting this configuration parameter to the maximum
value by default?

Andrew


Whitelisting Spark ports in iptables

2013-11-24 Thread Andrew Ash
Hi Spark list,

I'm looking to apply some iptables firewall rules to my spark cluster and
am not entirely sure what ports are required.  I didn't see any specific
documentation of what ports Spark requires, so compiled this (incomplete)
list.

From           To           Port (default)  Purpose
admin machine  master       8080            master webui
admin machine  worker       8081            worker webui
admin machine  application  4040            application webui
worker         master       7077            join cluster
worker         worker       ?               shuffle data
master         worker       ?               schedule jobs
Can someone help me out with the last couple ports and fill in any other
entries that might be missing?

Thanks!
Andrew