Re: GraphX with UUID vertex IDs instead of Long

2014-02-24 Thread Christopher Nguyen
Deepak, to be sure, I was referring to sequential guarantees with the longs.

I would suggest being careful with taking half the UUID, as the probability
of collision can be unexpectedly high. Many bits of a UUID are typically
time-based, so collisions among those bits are virtually guaranteed when
generation is parallelized. Even if you can optimistically find some 64
uniformly random bits to use, then by the birthday paradox the collision
probability among roughly 4 billion (2^32) values is about 1 - exp(-1/2),
or a very uncomfortable 40%. If you have orders of magnitude fewer
edges/vertices, you'd have a wider margin of safety---but estimate it to be
sure.
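
If it helps, that estimate is the usual birthday-bound approximation
1 - exp(-n^2 / (2N)) for n IDs drawn from a space of size N. A quick,
purely illustrative sketch to plug in your own counts:

  // Approximate probability of at least one collision among n IDs drawn
  // uniformly at random from a space of 2^bits values (birthday bound).
  def collisionProbability(n: Double, bits: Int): Double =
    1.0 - math.exp(-n * n / (2.0 * math.pow(2.0, bits)))

  collisionProbability(math.pow(2, 32), 64) // ~0.39 for ~4 billion 64-bit IDs
  collisionProbability(1e8, 64)             // ~2.7e-4 for 100 million IDs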

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Mon, Feb 24, 2014 at 12:38 PM, Deepak Nulu  wrote:

> Thanks Christopher, I will look into the StackOverflow suggestion of
> generating 64-bit UUIDs in the same fashion as 128-bit UUIDs.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-with-UUID-vertex-IDs-instead-of-Long-tp1953p1990.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: GraphX with UUID vertex IDs instead of Long

2014-02-24 Thread Christopher Nguyen
Deepak, depending on your use case, you might find it appropriate and
certainly easy to create a lightweight sequence-number service that serves
requests from parallel clients.

http://stackoverflow.com/questions/2671858/distributed-sequence-number-generation/5685869

There's no shame in using non-Spark or custom services in aid of Spark
processing :)
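
For illustration only (not one of the designs from that StackOverflow
thread), here is a bare-bones sketch of such a service using just the JDK's
built-in HTTP server. The class name, port, and endpoint are made up, and a
production version would need persistence and fault tolerance:

  import java.net.InetSocketAddress
  import java.util.concurrent.atomic.AtomicLong
  import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

  // Hypothetical single-node sequence-number service: every GET on /next
  // returns the next Long. Spark tasks, loaders, etc. call it over HTTP.
  object SequenceServer {
    def main(args: Array[String]): Unit = {
      val counter = new AtomicLong(0L)
      val server = HttpServer.create(new InetSocketAddress(8765), 0)
      server.createContext("/next", new HttpHandler {
        def handle(ex: HttpExchange): Unit = {
          val body = counter.getAndIncrement().toString.getBytes("UTF-8")
          ex.sendResponseHeaders(200, body.length)
          ex.getResponseBody.write(body)
          ex.close()
        }
      })
      server.start()
    }
  }

Clients could fetch IDs in batches from such an endpoint to amortize the
round trip across many vertices/edges.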
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Mon, Feb 24, 2014 at 10:53 AM, Deepak Nulu  wrote:

> Hi Evan,
>
> Thanks for the quick response. The only mapping between UUIDs and Longs
> that
> I can think of is one where I sequentially assign Longs as I load the UUIDs
> from the DB. But this results in having to centralize this mapping. I am
> guessing that centralizing this is not a good idea for a distributed graph
> processing engine.
>
> Also, I will be running Spark on the same nodes as my distributed DB
> (Cassandra) and I am hoping that the Spark worker on each node will load
> the
> data from the local Cassandra node. I am not sure if this is possible with
> GraphX, but I am hoping it is, and therefore my concern with centralizing
> the UUID<->Long mapping.
>
> Thanks.
>
> -deepak
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-with-UUID-vertex-IDs-instead-of-Long-tp1953p1982.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Connecting an Application to the Cluster

2014-02-17 Thread Christopher Nguyen
David, actually, it's the driver that "creates" and holds a reference to
the SparkContext. The master in this context is only a resource manager
providing information about the cluster, being aware of where workers are,
how many there are, etc.

The SparkContext object can get serialized/deserialized and
instantiated/made available elsewhere (e.g., on the worker nodes), but this
is being overly precise and doesn't apply directly to the question you're
asking.

So yes, if you do collect(), you will be able to see the results on your
local console.
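
A minimal sketch of that flow, with placeholder master URL and input path:

  import org.apache.spark.{SparkConf, SparkContext}

  object CollectOnDriver {
    def main(args: Array[String]): Unit = {
      // The driver (this JVM, e.g. your Eclipse-launched app) owns the SparkContext.
      val conf = new SparkConf()
        .setAppName("collect-demo")
        .setMaster("spark://master-host:7077")
      val sc = new SparkContext(conf)

      val counts = sc.textFile("hdfs:///path/to/input")
        .flatMap(_.split("\\s+"))
        .map(w => (w, 1))
        .reduceByKey(_ + _)

      // collect() pulls the results back to this driver JVM, so they print
      // on the local console, not on the master or the workers.
      counts.collect().take(10).foreach(println)
      sc.stop()
    }
  }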
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Mon, Feb 17, 2014 at 8:54 AM, David Thomas  wrote:

> So if I do a spark action, say, collect, will I be able to see the result
> on my local console? Or would it be only available only on the cluster
> master?
>
>
> On Mon, Feb 17, 2014 at 9:50 AM, purav aggarwal <
> puravaggarwal...@gmail.com> wrote:
>
>> Your local machine simply submits your job (in the form of jar) to the
>> cluster.
>> The master node is where the SparkContext object is created, a DAG of
>> your job is formed and tasks (stages) are assigned to different workers -
>> which are not aware of anything but computation of task being assigned.
>>
>>
>> On Mon, Feb 17, 2014 at 10:07 PM, David Thomas wrote:
>>
>>> Where is the SparkContext object created then? On my local machine or on
>>> the master node in the cluster?
>>>
>>>
>>> On Mon, Feb 17, 2014 at 4:17 AM, Nhan Vu Lam Chi wrote:
>>>
 Your local app will be called "driver program", which creates jobs and
 submits them to the cluster for running.


 On Mon, Feb 17, 2014 at 9:19 AM, David Thomas wrote:

> From 
> docs
> :
>
>
> *Connecting an Application to the Cluster: To run an application on the
> Spark cluster, simply pass the spark://IP:PORT URL of the master to the
> SparkContext constructor.*
>
> Could someone enlighten me on what happens if I run the app, from say,
> Eclipse on my local machine, but use the url of the master node which is 
> on
> cloud. What role does my local JVM play then?
>


>>>
>>
>


Re: working on a closed networkworking on a closed network - any recomendations

2014-02-09 Thread Christopher Nguyen
Eran, you could try what Patrick suggested, in detail: 1. Do a full build
on a connected laptop; 2. Copy ~/.m2 and ~/.ivy2 over; 3. Build with mvn -o
or, in sbt, "set offline := true"---if that meets your needs.

Sent while mobile. Pls excuse typos etc.
On Feb 9, 2014 12:58 PM, "Patrick Wendell"  wrote:

> In general it's hard to do builds on machines that don't access the
> internet.
>
> The most common thing in environments where building is happening
> regularly is to set up an internal repository (such as a Nexus
> repository) to which dependencies can be added manually. If it's just a
> one-time build, I'd recommend building on a machine that has internet
> access and then moving the binaries.
>
> This is a fairly general issue in modern build systems. I'd google
> around for instructions in installing local repositories if you want
> to go that route.
>
> On Sat, Feb 8, 2014 at 10:44 PM, goi cto  wrote:
> > Hi,
> >
> > With all these new tools that are automatically downloading latest builds
> > from the web. what is the best approach for developing on a closed (not
> > connected) network?
> >
> >
> > --
> > Eran | CTO
>


Re: RDD[URI]

2014-01-30 Thread Christopher Nguyen
Philip, I guess the key problem statement is the "large collection of"
part? If so this may be helpful, at the HDFS level:
http://blog.cloudera.com/blog/2009/02/the-small-files-problem/.

Otherwise you can always start with an RDD[fileUri] and go from there to an
RDD[(fileUri, read_contents)].
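
A rough sketch of that second approach for local (or NFS-visible) paths; for
HDFS you would open streams via the Hadoop FileSystem API instead, and the
helper name and partitioning factor here are just illustrative:

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Turn a list of file paths into an RDD of (path, contents).
  def filesWithContents(sc: SparkContext, paths: Seq[String]): RDD[(String, String)] =
    sc.parallelize(paths, numSlices = math.max(1, paths.size / 10)).map { path =>
      // Assumes the path is readable from every worker.
      val source = scala.io.Source.fromFile(path)
      try { (path, source.mkString) } finally { source.close() }
    }

(Newer Spark releases also added sc.wholeTextFiles, which does essentially
this for a directory of small files.)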

Sent while mobile. Pls excuse typos etc.
On Jan 30, 2014 9:13 AM, "尹绪森"  wrote:

> I am also interested in this. My solution for now is to turn each file
> into a single line of text, i.e. delete all '\n', then prepend the filename
> to the line, separated by a space.
>
> [filename] [space] [content]
>
> Anyone have better ideas ?
> On 2014-1-31 at 12:18 AM, "Philip Ogren" wrote:
>
>> In my Spark programming thus far my unit of work has been a single row
>> from an hdfs file by creating an RDD[Array[String]] with something like:
>>
>> spark.textFile(path).map(_.split("\t"))
>>
>> Now, I'd like to do some work over a large collection of files in which
>> the unit of work is a single file (rather than a row from a file.)  Does
>> Spark anticipate users creating an RDD[URI] or RDD[File] or some such and
>> supporting actions and transformations that one might want to do on such an
>> RDD?  Any advice and/or code snippets would be appreciated!
>>
>> Thanks,
>> Philip
>>
>


Re: Stream RDD to local disk

2014-01-30 Thread Christopher Nguyen
Andrew, couldn't you do in the Scala code:

  scala.sys.process.Process("hadoop fs -copyToLocal ...")!

or is that still considered a second step?

"hadoop fs" is almost certainly going to be better at copying these files
than some memory-to-disk-to-memory serdes within Spark.
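
In other words, something along these lines could collapse Andrew's two
steps into one driver program. This is only a sketch: the paths are
placeholders, it assumes the hadoop CLI is on the driver's PATH, and it uses
-getmerge (rather than -copyToLocal) to concatenate the part files:

  import scala.sys.process._
  import org.apache.spark.SparkContext

  def etlAndFetch(sc: SparkContext): Unit = {
    val processed = sc.textFile("hdfs:///path/A").map(_.toUpperCase) // your ETL here
    processed.saveAsTextFile("hdfs:///path/B")

    // Let "hadoop fs" do the copying; getmerge writes a single local file.
    val exitCode = Seq("hadoop", "fs", "-getmerge", "/path/B", "/local/path/myfile").!
    require(exitCode == 0, s"hadoop fs -getmerge failed with exit code $exitCode")
  }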

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Jan 30, 2014 at 2:21 AM, Andrew Ash  wrote:

> Hi Spark users,
>
> I'm often using Spark for ETL type tasks, where the input is a large file
> on-disk and the output is another large file on-disk.  I've loaded
> everything into HDFS, but still need to produce files out on the other side.
>
> Right now I produce these processed files in a 2-step process:
>
> 1) in a single spark job, read from HDFS location A, process, and write to
> HDFS location B
> 2) run hadoop fs -cat hdfs:///path/to/* > /path/tomyfile to get it onto
> the local disk.
>
> It would be great to get this down to a 1-step process.
>
> If I run .saveAsTextFile("...") on my RDD, then the shards of the file are
> scattered onto the local disk across the cluster.  But if I .collect() on
> the driver and then save to disk using normal Scala disk IO utilities, I'll
> certainly OOM the driver.
>
> *So the question*: is there a way to get an iterator for an RDD that I
> can scan through the contents on the driver and flush to disk?
>
> I found the RDD.iterator() method but it looks to be intended for use by
> RDD subclasses not end users (requires a Partition and TaskContext
> parameter).  The .foreach() method executes on each worker also, rather
> than on the driver, so would also scatter files across the cluster if I
> saved from there.
>
> Any suggestions?
>
> Thanks!
> Andrew
>


Re: Please Help: Amplab Benchmark Performance

2014-01-29 Thread Christopher Nguyen
Hi Chen, it's certainly correct to say it is hard to make an apple-to-apple
comparison in terms of being able to assume that there is an
implementation-equivalent for any given Shark query, in "Spark only".

That said, I think the results of your comparisons could still be a
valuable reference. There are scenarios where perhaps someone wants to
consider the trade-offs between implementing some ETL operation with Shark
or with only Spark. Some sense of performance/cost difference would be
helpful in making that decision.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Wed, Jan 29, 2014 at 11:10 PM, Chen Jin  wrote:

> Hi Christopher,
>
> Thanks a lot for taking time to explain some details under Shark's
> hood. It is probably very hard to make an apple-to-apple comparison
> between Shark and Spark since they might be suitable for different
> types of tasks. From what you have explained, is it OK to think Shark
> is better off for SQL-like tasks, while Spark is more for iterative
> machine learning algorithms?
>
> Cheers,
>
> -chen
>
> On Wed, Jan 29, 2014 at 8:59 PM, Christopher Nguyen 
> wrote:
> > Chen, interesting comparisons you're trying to make. It would be great to
> > share this somewhere when you're done.
> >
> > Some suggestions of non-obvious things to consider:
> >
> > In general there are any number of differences between Shark and some
> > "equivalent" Spark implementation of the same query.
> >
> > Shark isn't necessarily what we may think of as "let's see which lines of
> > code accomplish the same thing in Spark". Its current implementation is
> > based on Hive which has its own query planning, optimization, and
> execution.
> > Shark's code has some of its own tricks. You can use "EXPLAIN" to see
> > Shark's execution plan, and compare to your Spark approach.
> >
> > Further Shark has its own memory storage format, e.g.,
> typed-column-oriented
> > RDD[TablePartition], that can make it more memory-efficient, and help
> > execute many column aggregation queries a lot faster than the
> row-oriented
> > RDD[Array[String]] you may be using.
> >
> > In short, Shark does a number of things that are smarter and more
> optimized
> > for SQL queries than a straightforward Spark RDD implementation of the
> same.
> > --
> > Christopher T. Nguyen
> > Co-founder & CEO, Adatao
> > linkedin.com/in/ctnguyen
> >
> >
> >
> > On Wed, Jan 29, 2014 at 8:10 PM, Chen Jin  wrote:
> >>
> >> Hi All,
> >>
> >> https://amplab.cs.berkeley.edu/benchmark/ has given a nice benchmark
> >> report. I am trying to reproduce the same set of queries in the
> >> spark-shell so that we can understand more about shark and spark and
> >> their performance on EC2.
> >>
> >> As for the Aggregation Query when X=8,  Shark-disk takes 210 seconds
> >> and Shark-mem takes 111 seconds. However, when I materialize the
> >> results to the disk, spark-shell takes more than 5 minutes
> >> (reduceByKey is used in the shell for aggregation) . Further, if I
> >> cache uservisits RDD, since the dataset is way too big, the
> >> performance deteriorates quite a lot.
> >>
> >> Can anybody shed some light on why there is a more than 2x difference
> >> between shark-disk and spark-shell-disk and how to cache data in spark
> >> correctly such that we can achieve comparable performance as
> >> shark-mem?
> >>
> >> Thank you very much,
> >>
> >> -chen
> >
> >
>


Re: Please Help: Amplab Benchmark Performance

2014-01-29 Thread Christopher Nguyen
Chen, interesting comparisons you're trying to make. It would be great to
share this somewhere when you're done.

Some suggestions of non-obvious things to consider:

In general there are any number of differences between Shark and some
"equivalent" Spark implementation of the same query.

Shark isn't necessarily what we may think of as "let's see which lines of
code accomplish the same thing in Spark". Its current implementation is
based on Hive which has its own query planning, optimization, and
execution. Shark's code has some of its own tricks. You can use "EXPLAIN"
to see Shark's execution plan, and compare to your Spark approach.

Further Shark has its own memory storage format, e.g.,
typed-column-oriented RDD[TablePartition], that can make it more
memory-efficient, and help execute many column aggregation queries a lot
faster than the row-oriented RDD[Array[String]] you may be using.

In short, Shark does a number of things that are smarter and more optimized
for SQL queries than a straightforward Spark RDD implementation of the same.
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Wed, Jan 29, 2014 at 8:10 PM, Chen Jin  wrote:

> Hi All,
>
> https://amplab.cs.berkeley.edu/benchmark/ has given a nice benchmark
> report. I am trying to reproduce the same set of queries in the
> spark-shell so that we can understand more about shark and spark and
> their performance on EC2.
>
> As for the Aggregation Query when X=8,  Shark-disk takes 210 seconds
> and Shark-mem takes 111 seconds. However, when I materialize the
> results to the disk, spark-shell takes more than 5 minutes
> (reduceByKey is used in the shell for aggregation) . Further, if I
> cache uservisits RDD, since the dataset is way too big, the
> performance deteriorates quite a lot.
>
> Can anybody shed some light on why there is a more than 2x difference
> between shark-disk and spark-shell-disk and how to cache data in spark
> correctly such that we can achieve comparable performance as
> shark-mem?
>
> Thank you very much,
>
> -chen
>


Re: RDD and Partition

2014-01-28 Thread Christopher Nguyen
David, a PPRDD (mobile abbrev) is a new RDD that contains a subset of
partitions of the original RDD. Subsequent transformations/operations will
only see this subset. So yes, it may do what you need, or not, depending on
whether you still need to do something with the other partitions, as
implied by Mark. You can of course still refer to the original RDD, or
create yet another PPRDD containing that other subset, etc., just as you
can call sc.runJob() on different partitions each time.
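
As a rough sketch of what that looks like, assuming the create factory on
PartitionPruningRDD's companion object (a developer-level API whose exact
form may differ across versions), and a 26-partition RDD like the one in the
thread below:

  import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

  // Keep only partitions 0 through 12 (say, the 'A'..'M' range); downstream
  // transformations and actions only ever see those partitions.
  def firstHalf(rdd26: RDD[String]): RDD[String] =
    PartitionPruningRDD.create(rdd26, partitionIndex => partitionIndex <= 12)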

I'm sure you can decide which pattern best fits your use case. Beware of
over-optimization leading to unnecessary complexity, though I've also
learned not to underestimate others' real needs based on their toy examples.

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 3:41 PM, "David Thomas"  wrote:

> Thanks for those tips.
>
> I was looking into the docs for PartitionPruningRDD. It says, "A RDD used
> to prune RDD partitions/partitions so we can avoid launching tasks on all
> partitions". I did not understand this exactly and I couldn't find any
> sample code. Can we use this to apply a function only on certain partitions?
>
>
> On Tue, Jan 28, 2014 at 4:36 PM, Christopher Nguyen wrote:
>
>> Hence the qualification to determine whether it is necessary *and*
>> sufficient, depending on what David is trying to do overall :)
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Jan 28, 2014 2:10 PM, "Mark Hamstra"  wrote:
>>
>>> SparkContext#runJob is the basis of an RDD action, so the result of
>>> using runJob to call toUpperCase on the A-to-M partitions will be the
>>> uppercased strings materialized in the driver process, not a transformation
>>> of the original RDD.
>>>
>>>
>>> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen wrote:
>>>
>>>> David,
>>>>
>>>> map() would iterate row by row, forcing an if on each row.
>>>>
>>>> mapPartitions*() allows you to have a conditional on the whole
>>>> partition first, as Mark suggests. That should usually be sufficient.
>>>>
>>>> SparkContext.runJob() allows you to specify which partitions to run on,
>>>> if you're sure it's necessary and sufficient, and not over optimization.
>>>>
>>>> Sent while mobile. Pls excuse typos etc.
>>>> On Jan 28, 2014 1:30 PM, "Mark Hamstra" 
>>>> wrote:
>>>>
>>>>> If I'm understanding you correctly, there's lots of ways you could do
>>>>> that.  Here's one, continuing from the previous example:
>>>>>
>>>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>>>
>>>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>>>
>>>>> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
>>>>> itr.map(_.toUpperCase) else itr }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas wrote:
>>>>>
>>>>>> Thank you! That helps.
>>>>>>
>>>>>> A follow up question on this. How can I apply a function only on a
>>>>>> subset of this RDD. Lets say, I need all strings starting in the range 
>>>>>> 'A'
>>>>>> - 'M' be applied toUpperCase and not touch the remaining. Is that 
>>>>>> possible
>>>>>> without running an 'if' condition on all the partitions in the cluster?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <
>>>>>> m...@clearstorydata.com> wrote:
>>>>>>
>>>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>>>
>>>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>>>
>>>>>>> sca

Re: RDD and Partition

2014-01-28 Thread Christopher Nguyen
Hence the qualification to determine whether it is necessary *and*
sufficient, depending on what David is trying to do overall :)

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 2:10 PM, "Mark Hamstra"  wrote:

> SparkContext#runJob is the basis of an RDD action, so the result of using
> runJob to call toUpperCase on the A-to-M partitions will be the uppercased
> strings materialized in the driver process, not a transformation of the
> original RDD.
>
>
> On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen wrote:
>
>> David,
>>
>> map() would iterate row by row, forcing an if on each row.
>>
>> mapPartitions*() allows you to have a conditional on the whole partition
>> first, as Mark suggests. That should usually be sufficient.
>>
>> SparkContext.runJob() allows you to specify which partitions to run on,
>> if you're sure it's necessary and sufficient, and not over optimization.
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Jan 28, 2014 1:30 PM, "Mark Hamstra"  wrote:
>>
>>> If I'm understanding you correctly, there's lots of ways you could do
>>> that.  Here's one, continuing from the previous example:
>>>
>>> // rdd26: RDD[String] split by first letter into 26 partitions
>>>
>>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>>
>>> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
>>> itr.map(_.toUpperCase) else itr }
>>>
>>>
>>>
>>>
>>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas wrote:
>>>
>>>> Thank you! That helps.
>>>>
>>>> A follow up question on this. How can I apply a function only on a
>>>> subset of this RDD. Lets say, I need all strings starting in the range 'A'
>>>> - 'M' be applied toUpperCase and not touch the remaining. Is that possible
>>>> without running an 'if' condition on all the partitions in the cluster?
>>>>
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra 
>>>> wrote:
>>>>
>>>>> scala> import org.apache.spark.RangePartitioner
>>>>>
>>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>>
>>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>>>> at :15
>>>>>
>>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>>>> res0)).values
>>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>>> :18
>>>>>
>>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>>> s))).collect.foreach(println)
>>>>>
>>>>> (0,apple)
>>>>> (1,Ball)
>>>>> (2,cat)
>>>>> (3,dog)
>>>>> (4,Elephant)
>>>>> (5,fox)
>>>>> (6,gas)
>>>>> (7,horse)
>>>>> (8,index)
>>>>> (9,jet)
>>>>> (10,kitsch)
>>>>> (11,long)
>>>>> (12,moon)
>>>>> (13,Neptune)
>>>>> (14,ooze)
>>>>> (15,Pen)
>>>>> (16,quiet)
>>>>> (17,rose)
>>>>> (18,sun)
>>>>> (19,talk)
>>>>> (20,umbrella)
>>>>> (21,voice)
>>>>> (22,Walrus)
>>>>> (23,xeon)
>>>>> (24,Yam)
>>>>> (25,zebra)
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <
>>>>> nick.pentre...@gmail.com> wrote:
>>>>>
>>>>>> If you do something like:
>>>>>>
>>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>>
>>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>>> character of the string. Now when you perform an operation that uses
>>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>>>> receiving all the strings with A, the 2nd all the strings with B etc. 
>>>>>> Note
>>>>>> that you may not be able to enforce that each *machine* gets a
>>>>>> different letter, but in most cases that doesn't particularly matter as
>>>>>> long as you get "all values for a given key go to the same reducer"
>>>>>> behaviour.
>>>>>>
>>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>>> assistance.
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas wrote:
>>>>>>
>>>>>>> Lets say I have an RDD of Strings and there are 26 machines in the
>>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>>> starting with A gets collected on machine1, B on machine2 and so on.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


Re: RDD and Partition

2014-01-28 Thread Christopher Nguyen
David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition
first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if
you're sure it's necessary and sufficient, and not over optimization.
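
For instance, a sketch of the runJob route, reusing the rdd26 from Mark's
example quoted below (note, per Mark's point, that this is an action whose
results land on the driver, not a transformation; the exact runJob signature
has varied slightly across Spark versions, with older ones taking an extra
allowLocal flag):

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Run a function on partitions 0..12 only and collect those results on the driver.
  def upperFirstHalf(sc: SparkContext, rdd26: RDD[String]): Array[Array[String]] =
    sc.runJob(rdd26, (iter: Iterator[String]) => iter.map(_.toUpperCase).toArray, 0 to 12)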

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 1:30 PM, "Mark Hamstra"  wrote:

> If I'm understanding you correctly, there's lots of ways you could do
> that.  Here's one, continuing from the previous example:
>
> // rdd26: RDD[String] split by first letter into 26 partitions
>
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>
> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }
>
>
>
>
> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas wrote:
>
>> Thank you! That helps.
>>
>> A follow up question on this. How can I apply a function only on a subset
>> of this RDD. Lets say, I need all strings starting in the range 'A' - 'M'
>> be applied toUpperCase and not touch the remaining. Is that possible
>> without running an 'if' condition on all the partitions in the cluster?
>>
>>
>>
>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra wrote:
>>
>>> scala> import org.apache.spark.RangePartitioner
>>>
>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>
>>> scala> rdd.keyBy(s => s(0).toUpper)
>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>> at :15
>>>
>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>> res0)).values
>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>> :18
>>>
>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>> s))).collect.foreach(println)
>>>
>>> (0,apple)
>>> (1,Ball)
>>> (2,cat)
>>> (3,dog)
>>> (4,Elephant)
>>> (5,fox)
>>> (6,gas)
>>> (7,horse)
>>> (8,index)
>>> (9,jet)
>>> (10,kitsch)
>>> (11,long)
>>> (12,moon)
>>> (13,Neptune)
>>> (14,ooze)
>>> (15,Pen)
>>> (16,quiet)
>>> (17,rose)
>>> (18,sun)
>>> (19,talk)
>>> (20,umbrella)
>>> (21,voice)
>>> (22,Walrus)
>>> (23,xeon)
>>> (24,Yam)
>>> (25,zebra)
>>>
>>>
>>>
>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 If you do something like:

 rdd.map{ str => (str.take(1), str) }

 you will have an RDD[(String, String)] where the key is the first
 character of the string. Now when you perform an operation that uses
 partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
 receiving all the strings with A, the 2nd all the strings with B etc. Note
 that you may not be able to enforce that each *machine* gets a
 different letter, but in most cases that doesn't particularly matter as
 long as you get "all values for a given key go to the same reducer"
 behaviour.

 Perhaps if you expand on your use case we can provide more detailed
 assistance.


 On Tue, Jan 28, 2014 at 9:35 PM, David Thomas wrote:

> Lets say I have an RDD of Strings and there are 26 machines in the
> cluster. How can I repartition the RDD in such a way that all strings
> starting with A gets collected on machine1, B on machine2 and so on.
>
>

>>>
>>
>


RE: Can I share the RDD between multiprocess

2014-01-25 Thread Christopher Nguyen
Kapil, that's right, your #2 is the pattern I was referring to. Of course
it could be Tomcat or something even lighter weight as long as you define
some suitable client/server protocol.

Sent while mobile. Pls excuse typos etc.
On Jan 25, 2014 6:03 AM, "Kapil Malik"  wrote:

>  Hi Christopher,
>
>
>
> “make a "server" out of that JVM, and serve up (via HTTP/THRIFT, etc.)
> some kind of reference to those RDDs to multiple clients of that server”
>
>
>
> Can you kindly hint at any starting points regarding your suggestion?
>
> In my understanding, SparkContext constructor creates an Akka actor system
> and starts a jetty UI server. So can we somehow use / tweak the same to
> serve to multiple clients? Or can we simply construct a spark context
> inside a Java server (like Tomcat) ?
>
>
>
> Regards,
>
>
>
> Kapil Malik | kma...@adobe.com | 33430 / 8800836581
>
>
>
> *From:* Christopher Nguyen [mailto:c...@adatao.com]
> *Sent:* 25 January 2014 12:00
> *To:* user@spark.incubator.apache.org
> *Subject:* Re: Can I share the RDD between multiprocess
>
>
>
> D.Y., it depends on what you mean by "multiprocess".
>
>
>
> RDD lifecycles are currently limited to a single SparkContext. So to
> "share" RDDs you need to somehow access the same SparkContext.
>
>
>
> This means one way to share RDDs is to make sure your accessors are in the
> same JVM that started the SparkContext.
>
>
>
> Another is to make a "server" out of that JVM, and serve up (via
> HTTP/THRIFT, etc.) some kind of reference to those RDDs to multiple clients
> of that server, even though there is only one SparkContext (held by the
> server). We have built a server product using this pattern so I know it can
> work well.
>
>
>   --
>
> Christopher T. Nguyen
>
> Co-founder & CEO, Adatao <http://adatao.com>
>
> linkedin.com/in/ctnguyen
>
>
>
>
>
> On Fri, Jan 24, 2014 at 6:06 PM, D.Y Feng  wrote:
>
> How can I share the RDD between multiprocess?
>
>
> --
>
>
> DY.Feng(叶毅锋)
> yyfeng88625@twitter
> Department of Applied Mathematics
> Guangzhou University,China
> dyf...@stu.gzhu.edu.cn
>
>
>
>


Re: Can I share the RDD between multiprocess

2014-01-24 Thread Christopher Nguyen
D.Y., it depends on what you mean by "multiprocess".

RDD lifecycles are currently limited to a single SparkContext. So to
"share" RDDs you need to somehow access the same SparkContext.

This means one way to share RDDs is to make sure your accessors are in the
same JVM that started the SparkContext.

Another is to make a "server" out of that JVM, and serve up (via
HTTP/THRIFT, etc.) some kind of reference to those RDDs to multiple clients
of that server, even though there is only one SparkContext (held by the
server). We have built a server product using this pattern so I know it can
work well.
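
Very roughly, the shape of that pattern looks like this. It is a toy sketch,
not our product: a long-lived JVM owns the single SparkContext, registers
named RDDs, and clients hit it over HTTP. The class name, port, endpoint,
and paths are all invented, and the master URL is assumed to come from the
submit environment:

  import java.net.InetSocketAddress
  import scala.collection.concurrent.TrieMap
  import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  object RddServer {
    // One SparkContext for the whole server; master URL set externally.
    private val sc = new SparkContext(new SparkConf().setAppName("rdd-server"))
    private val registry = TrieMap.empty[String, RDD[String]]

    def main(args: Array[String]): Unit = {
      registry("events") = sc.textFile("hdfs:///path/to/events").cache()

      val server = HttpServer.create(new InetSocketAddress(9999), 0)
      // GET /count?name=events returns the row count of a registered RDD.
      server.createContext("/count", new HttpHandler {
        def handle(ex: HttpExchange): Unit = {
          val name = Option(ex.getRequestURI.getQuery).map(_.stripPrefix("name=")).getOrElse("")
          val reply = registry.get(name)
            .map(rdd => rdd.count().toString)
            .getOrElse("unknown RDD")
            .getBytes("UTF-8")
          ex.sendResponseHeaders(200, reply.length)
          ex.getResponseBody.write(reply)
          ex.close()
        }
      })
      server.start()
    }
  }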

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Jan 24, 2014 at 6:06 PM, D.Y Feng  wrote:

> How can I share the RDD between multiprocess?
>
> --
>
>
> DY.Feng(叶毅锋)
> yyfeng88625@twitter
> Department of Applied Mathematics
> Guangzhou University,China
> dyf...@stu.gzhu.edu.cn
>
>


Re: Forcing RDD computation with something else than count() ?

2014-01-22 Thread Christopher Nguyen
Guillaume, this is RDD.count()

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      // Use a while loop to count the number of elements rather than iter.size
      // because iter.size uses a for loop, which is slightly slower in the
      // current version of Scala.
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }


so if you want something cheaper you could try


sc.runJob(rdd, (iter: Iterator[_]) => {})
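
Or, wrapped up as a tiny helper (just a sketch of the same idea, though this
variant also walks each partition's iterator so lazily computed elements are
actually produced):

  import org.apache.spark.rdd.RDD

  // Force evaluation of every partition without materializing results on the driver.
  def materialize[T](rdd: RDD[T]): Unit =
    rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => {
      while (iter.hasNext) iter.next()
    })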

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Wed, Jan 22, 2014 at 12:09 AM, Reynold Xin  wrote:

> You can also do
>
> rdd.foreach(a => Unit)
>
> I actually suspect count is even cheaper than this.
>
>
>
> On Tue, Jan 21, 2014 at 5:05 AM, Guillaume Pitel <
> guillaume.pi...@exensa.com> wrote:
>
>>  Thanks. So you mean that first() trigger the computation of the WHOLE
>> RDD ? That does not sound right, I thought it was lazy.
>>
>> Guillaume
>>
>> Hi,
>> You can call less expensive operations like first or  take to trigger the
>> computation.
>>
>>
>>
>>
>> --
>>[image: eXenSa]
>>  *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80
>>
>> eXenSa S.A.S. 
>>  41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>

Re: Consistency between RDD's and Native File System

2014-01-17 Thread Christopher Nguyen
Sai, to be sure, what Mark said regarding lineage and recomputation is
exactly correct, so if it matters in your use case, you shouldn't ignore
this behavior, even as a side effect.

It just isn't what I think you were expecting in terms of RDD guarantees,
e.g., that a signal is somehow sent to your driver or workers when the
"original source" has changed. Further, there are no guarantees that Spark
hasn't decided to checkpoint the lineage somewhere and is no longer going
back to the "original source" to pick up the latest data. The recomputation
(read "journaling") design goal is reliability, not "data refresh".

Hope that is clear. I do sympathize with a possible reading of your design
goal; we are working on perhaps a similar design goal where streaming data
deltas are automatically reflected into a data structure on which the user
has a single reference (*)

(*) yep this is based on DStream/TD's work and will be available soon.
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Thu, Jan 16, 2014 at 9:33 PM, Christopher Nguyen  wrote:

> Mark, that's precisely why I brought up lineage, in order to say I didn't
> want to muddy the issue there :)
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Thu, Jan 16, 2014 at 9:09 PM, Mark Hamstra wrote:
>
>> I don't agree entirely, Christopher.  Without persisting or checkpointing
>> RDDs, re-evaluation of the lineage will pick up source changes.  I'm not
>> saying that working this way is a good idea (in fact, it's generally not),
>> but you can do things like this:
>>
>> 1) Create file silliness.txt containing:
>>
>> one line
>> two line
>> red line
>> blue line
>>
>> 2) Fire up spark-shell and do this:
>>
>> scala> val lines = sc.textFile("silliness.txt")
>> scala> println(lines.collect.mkString(", "))
>> .
>> .
>> .
>> one line, two line, red line, blue line
>>
>> 3) Edit silliness.txt so that it is now:
>>
>> and now
>> for something
>> completely
>> different
>>
>> 4) Continue on with spark-shell:
>>
>> scala> println(lines.collect.mkString(", "))
>> .
>> .
>> .
>> and now, for something, completely, different
>>
>>
>> On Thu, Jan 16, 2014 at 7:53 PM, Christopher Nguyen wrote:
>>
>>> Sai, from your question, I infer that you have an interpretation that
>>> RDDs are somehow an in-memory/cached copy of the underlying data
>>> source---and so there is some expectation that there is some
>>> synchronization model between the two.
>>>
>>> That would not be what the RDD model is. RDDs are first-class,
>>> stand-alone (distributed, immutable) datasets. Once created, an RDD exists
>>> on its own and isn't expected to somehow automatically realize that some
>>> underlying source has changed. (There is the concept of lineage or
>>> provenance for recomputation of RDDs, but that's orthogonal to this
>>> interpretation so I won't muddy the issue here).
>>>
>>> If you're looking for a mutable data table model, we will soon be
>>> releasing to open source something called Distributed DataFrame (DDF, based
>>> on R's data.frame) on top of RDDs that allows you to, among other useful
>>> things, load a dataset, perform transformations on it, and save it back,
>>> all the while holding on to a single DDF reference.
>>>
>>> --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
>>>
>>>
>>>
>>> On Thu, Jan 16, 2014 at 7:33 PM, Sai Prasanna 
>>> wrote:
>>>
>>>> Thanks Patrick, but i think i dint put my question clearly...
>>>>
>>>> The question is Say in the native file system or HDFS, i have data
>>>> describing students who passed, failed and for whom results are with-held
>>>> for some reason.
>>>> *Time T1:*
>>>> x - Pass
>>>> y - Fail
>>>> z - With-held.
>>>>
>>>> *Time T2:*
>>>> So i create an RDD1 reflecting this data, run a query to find how many
>>>> candidates have passed.
>>>> RESULT = 1. RDD1 is cached or its stored in the file system depending
>>>> on the availability of space.
>>>>
>>>> *T

Re: Consistency between RDD's and Native File System

2014-01-16 Thread Christopher Nguyen
Mark, that's precisely why I brought up lineage, in order to say I didn't
want to muddy the issue there :)

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Thu, Jan 16, 2014 at 9:09 PM, Mark Hamstra wrote:

> I don't agree entirely, Christopher.  Without persisting or checkpointing
> RDDs, re-evaluation of the lineage will pick up source changes.  I'm not
> saying that working this way is a good idea (in fact, it's generally not),
> but you can do things like this:
>
> 1) Create file silliness.txt containing:
>
> one line
> two line
> red line
> blue line
>
> 2) Fire up spark-shell and do this:
>
> scala> val lines = sc.textFile("silliness.txt")
> scala> println(lines.collect.mkString(", "))
> .
> .
> .
> one line, two line, red line, blue line
>
> 3) Edit silliness.txt so that it is now:
>
> and now
> for something
> completely
> different
>
> 4) Continue on with spark-shell:
>
> scala> println(lines.collect.mkString(", "))
> .
> .
> .
> and now, for something, completely, different
>
>
> On Thu, Jan 16, 2014 at 7:53 PM, Christopher Nguyen wrote:
>
>> Sai, from your question, I infer that you have an interpretation that
>> RDDs are somehow an in-memory/cached copy of the underlying data
>> source---and so there is some expectation that there is some
>> synchronization model between the two.
>>
>> That would not be what the RDD model is. RDDs are first-class,
>> stand-alone (distributed, immutable) datasets. Once created, an RDD exists
>> on its own and isn't expected to somehow automatically realize that some
>> underlying source has changed. (There is the concept of lineage or
>> provenance for recomputation of RDDs, but that's orthogonal to this
>> interpretation so I won't muddy the issue here).
>>
>> If you're looking for a mutable data table model, we will soon be
>> releasing to open source something called Distributed DataFrame (DDF, based
>> on R's data.frame) on top of RDDs that allows you to, among other useful
>> things, load a dataset, perform transformations on it, and save it back,
>> all the while holding on to a single DDF reference.
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Thu, Jan 16, 2014 at 7:33 PM, Sai Prasanna wrote:
>>
>>> Thanks Patrick, but i think i dint put my question clearly...
>>>
>>> The question is Say in the native file system or HDFS, i have data
>>> describing students who passed, failed and for whom results are with-held
>>> for some reason.
>>> *Time T1:*
>>> x - Pass
>>> y - Fail
>>> z - With-held.
>>>
>>> *Time T2:*
>>> So i create an RDD1 reflecting this data, run a query to find how many
>>> candidates have passed.
>>> RESULT = 1. RDD1 is cached or its stored in the file system depending on
>>> the availability of space.
>>>
>>> *Time T3:*
>>> In the native file system, now that results of the z are out and
>>> declared passed. So HDFS will need to be modified.
>>> x - Pass
>>> y - Fail
>>> z - Pass.
>>> Say now i get the RDD1 that is there in file system or cached copy and
>>> run the same query, i get the RESULT = 1, but ideally RESULT is 2.
>>>
>>> So i was asking is there a way SPARK hints that RDD1 is no longer
>>> consistent with the file system or that its upto the programmer to recreate
>>> the RDD1 if the block from where RDD was created was changed at a later
>>> point of time.
>>> [T1 < T2 < T3 < T4]
>>>
>>> Thanks in advance...
>>>
>>>
>>> On Fri, Jan 17, 2014 at 1:42 AM, Patrick Wendell wrote:
>>>
>>>> RDD's are immutable, so there isn't really such a thing as modifying a
>>>> block in-place inside of an RDD. As a result, this particular
>>>> consistency issue doesn't come up in Spark.
>>>>
>>>> - Patrick
>>>>
>>>> On Thu, Jan 16, 2014 at 1:42 AM, SaiPrasanna 
>>>> wrote:
>>>> > Hello, i am a novice to SPARK
>>>> >
>>>> > Say that we have created an RDD1 from native file system/HDFS and
>>>> done some
>>>> > transformations and actions and that resulted in an RDD2. Lets assume
>>>> RDD1
>>>> > and RDD2 are persisted, cached in-memory. If the block from where
>>>> RDD1 was
>>>> > created was modified at time T1 and RDD1/RDD2 is accessed later at T2
>>>> > T1,
>>>> > is there a way either SPARK ensures consistency or it is upto the
>>>> programmer
>>>> > to make it explicit?
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Consistency-between-RDD-s-and-Native-File-System-tp583.html
>>>> > Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>>
>>>
>>>
>>>
>>> --
>>> *Sai Prasanna. AN*
>>> *II M.Tech (CS), SSSIHL*
>>>
>>>
>>> * Entire water in the ocean can never sink a ship, Unless it gets
>>> inside. All the pressures of life can never hurt you, Unless you let them
>>> in.*
>>>
>>
>>
>


Re: Consistency between RDD's and Native File System

2014-01-16 Thread Christopher Nguyen
Sai, from your question, I infer that you have an interpretation that RDDs
are somehow an in-memory/cached copy of the underlying data source---and so
there is some expectation that there is some synchronization model between
the two.

That would not be what the RDD model is. RDDs are first-class, stand-alone
(distributed, immutable) datasets. Once created, an RDD exists on its own
and isn't expected to somehow automatically realize that some underlying
source has changed. (There is the concept of lineage or provenance for
recomputation of RDDs, but that's orthogonal to this interpretation so I
won't muddy the issue here).

If you're looking for a mutable data table model, we will soon be releasing
to open source something called Distributed DataFrame (DDF, based on R's
data.frame) on top of RDDs that allows you to, among other useful things,
load a dataset, perform transformations on it, and save it back, all the
while holding on to a single DDF reference.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Jan 16, 2014 at 7:33 PM, Sai Prasanna wrote:

> Thanks Patrick, but i think i dint put my question clearly...
>
> The question is Say in the native file system or HDFS, i have data
> describing students who passed, failed and for whom results are with-held
> for some reason.
> *Time T1:*
> x - Pass
> y - Fail
> z - With-held.
>
> *Time T2:*
> So i create an RDD1 reflecting this data, run a query to find how many
> candidates have passed.
> RESULT = 1. RDD1 is cached or its stored in the file system depending on
> the availability of space.
>
> *Time T3:*
> In the native file system, now that results of the z are out and declared
> passed. So HDFS will need to be modified.
> x - Pass
> y - Fail
> z - Pass.
> Say now i get the RDD1 that is there in file system or cached copy and run
> the same query, i get the RESULT = 1, but ideally RESULT is 2.
>
> So i was asking is there a way SPARK hints that RDD1 is no longer
> consistent with the file system or that its upto the programmer to recreate
> the RDD1 if the block from where RDD was created was changed at a later
> point of time.
> [T1 < T2 < T3 < T4]
>
> Thanks in advance...
>
>
> On Fri, Jan 17, 2014 at 1:42 AM, Patrick Wendell wrote:
>
>> RDD's are immutable, so there isn't really such a thing as modifying a
>> block in-place inside of an RDD. As a result, this particular
>> consistency issue doesn't come up in Spark.
>>
>> - Patrick
>>
>> On Thu, Jan 16, 2014 at 1:42 AM, SaiPrasanna 
>> wrote:
>> > Hello, i am a novice to SPARK
>> >
>> > Say that we have created an RDD1 from native file system/HDFS and done
>> some
>> > transformations and actions and that resulted in an RDD2. Lets assume
>> RDD1
>> > and RDD2 are persisted, cached in-memory. If the block from where RDD1
>> was
>> > created was modified at time T1 and RDD1/RDD2 is accessed later at T2 >
>> T1,
>> > is there a way either SPARK ensures consistency or it is upto the
>> programmer
>> > to make it explicit?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Consistency-between-RDD-s-and-Native-File-System-tp583.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
> *Sai Prasanna. AN*
> *II M.Tech (CS), SSSIHL*
>
>
> * Entire water in the ocean can never sink a ship, Unless it gets inside.
> All the pressures of life can never hurt you, Unless you let them in.*
>


Re: WARN ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-01-14 Thread Christopher Nguyen
Aureliano, this sort of jar-hell is something we have to deal with, whether
Spark or elsewhere. How would you propose we fix this with Spark? Do you
mean that Spark's own scaffolding caused you to pull in both Protobuf 2.4
and 2.5? Or do you mean the error message should have been more helpful?

Sent while mobile. Pls excuse typos etc.
On Jan 14, 2014 9:27 AM, "Aureliano Buendia"  wrote:

>
>
>
> On Tue, Jan 14, 2014 at 5:07 PM, Archit Thakur 
> wrote:
>
>> How much memory you are setting for exector JVM.
>> This problem comes when either there is a communication problem between
>> Master/Worker. or you do not have any memory left. Eg, you specified 75G
>> for your executor and your machine has a memory of 70G.
>>
>
> This was not a memory problem. This could be considered a spark bug.
>
> Here is what happened: My app was using protobuf 2.5, while spark has a
> protobuf 2.4 dependency, and classpath was like this:
>
> my_app.jar:spark_assembly.jar:..
>
> This caused spark, (or a dependency, probably hadoop) to use protobuf 2.5,
> giving that misleading 'ensure that workers are registered and have
> sufficient memory' error.
>
> Regenerating this error is easy, just download protobuf 2.5 and put it at
> the beginning of your classpath for any app, you should get that error.
>
>
>>
>>
>> On Thu, Jan 9, 2014 at 11:27 PM, Aureliano Buendia 
>> wrote:
>>
>>> The java command worked when I set SPARK_HOME and SPARK_EXAMPLES_JAR
>>> values.
>>>
>>> There are many issues regarding the Initial job has not accepted any
>>> resources... error though:
>>>
>>>- When I put my assembly jar 
>>> *before*spark-assembly_2.9.3-0.8.1-incubating-hadoop1.0.4.jar, this error 
>>> happens.
>>>Moving my jar after the spark-assembly it works fine.
>>>In my case, I need to put my jar before spark-assembly, as my jar
>>>uses protobuf 2.5 and spark-assembly comes with protobuf 2.4.
>>>- Sometimes when this error happens the whole cluster server must be
>>>restarted, or even run-example script wouldn't work. It took me a while 
>>> to
>>>find this out, making debugging very time consuming.
>>>- The error message is absolutely irrelevant.
>>>
>>> I guess the problem should be somewhere with the spark context jar
>>> delivery part.
>>>
>>>
>>> On Thu, Jan 9, 2014 at 4:17 PM, Aureliano Buendia 
>>> wrote:
>>>



 On Thu, Jan 9, 2014 at 5:01 AM, Matei Zaharia 
 wrote:

> Just follow the docs at
> http://spark.incubator.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scalafor
>  how to run an application. Spark is designed so that you can simply run
> your application *without* any scripts whatsoever, and submit your JAR to
> the SparkContext constructor, which will distribute it. You can launch 
> your
> application with “scala”, “java”, or whatever tool you’d prefer.
>

 I'm afraid what you said about 'simply run your application *without*
 any scripts whatsoever' does not apply to spark at the moment, and it
 simply does not work.

 Try the simple Pi calculation this on a standard spark-ec2 instance:

 java -cp
 /root/spark/examples/target/spark-examples_2.9.3-0.8.1-incubating.jar:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.1-incubating-hadoop1.0.4.jar
 org.apache.spark.examples.SparkPi `cat spark-ec2/cluster-url`

 And you'll get the error:

 WARN cluster.ClusterScheduler: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory

 While the script way works:

 spark/run-example org.apache.spark.examples.SparkPi `cat
 spark-ec2/cluster-url`

 What am I missing in the above java command?


>
> Matei
>
> On Jan 8, 2014, at 8:26 PM, Aureliano Buendia 
> wrote:
>
>
>
>
> On Thu, Jan 9, 2014 at 4:11 AM, Matei Zaharia  > wrote:
>
>> Oh, you shouldn’t use spark-class for your own classes. Just build
>> your job separately and submit it by running it with “java” and creating 
>> a
>> SparkContext in it. spark-class is designed to run classes internal to 
>> the
>> Spark project.
>>
>
> Really? Apparently Eugen runs his jobs by:
>
>
> $SPARK_HOME/spark-class SPARK_CLASSPATH=PathToYour.jar com.myproject.MyJob
>
> , as he instructed me 
> hereto
>  do this.
>
> I have to say while spark documentation is not sparse, it does not
> address enough, and as you can see the community is confused.
>
> Are the spark users supposed to create something like run-example for
> their own jobs?
>
>
>>
>> Matei
>>
>> On Jan 8, 2014, at 8:06 PM, Aureliano Buendia 
>> wrote:
>>
>>
>>
>>
>> On Thu, Jan 9, 2014 at 3:59 AM, Mat

Re: what paper is the L2 regularization based on?

2014-01-08 Thread Christopher Nguyen
Walrus, given the question, this may be a good place for you to start.
There's some good discussion there as well as links to papers.

http://www.quora.com/Machine-Learning/What-is-the-difference-between-L1-and-L2-regularization
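
At the code level the difference is just the penalty term added to each
gradient step; a toy sketch in plain Scala (not the MLlib implementation):

  // One (sub)gradient-descent step on weights w with loss gradient g.
  // L2 adds lambda * w (shrinks weights smoothly toward zero); L1 adds
  // lambda * sign(w) (pushes small weights to exactly zero, giving sparsity).
  def stepL2(w: Array[Double], g: Array[Double], lr: Double, lambda: Double): Array[Double] =
    w.zip(g).map { case (wi, gi) => wi - lr * (gi + lambda * wi) }

  def stepL1(w: Array[Double], g: Array[Double], lr: Double, lambda: Double): Array[Double] =
    w.zip(g).map { case (wi, gi) => wi - lr * (gi + lambda * math.signum(wi)) }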

Sent while mobile. Pls excuse typos etc.
On Jan 8, 2014 2:24 PM, "Walrus theCat"  wrote:

> Hi,
>
> Can someone point me to the paper that algorithm is based on?
>
> Thanks
>


Re: Is spark-env.sh supposed to be stateless?

2014-01-03 Thread Christopher Nguyen
How about this: https://github.com/apache/incubator-spark/pull/326

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Jan 2, 2014 at 11:07 PM, Matei Zaharia wrote:

> I agree that it would be good to do it only once, if you can find a nice
> way of doing so.
>
> Matei
>
> On Jan 3, 2014, at 1:33 AM, Andrew Ash  wrote:
>
> In my spark-env.sh I append to the SPARK_CLASSPATH variable rather than
> overriding it, because I want to support both adding a jar to all instances
> of a shell (in spark-env.sh) and adding a jar to a single shell instance 
> (SPARK_CLASSPATH=/path/to/my.jar
> /path/to/spark-shell)
>
> That looks like this:
>
> # spark-env.sh
> export SPARK_CLASSPATH+=":/path/to/hadoop-lzo.jar"
>
> However when my Master and workers run, they have duplicates of the
> SPARK_CLASSPATH jars.  There are 3 copies of hadoop-lzo on the classpath, 2
> of which are unnecessary.
>
> The resulting command line in ps looks like this:
> /path/to/java -cp
> :/path/to/hadoop-lzo.jar:/path/to/hadoop-lzo.jar:/path/to/hadoop-lzo.jar:[core
> spark jars] ... -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
> spark://my-host:7077
>
> I tracked it down and the problem is that spark-env.sh is sourced 3 times:
> in spark-daemon.sh, in compute-classpath.sh, and in spark-class.  Each of
> those adds to the SPARK_CLASSPATH until its contents are in triplicate.
>
> Are all of those calls necessary?  Is it possible to edit the daemon
> scripts to only call spark-env.sh once?
>
> FYI I'm starting the daemons with ./bin/start-master.sh and
> ./bin/start-slave.sh 1 $SPARK_URL
>
> Thanks,
> Andrew
>
>
>


Re: Not able to understand Exception.

2014-01-01 Thread Christopher Nguyen
Archit, this occurs in the ResultTask phase, triggered by the call to
sortByKey. Prior to this, your RDD would have been serialized for, e.g.,
shuffling around.

So it looks like Kryo wasn't able to deserialize some part of the RDD for
some reason, possibly due to a format incompatibility. Did you say you
wrote your own serializers?
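
If so, it may be worth double-checking that the same classes and serializers
are registered identically on the write and read sides; a mismatch can
surface as exactly this kind of index-out-of-bounds inside readClassAndObject.
For reference, a minimal registration sketch, with a placeholder data type
standing in for your real one:

  import com.esotericsoftware.kryo.{Kryo, Serializer}
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.spark.serializer.KryoRegistrator

  // Hypothetical custom data structure standing in for the real one.
  case class MyCustomDS(id: Long, payload: String)

  // Hand-written serializer: what is written here must match what is read, in order.
  class MyCustomDSSerializer extends Serializer[MyCustomDS] {
    override def write(kryo: Kryo, out: Output, ds: MyCustomDS): Unit = {
      out.writeLong(ds.id)
      out.writeString(ds.payload)
    }
    override def read(kryo: Kryo, in: Input, cls: Class[MyCustomDS]): MyCustomDS =
      MyCustomDS(in.readLong(), in.readString())
  }

  // Registered on driver and all executors via the spark.kryo.registrator setting.
  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit =
      kryo.register(classOf[MyCustomDS], new MyCustomDSSerializer)
  }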

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Wed, Jan 1, 2014 at 8:22 AM, Archit Thakur wrote:

> I have recently moved to Kryo for serialization to get better performance.
> Have written some of the serializers for my custom DS.
> What could below exception be about: (I dont see any of my code line in
> the stack trace)
>
> java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.get(Unknown Source)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
> at
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
> at
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:26)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at
> org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:23)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
> at
> org.apache.spark.util.CompletionIterator.toBuffer(CompletionIterator.scala:23)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
> at
> org.apache.spark.util.CompletionIterator.toArray(CompletionIterator.scala:23)
> at
> org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:44)
> at
> org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:43)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
> Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> at java.lang.Thread.run(Unknown Source)
>
> Any ideas? or Suggestions would help.
>
> Thanks,
> Archit.
>


Re: How to map each line to (line number, line)?

2013-12-31 Thread Christopher Nguyen
It's a reasonable ask (row indices) in some interactive use cases we've
come across. We're working on providing support for this at a higher level
of abstraction.
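
In the meantime, the two-pass approach discussed in the quoted thread below
can be written generically without assuming equal-sized partitions; this is
essentially what a built-in zipWithIndex-style helper has to do under the
hood. The helper name is just illustrative:

  import org.apache.spark.rdd.RDD

  // Pair each element with its global position (line number).
  def withLineNumbers[T](rdd: RDD[T]): RDD[(Long, T)] = {
    // Pass 1: count each partition (a small array collected on the driver).
    val counts = rdd.mapPartitionsWithIndex { (i, it) => Iterator((i, it.size.toLong)) }
      .collect().sortBy(_._1).map(_._2)
    // Per-partition starting offsets: 0, c0, c0+c1, ...
    val offsets = counts.scanLeft(0L)(_ + _)
    // Pass 2: attach offset + local index within the partition.
    rdd.mapPartitionsWithIndex { (i, it) =>
      it.zipWithIndex.map { case (elem, j) => (offsets(i) + j, elem) }
    }
  }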

Sent while mobile. Pls excuse typos etc.
On Dec 31, 2013 11:34 AM, "Aureliano Buendia"  wrote:

>
>
>
> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash  wrote:
>
>> Hi Aureliano,
>>
>> It's very easy to get lines into (start byte number, line) using Hadoop's
>> TextInputFormat.  See how SparkContext's textFile() method does it here:
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>
>
> Thanks for pointing this. while start byte number provides a globally
> unique index for each line, my application needs the line number.
>
> It seems best to go with the source file containing the line numbers,
> instead of recreating this is in hadoop/spark.
>
>
>>
>> What is the use case where you must have the global line number in the
>> file, vs a global ordered unique identifier (my suggestion above) or a
>> partition-local line number (discussed extensively below)?
>>
>> Also if you have any way to do this in plain Hadoop, Spark can use that
>> as well.
>>
>> The fundamental difficulty is that knowing global line number breaks the
>> assumption Hadoop makes everywhere that each record is independent of all
>> the others.  Maybe you should consider adding a line number to the
>> beginning of every line on import time into HDFS instead of doing it
>> afterwards in Spark.
>>
>> Cheers!
>> Andrew
>>
>>
>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia > > wrote:
>>
>>> I assumed that number of lines in each partition, except the last
>>> partition, is equal. Isn't this the case? In that case Guillaume's approach
>>> makes sense.
>>>
>>> All of these methods are inefficient. Spark needs to support this
>>> feature at lower level, as Michael suggested.
>>>
>>>
>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
>>> guillaume.pi...@exensa.com> wrote:
>>>
  You're assuming each partition has the same line count. I don't think
 it's true (actually, I'm almost certain it's false). And anyway your code
 also require two maps.

 In my code, the sorting as well as the other operations are performed
 on a very small dataset : one element per partition

 Guillaume




> Did you try the code I sent ? I think the sortBy is probably in the
> wrong direction, so change it with -i instead of i
>

  I'm confused why we would need in-memory sorting. We just use a loop
 like any other loop in Spark. Why shouldn't this solve the problem?:

 val count = lines.count() // lines is the rdd
 val partitionLinesCount = count / lines.partitions.length
 val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
   var i = pi * partitionLinesCount
   it.map { line =>
     val indexed = (i, line)
     i += 1
     indexed
   }
 }



 --
  *Guillaume PITEL, Président*
 +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

  eXenSa S.A.S. 
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

>>>
>>>
>>
>

Re: Stateful RDD

2013-12-30 Thread Christopher Nguyen
Bao, to help clarify what TD is saying: Spark launches multiple workers on
multiple threads in parallel, running the same closure code in the same JVM
on the same machine, but operating on different rows of data.

Because of this parallelism, if that worker code weren't thread-safe for
some reason, you'd have a problem.
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Mon, Dec 30, 2013 at 4:27 PM, Tathagata Das
wrote:

> No I did not mean that. What I meant was something more simple. Let's say
> the ScriptEngine maintains some internal state and the function
> ScriptEngine.eval(...) is not thread-safe. That is, calling
> ScriptEngine.eval simultaneously from multiple threads would cause race
> conditions in the internal state and eval() would give incorrect answers.
> That would be a problem if you use ScriptEngine in a map function, because
> multiple threads in a worker JVM may be running the map function
> simultaneously. Something you should be aware of when using static stateful
> objects within Spark.
>
> TD
>
>
> On Sun, Dec 29, 2013 at 7:32 PM, Bao  wrote:
>
>> Thanks guys, that's interesting. Though it looks like the singleton object
>> is defined at the driver, Spark actually will serialize the closure and send
>> it to the workers. The interesting thing is that ScriptEngine is NOT
>> serializable, but as long as it hasn't been initialized, Spark can serialize
>> the closure fine. But if I force it to initialize first, then Spark throws
>> NotSerializableException.
>>
>> Anyway, following Christopher's suggestion to avoid reference to outside
>> closure is better.
>>
>> TD, do you mean that Executors share the same SerializerInstance and there
>> is a case that more than 1 thread call the same closure instance?
>>
>> -Bao.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-RDD-tp71p97.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Stateful RDD

2013-12-27 Thread Christopher Nguyen
Bao, as described, your use case doesn't need to invoke anything like
custom RDDs or DStreams.

In a call like

val resultRdd = scripts.map(s => ScriptEngine.eval(s))

Spark will do its best to serialize/deserialize ScriptEngine to each of the
workers---if ScriptEngine is Serializable.

Now, if it makes no difference to you, consider instantiating ScriptEngine
within the closure itself, thus obviating the need for serdes of things
outside the closure.
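
A minimal sketch of that second option, assuming scripts: RDD[String] (the
JavaScript engine name below is only an example): the engine is built per
partition on the worker, so nothing non-serializable has to cross the wire and
each task thread gets its own instance.

import javax.script.ScriptEngineManager

val resultRdd = scripts.mapPartitions { it =>
  // Constructed inside the task, on the worker.
  val engine = new ScriptEngineManager().getEngineByName("JavaScript")
  it.map(s => String.valueOf(engine.eval(s)))
}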

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Dec 27, 2013 at 7:56 PM, Bao  wrote:

> It looks like I need to use DStream instead...
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-RDD-tp71p85.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: multi-line elements

2013-12-24 Thread Christopher Nguyen
Phillip, if there are easily detectable line groups you might define your
own InputFormat. Alternatively you can consider using mapPartitions() to
get access to the entire data partition instead of row-at-a-time. You'd
still have to worry about what happens at the partition boundaries. A third
approach is indeed to pre-process with an appropriate mapper/reducer.
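
For the first route, one lightweight variant (if your Hadoop version supports
it) is to keep TextInputFormat but override the record delimiter; the path and
the blank-line delimiter below are just placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
// One record per blank-line-separated block instead of per line.
conf.set("textinputformat.record.delimiter", "\n\n")

val records = sc.newAPIHadoopFile(
    "hdfs:///data/multiline-records.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map(_._2.toString)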

Sent while mobile. Pls excuse typos etc.
I have a file that consists of multi-line records.  Is it possible to read
in multi-line records with a method such as SparkContext.newAPIHadoopFile?
 Or do I need to pre-process the data so that all the data for one element
is in a single line?

Thanks,
Philip


Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Christopher Nguyen
Aureliano,

The idea is that you would only work on one large chunk at a time, the size
of which is done optimal trade-off between as big as the cluster can run
simultaneously, but small enough that you don't run into single-machine
memory limits, e.g., on the driver or reducers. It would almost certainly
not need to be the entire 1 billion columns.

On the sliding window, I completely missed the fact that the windows are
overlapping. But you could still partition the problem at one more layer,
that is, each partition would be, say, 100 windows rather than 1 as I've
pseudo-coded. So yes there would be some data duplication at the boundaries
of these window groups, but it is quite minimal and of little engineering
concern (here, about 1% at any given time).

Finally and perhaps orthogonally, it could be interesting for you to
examine what exactly is going on with the very-long-compute, as there may
be opportunities for parallel speed-ups there.

Sent while mobile. Pls excuse typos etc.
On Dec 20, 2013 2:56 PM, "Aureliano Buendia"  wrote:

>
>
>
> On Fri, Dec 20, 2013 at 10:34 PM, Christopher Nguyen wrote:
>
>> Aureliano, would something like this work? The red code is the only
>> place where you have to think parallel here.
>>
>>
>> while (thereIsStillDataToWorkOn) {
>>   bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a
>> design variable
>>   bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
>> restructureIntoArraysOf5000Doubles(bigChunk)
>>   myRDD = sc
>> .parallelize(bigChunkAsArraysOf5000Doubles, 1)
>> .map(eachArrayOf5000Doubles =>
>> someVeryLongRunningTransformer(eachArrayOf5000Doubles))
>> .collect()
>> }
>>
>>
>> Next, pardon me if this is too basic, but in case it is helpful: this
>> code first runs on a single machine, called a Driver, which must have
>> access to the source data.
>>
>
> Thanks for the clear explanation.
>
>
>> When we call parallelize(), Spark handles all the partitioning of the
>> data into the available Workers, including serializing each data partition
>> to the Workers, and collecting the results back in one place.
>>
>
> This would create nearly 1 billion RDD's. Is that ok?
>
>
>> There is no data duplication other than the Worker's copy of the data
>> from the Driver.
>>
>
> Each out-of-boundary 50-column window shares 2*(49 + 48 + 47 + ... + 1)
> columns between the sliding windows on its left and right. All of these
> columns are sent over the network many times. Isn't that duplication of data
> transfer?
>
>
>>
>> This indeed does not take advantage of all of the other available Spark
>> goodnesses that others have correctly pointed out on this thread, such as
>> broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
>> partitions, etc. But it would be exactly the right thing to do if it best
>> fits your problem statement.
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia 
>> wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen wrote:
>>>
>>>> Aureliano, how would your production data be coming in and accessed?
>>>> It's possible that you can still think of that level as a serial operation
>>>> (outer loop, large chunks) first before worrying about parallelizing the
>>>> computation of the tiny chunks.
>>>>
>>>
>>> It's a batch processing of time series data. Perhaps a serial processing
>>> where each serial item is a set of parallel processes could be an option.
>>> Does spark have such option?
>>>
>>>
>>>>
>>>> And I'm reading that when you refer to "data duplication", you're
>>>> worried about that as a side-effect problem, not as a requirement, correct?
>>>>
>>>
>>> That's right. Data duplication is certainly not a requirement, we are
>>> not trying to avoid it, but if it's a side effect that leads to some
>>> considerable io overhead, it's not going to be good.
>>>
>>>
>>>> And if the former, I don't see that data duplication is a necessary
>>>> side effect. Unless I missed something in the thread, don't use broadcast.
>>>>
>>>
>>> I take it that overlapped partitions does not mean data duplication. I
>>> wasn't sure if partitions hold a copy, or a reference.

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Christopher Nguyen
A couple of fixes inline.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 2:34 PM, Christopher Nguyen  wrote:

> Aureliano, would something like this work? The red code is the only place
> where you have to think parallel here.
>
>
> while (thereIsStillDataToWorkOn) {
>   bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a
> design variable
>   bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
> restructureIntoArraysOf5000Doubles(bigChunk)
>   myRDD = sc
>

s/myRDD/myResult


> .parallelize(bigChunkAsArraysOf5000Doubles, 1)
>

That number of partitions shouldn't be 1, but some function of the size of
your cluster, or you can just let Spark decide.


> .map(eachArrayOf5000Doubles =>
> someVeryLongRunningTransformer(eachArrayOf5000Doubles))
> .collect()
>

collect() or reduce() etc., whatever is appropriate for your
transformation/operation.


> }
>
>
> Next, pardon me if this is too basic, but in case it is helpful: this code
> first runs on a single machine, called a Driver, which must have access to
> the source data. When we call parallelize(), Spark handles all the
> partitioning of the data into the available Workers, including serializing
> each data partition to the Workers, and collecting the results back in one
> place. There is no data duplication other than the Worker's copy of the
> data from the Driver.
>
> This indeed does not take advantage of all of the other available Spark
> goodnesses that others have correctly pointed out on this thread, such as
> broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
> partitions, etc. But it would be exactly the right thing to do if it best
> fits your problem statement.
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia 
> wrote:
>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen wrote:
>>
>>> Aureliano, how would your production data be coming in and accessed?
>>> It's possible that you can still think of that level as a serial operation
>>> (outer loop, large chunks) first before worrying about parallelizing the
>>> computation of the tiny chunks.
>>>
>>
>> It's a batch processing of time series data. Perhaps a serial processing
>> where each serial item is a set of parallel processes could be an option.
>> Does spark have such option?
>>
>>
>>>
>>> And I'm reading that when you refer to "data duplication", you're
>>> worried about that as a side-effect problem, not as a requirement, correct?
>>>
>>
>> That's right. Data duplication is certainly not a requirement, we are not
>> trying to avoid it, but if it's a side effect that leads to some
>> considerable io overhead, it's not going to be good.
>>
>>
>>> And if the former, I don't see that data duplication is a necessary side
>>> effect. Unless I missed something in the thread, don't use broadcast.
>>>
>>
>> I take it that overlapped partitions does not mean data duplication. I
>> wasn't sure if partitions hold a copy, or a reference.
>>
>>
>>>
>>> Put another way, I see the scale of this challenge as far more
>>> operational than logical (when squinted at from the right angle :)
>>>
>>>  --
>>> Christopher T. Nguyen
>>> Co-founder & CEO, Adatao <http://adatao.com>
>>> linkedin.com/in/ctnguyen
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia >> > wrote:
>>>
>>>> Also over thinking is appreciated in this problem, as my production
>>>> data is actually near 100 x 1000,000,000 and data duplication could get
>>>> messy with this.
>>>>
>>>> Sorry about the initial misinformation, I was thinking about my
>>>> development/test data.
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia <
>>>> buendia...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek wrote:
>>>>>
>>>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>>>> which would be a fraction of standard cluster.  But since overthinking 
>>>>>> is a

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Christopher Nguyen
Aureliano, would something like this work? The red code is the only place
where you have to think parallel here.


while (thereIsStillDataToWorkOn) {
  bigChunk: Array[Double] = readInTheNextNx100x50MatrixData() // N is a
design variable
  bigChunkAsArraysOf5000Doubles: Array[Array[Double]] =
restructureIntoArraysOf5000Doubles(bigChunk)
  myRDD = sc
.parallelize(bigChunkAsArraysOf5000Doubles, 1)
.map(eachArrayOf5000Doubles =>
someVeryLongRunningTransformer(eachArrayOf5000Doubles))
.collect()
}


Next, pardon me if this is too basic, but in case it is helpful: this code
first runs on a single machine, called a Driver, which must have access to
the source data. When we call parallelize(), Spark handles all the
partitioning of the data into the available Workers, including serializing
each data partition to the Workers, and collecting the results back in one
place. There is no data duplication other than the Worker's copy of the
data from the Driver.

This indeed does not take advantage of all of the other available Spark
goodnesses that others have correctly pointed out on this thread, such as
broadcasting, mapPartitions() vs map(), parallel data loading across HDFS
partitions, etc. But it would be exactly the right thing to do if it best
fits your problem statement.
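
A compile-ready rendering of this sketch (with val declarations, a named
result, and the partition count left to Spark to decide); the loader,
transformer, and loop-condition names are placeholders carried over from the
pseudocode above:

while (thereIsStillDataToWorkOn) {
  // N is a design variable: as many 100 x 50 windows as the cluster can work on at once.
  val bigChunk: Array[Double] = readInTheNextNx100x50MatrixData()
  val windows: Array[Array[Double]] = bigChunk.grouped(5000).toArray // each is one 100 x 50 window

  val myResult = sc
    .parallelize(windows) // let Spark pick the number of partitions
    .map(window => someVeryLongRunningTransformer(window))
    .collect() // or reduce(), etc., whatever fits the operation
}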
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 2:01 PM, Aureliano Buendia wrote:

>
>
>
> On Fri, Dec 20, 2013 at 9:43 PM, Christopher Nguyen wrote:
>
>> Aureliano, how would your production data be coming in and accessed? It's
>> possible that you can still think of that level as a serial operation
>> (outer loop, large chunks) first before worrying about parallelizing the
>> computation of the tiny chunks.
>>
>
> It's a batch processing of time series data. Perhaps a serial processing
> where each serial item is a set of parallel processes could be an option.
> Does spark have such option?
>
>
>>
>> And I'm reading that when you refer to "data duplication", you're worried
>> about that as a side-effect problem, not as a requirement, correct?
>>
>
> That's right. Data duplication is certainly not a requirement, we are not
> trying to avoid it, but if it's a side effect that leads to some
> considerable io overhead, it's not going to be good.
>
>
>> And if the former, I don't see that data duplication is a necessary side
>> effect. Unless I missed something in the thread, don't use broadcast.
>>
>
> I take it that overlapped partitions does not mean data duplication. I
> wasn't sure if partitions hold a copy, or a reference.
>
>
>>
>> Put another way, I see the scale of this challenge as far more
>> operational than logical (when squinted at from the right angle :)
>>
>>  --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia 
>> wrote:
>>
>>> Also over thinking is appreciated in this problem, as my production data
>>> is actually near 100 x 1000,000,000 and data duplication could get messy
>>> with this.
>>>
>>> Sorry about the initial misinformation, I was thinking about my
>>> development/test data.
>>>
>>>
>>> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia >> > wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek wrote:
>>>>
>>>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>>>> which would be a fraction of standard cluster.  But since overthinking is 
>>>>> a
>>>>> lot of fun, how about this: do a mapPartitions with a threaded subtask for
>>>>> each window.  Now you only need to replicate data across the boundaries of
>>>>> each partition of windows, rather than each window.
>>>>>
>>>>
>>>> How can this be written in spark scala?
>>>>
>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen 
>>>>> wrote:
>>>>>
>>>>>> Are we over-thinking the problem here? Since the per-window compute
>>>>>> task is hugely expensive, stateless from window to window, and the 
>>>>>> original
>>>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>>>> distributing and schedul

Re: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered

2013-12-20 Thread Christopher Nguyen
MichaelY, this sort of thing where "it could be any of dozens of things"
can usually be resolved by asking someone to share your screen with you for 5
minutes. It's far more productive than guessing over emails.

If @freeman is willing, you can send a private message to him to set that
up over Google Hangout.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 1:57 PM, Michael Kun Yang wrote:

> It's alive. I just restarted it, but it doesn't help.
>
>
> On Friday, December 20, 2013, Michael (Bach) Bui wrote:
>
>> Check if your worker is “alive”
>> Also take a look at your master log and see if there is error message
>> about worker.
>>
>> This usually can be fixed by restarting Spark.
>>
>>
>>
>>
>>
>> On Dec 20, 2013, at 3:12 PM, Michael Kun Yang 
>> wrote:
>>
>> Hi,
>>
>> I really need help, I went through previous posts on the mailing list but
>> still cannot resolve this problem.
>>
>> It works when I use the local[n] option, but an error occurs when I use
>> spark://master.local:7077.
>>
>> I checked the UI, the workers are correctly registered and I set the
>> SPARK_MEM compatible with my machine.
>>
>> Best
>>
>>
>>


Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Christopher Nguyen
Aureliano, how would your production data be coming in and accessed? It's
possible that you can still think of that level as a serial operation
(outer loop, large chunks) first before worrying about parallelizing the
computation of the tiny chunks.

And I'm reading that when you refer to "data duplication", you're worried
about that as a side-effect problem, not as a requirement, correct? And if
the former, I don't see that data duplication is a necessary side effect.
Unless I missed something in the thread, don't use broadcast.

Put another way, I see the scale of this challenge as far more operational
than logical (when squinted at from the right angle :)

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 1:07 PM, Aureliano Buendia wrote:

> Also over thinking is appreciated in this problem, as my production data
> is actually near 100 x 1000,000,000 and data duplication could get messy
> with this.
>
> Sorry about the initial misinformation, I was thinking about my
> development/test data.
>
>
> On Fri, Dec 20, 2013 at 9:04 PM, Aureliano Buendia 
> wrote:
>
>>
>>
>>
>> On Fri, Dec 20, 2013 at 9:00 PM, Tom Vacek wrote:
>>
>>> Totally agree.  Even with a 50x data replication, that's only 40 GB,
>>> which would be a fraction of standard cluster.  But since overthinking is a
>>> lot of fun, how about this: do a mapPartitions with a threaded subtask for
>>> each window.  Now you only need to replicate data across the boundaries of
>>> each partition of windows, rather than each window.
>>>
>>
>> How can this be written in spark scala?
>>
>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 2:53 PM, Christopher Nguyen wrote:
>>>
>>>> Are we over-thinking the problem here? Since the per-window compute
>>>> task is hugely expensive, stateless from window to window, and the original
>>>> big matrix is just 1GB, the primary gain in using a parallel engine is in
>>>> distributing and scheduling these (long-running, isolated) tasks. I'm
>>>> reading that data loading and distribution are going to be a tiny fraction
>>>> of the overall compute time.
>>>>
>>>> If that's the case, it would make sense simply to start with a 1GB
>>>> Array[Double] on the driver, from that create an RDD comprising 20,000 rows
>>>> of 5,000 doubles each, map them out to the workers and have them interpret
>>>> what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
>>>> have a good fraction of several days to figure it out :)
>>>>
>>>> This would be a great load test for Spark's resiliency over
>>>> long-running computations.
>>>>
>>>> --
>>>> Christopher T. Nguyen
>>>> Co-founder & CEO, Adatao <http://adatao.com>
>>>> linkedin.com/in/ctnguyen
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui <
>>>> free...@adatao.com> wrote:
>>>>
>>>>> Hmm, I misread that you need a sliding window.
>>>>> I am thinking out loud here: one way of dealing with this is to
>>>>> improve NLineInputFormat so that partitions will have a small overlapping
>>>>> portion in this case the overlapping portion is 50 columns
>>>>> So let say the matrix is divided into overlapping partitions like this
>>>>> [100 x col[1, n*50] ] , [100 x col[(n-1)*50+1, (2n-1)*50] ] … then we can
>>>>> assign each partition to a mapper to do mapPartition on it.
>>>>>
>>>>>
>>>>> 
>>>>> Michael (Bach) Bui, PhD,
>>>>> Senior Staff Architect, ADATAO Inc.
>>>>> www.adatao.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui 
>>>>> wrote:
>>>>>
>>>>> Here, Tom assumed that you have your big matrix already being loaded
>>>>> in one machine. Now if you want to distribute it to slave nodes you will
>>>>> need to broadcast it. I would expect this broadcasting will be done once 
>>>>> at
>>>>> the beginning of your algorithm and the computation time will dominate the
>>>>> overall execution time.
>>>>>
>>>>> On the other hand, a better way to deal with h

Re: How to access a sub matrix in a spark task?

2013-12-20 Thread Christopher Nguyen
Are we over-thinking the problem here? Since the per-window compute task is
hugely expensive, stateless from window to window, and the original big
matrix is just 1GB, the primary gain in using a parallel engine is in
distributing and scheduling these (long-running, isolated) tasks. I'm
reading that data loading and distribution are going to be a tiny fraction
of the overall compute time.

If that's the case, it would make sense simply to start with a 1GB
Array[Double] on the driver, from that create an RDD comprising 20,000 rows
of 5,000 doubles each, map them out to the workers and have them interpret
what the 5,000 doubles mean in terms of a [100 x 50] sub-matrix. They each
have a good fraction of several days to figure it out :)

This would be a great load test for Spark's resiliency over long-running
computations.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Dec 20, 2013 at 11:36 AM, Michael (Bach) Bui wrote:

> Hmm, I misread that you need a sliding window.
> I am thinking out loud here: one way of dealing with this is to improve
> NLineInputFormat so that partitions will have a small overlapping portion
> in this case the overlapping portion is 50 columns
> So let say the matrix is divided into overlapping partitions like this
> [100 x col[1, n*50] ] , [100 x col[(n-1)*50+1, (2n-1)*50] ] … then we can
> assign each partition to a mapper to do mapPartition on it.
>
>
> 
> Michael (Bach) Bui, PhD,
> Senior Staff Architect, ADATAO Inc.
> www.adatao.com
>
>
>
>
> On Dec 20, 2013, at 1:11 PM, Michael (Bach) Bui 
> wrote:
>
> Here, Tom assumed that you have your big matrix already being loaded in
> one machine. Now if you want to distribute it to slave nodes you will need
> to broadcast it. I would expect this broadcasting will be done once at the
> beginning of your algorithm and the computation time will dominate the
> overall execution time.
>
> On the other hand, a better way to deal with huge matrix is to store the
> data in hdfs and load data into each slaves partition-by-partition. This is
> fundamental data processing pattern in Spark/Hadoop world.
> If you opt to do this, you will have to use suitable InputFormat to make
> sure each partition has the right amount of row that you want.
> For example if you are lucky each HDFS partition have exact n*50 rows,
> then you can use rdd.mapPartition(func). Where func will take care of
> splitting n*50-row partition into n sub matrix
>
> However, HDFS TextInput or SequnceInputFormat format will not guarantee
> each partition has certain number of rows. What you want is
> NLineInputFormat, which I think currently has not been pulled into Spark
> yet.
> If everyone think this is needed, I can implement it quickly, it should be
> pretty easy.
>
>
> 
> Michael (Bach) Bui, PhD,
> Senior Staff Architect, ADATAO Inc.
> www.adatao.com
>
>
>
>
> On Dec 20, 2013, at 12:38 PM, Aureliano Buendia 
> wrote:
>
>
>
>
> On Fri, Dec 20, 2013 at 6:00 PM, Tom Vacek  wrote:
>
>> Oh, I see.  I was thinking that there was a computational dependency on
>> one window to the next.  If the computations are independent, then I think
>> Spark can help you out quite a bit.
>>
>> I think you would want an RDD where each element is a window of your
>> dense matrix.  I'm not aware of a way to distribute the windows of the big
>> matrix in a way that doesn't involve broadcasting the whole thing.  You
>> might have to tweak some config options, but I think it would work
>> straightaway.  I would initialize the data structure like this:
>> val matB = sc.broadcast(myBigDenseMatrix)
>> val distributedChunks = sc.parallelize(0 until
>> numWindows).mapPartitions(it => it.map(windowID => getWindow(matB.value,
>> windowID) ) )
>>
>
> Here broadcast is used instead of calling parallelize on myBigDenseMatrix.
> Is it okay to broadcast a huge amount of data? Does sharing big data mean
> a big network IO overhead compared to calling parallelize, or is this
> overhead optimized thanks to the partitioning?
>
>
>>
>> Then just apply your matrix ops as map on
>>
>> You maybe have your own tool for dense matrix ops, but I would suggest
>> Scala Breeze.  You'll have to use an old version of Breeze (current builds
>> are for 2.10).  Spark with Scala-2.10 is a little way off.
>>
>>
>> On Fri, Dec 20, 2013 at 11:40 AM, Aureliano Buendia > > wrote:
>>
>>>
>>>
>>>
>>> On Fri, Dec 20, 2013 at 5:21 PM, Tom Vacek wrote:
>>>
 If you use an RDD[Array[Double]] with a row decomposition of the
 matrix, you can index windows of the rows all you want, but you're limited
 to 100 concurrent tasks.  You could use a column decomposition and access
 subsets of the columns with a PartitionPruningRDD.  I have to say, though,
 if you're doing dense matrix operations, they will be 100s of times faster
 on a shared mem platform.  This particular matrix, at 

Re: DoubleMatrix vs Array[Array[Double]] : Question about debugging serialization performance issues

2013-12-19 Thread Christopher Nguyen
Guillaume seemed to be able to do this on a per-iteration basis, so it is
reasonable to expect that it can be done once. So it's a 50-50 call that it
may indeed be something that was "unknowingly changed". Also, are you
reading the data and parsing it on the slaves, or really serializing it
from one driver?

Guillaume can you post the relevant code so we can help stare at it and
consider what's happening where. We've done a lot of Spark-JBLAS code so
are reasonably familiar with the memory utilization patterns. It may also
be relevant whether you doing scalar, vector, or matrix-matrix operations,
although that bears more directly on native memory.
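
One quick, Spark-independent way to compare the serialized footprint of the
two row representations is plain Java serialization (Spark's default, though
Kryo would give different numbers); the sizes and shapes below are for
illustration only:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def serializedSize(o: AnyRef): Int = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o)
  oos.close()
  bos.size()
}

// Assumes org.jblas is on the classpath.
val rows: Array[Array[Double]] = Array.fill(1000, 100)(math.random)
println("Array[Array[Double]]: " + serializedSize(rows) + " bytes")
println("DoubleMatrix:         " + serializedSize(new org.jblas.DoubleMatrix(rows)) + " bytes")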

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Dec 19, 2013 at 10:33 AM, Matei Zaharia wrote:

> Hi Guillaume,
>
> I haven’t looked at the serialization of DoubleMatrix but I believe it
> just creates one big Array[Double] instead of many ones, and stores all the
> rows contiguously in that. I don’t think that would be slower to serialize.
> However, because the object is bigger overall, it might need to get
> allocated in another part of the heap (e.g. instead of in the new
> generation), which causes more GC and may cause out-of-memory sooner. How
> big are these matrices? You might want to calculate what exactly is taking
> up memory.
>
> Matei
>
> On Dec 19, 2013, at 2:22 AM, Guillaume Pitel 
> wrote:
>
>  Hi Sparkers,
>
> A bit of context :
>
> I'm working on a Fast SVD method for inclusion in Mllib (to perform a
> Latent semantic analysis on a large corpora).
>
> I started using the same approach that the ALS algorithm, but this
> approach was unable to cope with the kind of data size I want to process
> (at least not on my little cluster of 5 nodes with 32GB ram). For now I'm
> working with the English Wikipedia corpus, which produces sparse matrices
> of 4.5M documents x 2.5M terms. I think that with the ALS approach it
> didn't even manage to finish a half-iteration, and simply preparing the
> blocked sparse matrix
>
> So I've rewritten the whole thing, changed the approach, and I've reached
> interesting performance (about 2 iterations in one hour).
>
> Then I realized that going from Array[Array[Double]] to jblas.DoubleMatrix
> actually created a copy of the Array, so I thought that I could gain a lot
> of memory and GC time by just working with DoubleMatrix and never going
> back and forth from/to Array[Array[Double]]
>
> But with this approach, the performance seems to be seriously degraded,
> and OOM errors happens while it didn't before. So my question is : is it
> possible that serializing DoubleMatrix instead of Array[Array[Double]]
> could really degrade the performance, or did I unknowingly changed
> something in my code ?
>
> How can I debug the size and time of the serialization? In general, are
> there guidelines on the right choice of datatypes used in the outputs of RDD
> maps/reduces?
>
> In case it can help, here is a stacktrace of the OOM error I got :
>
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>   at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>   at 
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
>   at 
> org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:202)
>   at 
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
>   at 
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
>   at 
> org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
>   at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:74)
>   at 
> org.apache.spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:608)
>   at org.apache.spark.storage.BlockManager.put(BlockManager.scala:604)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
>
>
> Thanks in 

Re: Incremental Updates to an RDD

2013-12-10 Thread Christopher Nguyen
Wes, it depends on what you mean by "sliding window" as related to "RDD":

   1. Some operation over multiple rows of data within a single, large RDD,
   for which the operations are required to be temporally sequential. This may
   be the case where you're computing a running average over historical
   time-based data.
   2. Some operation over multiple rows of data within a single, large RDD,
   for which the operations may be run in parallel, even out of order. This
   may be the case where your RDD represents a two-dimensional geospatial map
   and you're computing something (e.g., population average) over a grid.
   3. Some operation on data streaming in, over a fixed-size window, and
   you would like the representation of that windowed data to be an RDD.

For #1 and #2, there's only one "static" RDD and the task is largely
bookkeeping: tracking which window you're working on when, and dealing with
partition boundaries (*mapPartitions* or *mapPartitionsWithIndex *would be
a useful interface here as it allows you to see multiple rows at a time, as
well as know what partition # you're working with at any given time).

For #3, that's what Spark Streaming does, and it does so by introducing a
higher-level concept of a DStream, which is a sequence of RDDs, where each
RDD is one data sample. Given that it is a collection of RDDs, the
windowing management task simply involves maintaining what RDDs are
contained in that sequence.
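
A minimal sketch of #3 (the socket source and the batch/window durations are
placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val events = ssc.socketTextStream("localhost", 9999)

// Keep the last 30 seconds of data, recomputed every 10 seconds; under the
// hood the windowed DStream is just a sequence of the underlying RDDs.
val windowed = events.window(Seconds(30), Seconds(10))
windowed.count().print()

ssc.start()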

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Dec 10, 2013 at 12:01 PM, Wes Mitchell wrote:

> So, does that mean that if I want to do a sliding window, then I have to,
> in some fashion,
> build a stream from the RDD, push a new value on the head, filter out the
> oldest value, and
> re-persist as an RDD?
>
>
>
>
> On Fri, Dec 6, 2013 at 10:13 PM, Christopher Nguyen wrote:
>
>> Kyle, the fundamental contract of a Spark RDD is that it is immutable.
>> This follows the paradigm where data is (functionally) transformed into
>> other data, rather than mutated. This allows these systems to make certain
>> assumptions and guarantees that otherwise they wouldn't be able to.
>>
>> Now we've been able to get mutative behavior with RDDs---for fun,
>> almost---but that's implementation dependent and may break at any time.
>>
>> It turns out this behavior is quite appropriate for the analytic stack,
>> where you typically apply the same transform/operator to all data. You're
>> finding that transactional systems are the exact opposite, where you
>> typically apply a different operation to individual pieces of the data.
>> Incidentally this is also the dichotomy between column- and row-based
>> storage being optimal for each respective pattern.
>>
>> Spark is intended for the analytic stack. To use Spark as the persistence
>> layer of a transaction system is going to be very awkward. I know there are
>> some vendors who position their in-memory databases as good for both OLTP
>> and OLAP use cases, but when you talk to them in depth they will readily
>> admit that it's really optimal for one and not the other.
>>
>> If you want to make a project out of making a special Spark RDD that
>> supports this behavior, it might be interesting. But there will be no
>> simple shortcuts to get there from here.
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 6, 2013 at 10:56 PM, Kyle Ellrott wrote:
>>
>>> I'm trying to figure out if I can use an RDD to backend an interactive
>>> server. One of the requirements would be to have incremental updates to
>>> elements in the RDD, ie transforms that change/add/delete a single element
>>> in the RDD.
>>> It seems pretty drastic to do a full RDD filter to remove a single
>>> element, or do the union of the RDD with another one of size 1 to add an
>>> element. (Or is it?) Is there an efficient way to do this in Spark? Are
>>> there any example of this kind of usage?
>>>
>>> Thank you,
>>> Kyle
>>>
>>
>>
>


Re: Incremental Updates to an RDD

2013-12-09 Thread Christopher Nguyen
Kyle, many of your design goals are something we also want. Indeed it's
interesting you separate "resilient" from RDD, as I've suggested there
should be ways to boost performance if you're willing to give up some or
all of the "R" guarantees.

We haven't started looking into this yet due to other priorities. If
someone with similar design goals wants to get started that'd be great.

To be sure, a semi-shortcut to what you want may be found by looking at
Tachyon. It's fairly early days for Tachyon so I don't know what its actual
behavior would be under transactional loads.

Sent while mobile. Pls excuse typos etc.
On Dec 9, 2013 10:47 AM, "Kyle Ellrott"  wrote:

> I'd like to use Spark as an analytical stack, the only difference is that
> I would like find the best way to connect it to a dataset that I'm actively
> working on. Perhaps saying 'updates to an RDD' is a bit of a loaded term, I
> don't need the 'resilient', just a distributed data set.
> Right now, the best way I can think of doing that is working with the data
> in a distributed system, like HBase, then when I want to do my analytics, I
> use the HadoopInputFormat readers to transfer the data from the HBase
> system to Spark and then do my analytics. Of course, then I have the
> overhead of serialization/deserialization and network transfer before I can
> even start my calculations. If I already held the dataset in the Spark
> processes, then I could start calculations immediately.
> So is there is a 'better' way to manage a distributed data set, which
> would then serve as an input to Spark RDDs?
>
> Kyle
>
>
>
>
> On Fri, Dec 6, 2013 at 10:13 PM, Christopher Nguyen wrote:
>
>> Kyle, the fundamental contract of a Spark RDD is that it is immutable.
>> This follows the paradigm where data is (functionally) transformed into
>> other data, rather than mutated. This allows these systems to make certain
>> assumptions and guarantees that otherwise they wouldn't be able to.
>>
>> Now we've been able to get mutative behavior with RDDs---for fun,
>> almost---but that's implementation dependent and may break at any time.
>>
>> It turns out this behavior is quite appropriate for the analytic stack,
>> where you typically apply the same transform/operator to all data. You're
>> finding that transactional systems are the exact opposite, where you
>> typically apply a different operation to individual pieces of the data.
>> Incidentally this is also the dichotomy between column- and row-based
>> storage being optimal for each respective pattern.
>>
>> Spark is intended for the analytic stack. To use Spark as the persistence
>> layer of a transaction system is going to be very awkward. I know there are
>> some vendors who position their in-memory databases as good for both OLTP
>> and OLAP use cases, but when you talk to them in depth they will readily
>> admit that it's really optimal for one and not the other.
>>
>> If you want to make a project out of making a special Spark RDD that
>> supports this behavior, it might be interesting. But there will be no
>> simple shortcuts to get there from here.
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Fri, Dec 6, 2013 at 10:56 PM, Kyle Ellrott wrote:
>>
>>> I'm trying to figure out if I can use an RDD to backend an interactive
>>> server. One of the requirements would be to have incremental updates to
>>> elements in the RDD, ie transforms that change/add/delete a single element
>>> in the RDD.
>>> It seems pretty drastic to do a full RDD filter to remove a single
>>> element, or do the union of the RDD with another one of size 1 to add an
>>> element. (Or is it?) Is there an efficient way to do this in Spark? Are
>>> there any example of this kind of usage?
>>>
>>> Thank you,
>>> Kyle
>>>
>>
>>
>


Re: Writing an RDD to Hive

2013-12-08 Thread Christopher Nguyen
Philip, fwiw we do go with including Shark as a dependency for our needs,
making a fat jar, and it works very well. It was quite a bit of pain what
with the Hadoop/Hive transitive dependencies, but for us it was worth it.

I hope that serves as an existence proof that says Mt Everest has been
climbed, likely by more than just ourselves. Going forward this should be
getting easier.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Dec 6, 2013 at 7:06 PM, Philip Ogren wrote:

>  I have a simple scenario that I'm struggling to implement.  I would like
> to take a fairly simple RDD generated from a large log file, perform some
> transformations on it, and write the results out such that I can perform a
> Hive query either from Hive (via Hue) or Shark.  I'm having troubles with
> the last step.  I am able to write my data out to HDFS and then execute a
> Hive create table statement followed by a load data statement as a separate
> step.  I really dislike this separate manual step and would like to be able
> to have it all accomplished in my Spark application.  To this end, I have
> investigated two possible approaches as detailed below - it's probably too
> much information so I'll ask my more basic question first:
>
> Does anyone have a basic recipe/approach for loading data in an RDD to a
> Hive table from a Spark application?
>
> 1) Load it into HBase via PairRDDFunctions.saveAsHadoopDataset.  There is
> a nice detailed email on how to do this 
> here.
> I didn't get very far thought because as soon as I added an hbase
> dependency (corresponding to the version of hbase we are running) to my
> pom.xml file, I had an slf4j dependency conflict that caused my current
> application to explode.  I tried the latest released version and the slf4j
> dependency problem went away but then the deprecated class
> TableOutputFormat no longer exists.  Even if loading the data into hbase
> were trivially easy (and the detailed email suggests otherwise) I would
> then need to query HBase from Hive which seems a little clunky.
>
> 2) So, I decided that Shark might be an easier option.  All the examples
> provided in their documentation seem to assume that you are using Shark as
> an interactive application from a shell.  Various threads I've seen seem to
> indicate that Shark isn't really intended to be used as dependency in your
> Spark code (see 
> thisand
> that.)
> It follows then that one can't add a Shark dependency to a pom.xml file
> because Shark isn't released via Maven Central (that I can tell perhaps
> it's in some other repo?)  Of course, there are ways of creating a local
> dependency in maven but it starts to feel very hacky.
>
> I realize that I've given sufficient detail to expose my ignorance in a
> myriad of ways.  Please feel free to shine light on any of my
> misconceptions!
>
> Thanks,
> Philip
>
>


Re: Incremental Updates to an RDD

2013-12-06 Thread Christopher Nguyen
Kyle, the fundamental contract of a Spark RDD is that it is immutable. This
follows the paradigm where data is (functionally) transformed into other
data, rather than mutated. This allows these systems to make certain
assumptions and guarantees that otherwise they wouldn't be able to.

Now we've been able to get mutative behavior with RDDs---for fun,
almost---but that's implementation dependent and may break at any time.

It turns out this behavior is quite appropriate for the analytic stack,
where you typically apply the same transform/operator to all data. You're
finding that transactional systems are the exact opposite, where you
typically apply a different operation to individual pieces of the data.
Incidentally this is also the dichotomy between column- and row-based
storage being optimal for each respective pattern.

Spark is intended for the analytic stack. To use Spark as the persistence
layer of a transaction system is going to be very awkward. I know there are
some vendors who position their in-memory databases as good for both OLTP
and OLAP use cases, but when you talk to them in depth they will readily
admit that it's really optimal for one and not the other.

If you want to make a project out of making a special Spark RDD that
supports this behavior, it might be interesting. But there will be no
simple shortcuts to get there from here.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Fri, Dec 6, 2013 at 10:56 PM, Kyle Ellrott  wrote:

> I'm trying to figure out if I can use an RDD to backend an interactive
> server. One of the requirements would be to have incremental updates to
> elements in the RDD, ie transforms that change/add/delete a single element
> in the RDD.
> It seems pretty drastic to do a full RDD filter to remove a single
> element, or do the union of the RDD with another one of size 1 to add an
> element. (Or is it?) Is there an efficient way to do this in Spark? Are
> there any example of this kind of usage?
>
> Thank you,
> Kyle
>


Re: Benchmark numbers for terabytes of data

2013-12-04 Thread Christopher Nguyen
Matt, we've done 1TB linear models in 2-3 minutes on 40 node clusters
(30GB/node, just enough to hold all partitions simultaneously in memory).
You can make do with fewer nodes if you're willing to slow things down.

Some of our TB benchmark numbers are available in my Spark Summit slides.
Sorry I'm on a plane now but you should be able to find the slides fairly
easily.

Re your other comment: monolithic 100-node analytic clusters are not
unusual, but not yet common outside of large companies. I'd eduguesstimate
it to be at the top 5%ile among companies with less than $500MM revenues,
with selection bias among Silicon Valley companies.

Sent while mobile. Pls excuse typos etc.
On Dec 4, 2013 11:06 AM, "Matt Cheah"  wrote:

>  I'm reading the paper now, thanks. It states 100-node clusters were
> used. Is this typical in the field to have 100 node clusters for the 1TB
> scale? We were expecting to be using ~10 nodes.
>
>  I'm still pretty new to cluster computing, so just not sure how people
> have set these up.
>
>  -Matt Cheah
>
>   From: Matei Zaharia 
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Wednesday, December 4, 2013 10:53 AM
> To: "user@spark.incubator.apache.org" 
> Cc: Mingyu Kim 
> Subject: Re: Benchmark numbers for terabytes of data
>
>   Yes, check out the Shark paper for example:
> https://amplab.cs.berkeley.edu/publication/shark-sql-and-rich-analytics-at-scale/
>
>  The numbers on that benchmark are for Shark.
>
>  Matei
>
>  On Dec 3, 2013, at 3:50 PM, Matt Cheah  wrote:
>
>  Hi everyone,
>
>  I notice the benchmark page for AMPLab provides some numbers on Gbs of
> data: https://amplab.cs.berkeley.edu/benchmark/ I was wondering if
> similar benchmark numbers existed for even larger data sets, in the
> terabytes if possible.
>
>  Also, are there any for just raw spark, i.e. No shark?
>
>  Thanks,
>
>  -Matt Chetah
>
>
>


Re: Using Spark/Shark for massive time series data storage/processing

2013-11-22 Thread Christopher Nguyen
Yizheng, one thing you can do is to use Spark Streaming to ingest the
incoming data and store it in HDFS. You can use Shark to impose a schema on
this data. Spark/Shark can easily handle 30GB/day.
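
A rough sketch of that ingest path (host, port, batch interval, and output
prefix are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("spark://master:7077", "sensor-ingest", Seconds(60))
val readings = ssc.socketTextStream("collector-host", 9999)

// One HDFS directory per 60-second batch; Shark/Hive can then point an
// external table at the output location to impose a schema.
readings.saveAsTextFiles("hdfs:///sensors/readings")
ssc.start()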

For visualization and analysis, you may want to take a look at Adatao
pAnalytics for R, which is built on top of Spark/Shark.

Sent while mobile. Pls excuse typos etc.
On Nov 21, 2013 3:08 PM, "Yizheng Liao"  wrote:

> Hi, everyone:
>
> I am new to the Spark Project. We are working on a wireless sensor
> network. We hope to know if Spark/Shark is good for time series data
> storage and processing. The maximum data input we have is about 30GB/day.
> Also, we hope to visualize the collected data in real-time.
>
> In addition, I hope to know if there is a solution for using R and
> Spark/Shark. We know there is a R library for Hive. Is there a plan for
> Spark/Shark project to provide R API/library?
>
> Thanks!
>
> Yizheng
>


Re: Does spark RDD has a partitionedByKey

2013-11-15 Thread Christopher Nguyen
Jiacheng, if you're OK with using the Shark layer above Spark (and I think
for many use cases the answer would be "yes"), then you can take advantage
of Shark's co-partitioning. Or do something like
https://github.com/amplab/shark/pull/100/commits
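
At the plain-Spark level (separate from the Shark route above),
PairRDDFunctions.partitionBy gives the "repartition by a custom partitioner
and cache it" behavior asked about below without the
groupByKey(...).flatMapValues(x => x) round trip; the RDD names here are
placeholders:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(64) // or any custom Partitioner

val b = rawB.partitionBy(part).cache() // rawB: RDD[(Y, Z)]
val c = rawC.partitionBy(part).cache() // rawC: RDD[(Y, PartitionID)]

// Both sides share the same partitioner, so the join needs no extra shuffle of b.
val joined = b.join(c)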

Sent while mobile. Pls excuse typos etc.
On Nov 16, 2013 2:48 AM, "guojc"  wrote:

> Hi Meisam,
>  What I want to achieve here is a bit tricky. Basically, I'm trying to
> implement a per-split semi-join in an iterative algorithm with Spark. It's a
> very efficient join strategy for highly imbalanced data sets and provides a
> huge gain over a normal join in that situation.
>
>  Let's say we have two large RDDs, a: RDD[X] and b: RDD[(Y, Z)], and both
> of them are loaded directly from HDFS, so neither of them has a partitioner.
> X is a large, complicated struct containing a set of join keys Y.
>  First, for each partition of a, I extract the join key Y from every
> instance of X in that partition and construct a hash set of join key Y and
> partition ID. Now I have a new RDD c: RDD[(Y, PartitionID)]; I join it with b
> on Y and then construct an RDD d: RDD[(PartitionID, Map[Y, Z])] by grouping
> by PartitionID and building a map of Y to Z. For each partition of a, I want
> to repartition it according to its partition ID, so it becomes an RDD
> e: RDD[(PartitionID, X)]. As d and e have the same partitioner and the same
> key, they will be joined very efficiently.
>
> The key ability I want here is to cache RDD c with the same partitioner as
> RDD b, and to cache e. Then later joins with b and d will be efficient,
> because the value of b will be updated from time to time and d's content
> will change accordingly. It would also be nice to be able to repartition a by
> its original partition ID without actually shuffling across the network.
>
> You can refer to
> http://researcher.watson.ibm.com/researcher/files/us-ytian/hadoopjoin.pdf for
> PerSplit SemiJoin's details.
>
> Best Regards,
> Jiacheng Guo
>
>
> On Sat, Nov 16, 2013 at 3:02 AM, Meisam Fathi wrote:
>
>> Hi guojc,
>>
>> It is not cleat for me what problem you are trying to solve. What do
>> you want to do with the result of your
>> groupByKey(myPartitioner).flatMapValues( x=>x)? Do you want to use it
>> in a join? Do you want to save it to your file system? Or do you want
>> to do something else with it?
>>
>> Thanks,
>> Meisam
>>
>> On Fri, Nov 15, 2013 at 12:56 PM, guojc  wrote:
>> > Hi Meisam,
>> > Thank you for response. I know each rdd has a partitioner. What I
>> want
>> > to achieved here is re-partition a piece of data according to my custom
>> > partitioner. Currently I do that by
>> groupByKey(myPartitioner).flatMapValues(
>> > x=>x). But I'm a bit worried whether this will create additional temp
>> object
>> > collection, as result is first made into Seq the an collection of
>> tupples.
>> > Any suggestion?
>> >
>> > Best Regards,
>> > Jiahcheng Guo
>> >
>> >
>> > On Sat, Nov 16, 2013 at 12:24 AM, Meisam Fathi 
>> > wrote:
>> >>
>> >> Hi Jiacheng,
>> >>
>> >> Each RDD has a partitioner. You can define your own partitioner if the
>> >> default partitioner does not suit your purpose.
>> >> You can take a look at this
>> >>
>> >>
>> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
>> .
>> >>
>> >> Thanks,
>> >> Meisam
>> >>
>> >> On Fri, Nov 15, 2013 at 6:54 AM, guojc  wrote:
>> >> > Hi,
>> >> >   I'm wondering whether spark rdd can has a partitionedByKey
>> function?
>> >> > The
>> >> > use of this function is to have a rdd distributed by according to a
>> >> > cerntain
>> >> > paritioner and cache it. And then further join performance by rdd
>> with
>> >> > same
>> >> > partitoner will a great speed up. Currently, we only have a
>> >> > groupByKeyFunction and generate a Seq of desired type , which is not
>> >> > very
>> >> > convenient.
>> >> >
>> >> > Btw, Sorry for last empty body email. I mistakenly hit the send
>> >> > shortcut.
>> >> >
>> >> >
>> >> > Best Regards,
>> >> > Jiacheng Guo
>> >
>> >
>>
>
>


Re: DataFrame RDDs

2013-11-15 Thread Christopher Nguyen
Sure, Shay. Let's connect offline.

Sent while mobile. Pls excuse typos etc.
On Nov 16, 2013 2:27 AM, "Shay Seng"  wrote:

> Nice, any possibility of sharing this code in advance?
>
>
> On Fri, Nov 15, 2013 at 11:22 AM, Christopher Nguyen wrote:
>
>> Shay, we've done this at Adatao, specifically a big data frame in RDD
>> representation and subsetting/projections/data mining/machine learning
>> algorithms on that in-memory table structure.
>>
>> We're planning to harmonize that with the MLBase work in the near future.
>> Just a matter of prioritization on limited resources. If there's enough
>> interest we'll accelerate that.
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Nov 16, 2013 1:11 AM, "Shay Seng"  wrote:
>>
>>> Hi,
>>>
>>> Is there some way to get R-style Data.Frame data structures into RDDs?
>>> I've been using RDD[Seq[]] but this is getting quite error-prone and the
>>> code gets pretty hard to read especially after a few joins, maps etc.
>>>
>>> Rather than access columns by index, I would prefer to access them by
>>> name.
>>> e.g. instead of writing:
>>> myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
>>> I would prefer to write
>>> myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))
>>>
>>> Also joins are particularly irritating. Currently I have to first
>>> construct a pair:
>>> somePairRdd.join(myrdd.map(l=> (l(1),l(2)), (l(0),l(1),l(2),l(3)))
>>> Now I have to unzip away the join-key and remap the values into a seq
>>>
>>> instead I would rather write
>>> someDataFrame.join(myrdd , l=> l.entryTime && l.exitTime)
>>>
>>>
>>> The question is this:
>>> (1) I started writing a DataFrameRDD class that kept track of the column
>>> names and column values, and some optional attributes common to the entire
>>> dataframe. However I got a little muddled when trying to figure out what
>>> happens when a dataframRDD is chained with other operations and get
>>> transformed to other types of RDDs. The Value part of the RDD is obvious,
>>> but I didn't know the best way to pass on the "column and attribute"
>>> portions of the DataFrame class.
>>>
>>> I googled around for some documentation on how to write RDDs, but only
>>> found a pptx slide presentation with very vague info. Is there a better
>>> source of info on how to write RDDs?
>>>
>>> (2) Even better than info on how to write RDDs, has anyone written an
>>> RDD that functions as a DataFrame? :-)
>>>
>>> tks
>>> shay
>>>
>>
>


Re: DataFrame RDDs

2013-11-15 Thread Christopher Nguyen
Shay, we've done this at Adatao, specifically a big data frame in RDD
representation and subsetting/projections/data mining/machine learning
algorithms on that in-memory table structure.

We're planning to harmonize that with the MLBase work in the near future.
Just a matter of prioritization on limited resources. If there's enough
interest we'll accelerate that.
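
Until then, a lightweight sketch that covers the named-column part of the
request below, modeling each row as a case class rather than a Seq (the field
names, and the assumption that myrdd is an RDD[Array[String]], are
illustrative):

case class Trip(id: String, entryTime: Long, exitTime: Long, cost: Double)

val trips = myrdd.map(l => Trip(l(0), l(1).toLong, l(4).toLong, l(9).toDouble))

// Columns by name rather than by index:
val totalCost = trips
  .filter(t => t.exitTime > t.entryTime)
  .map(_.cost)
  .sum()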

Sent while mobile. Pls excuse typos etc.
On Nov 16, 2013 1:11 AM, "Shay Seng"  wrote:

> Hi,
>
> Is there some way to get R-style Data.Frame data structures into RDDs?
> I've been using RDD[Seq[]] but this is getting quite error-prone and the
> code gets pretty hard to read especially after a few joins, maps etc.
>
> Rather than access columns by index, I would prefer to access them by name.
> e.g. instead of writing:
> myrdd.map(l => Seq(l(0), l(1), l(4), l(9)))
> I would prefer to write
> myrdd.map(l => DataFrame(l.id, l.entryTime, l.exitTime, l.cost))
>
> Also joins are particularly irritating. Currently I have to first
> construct a pair:
> somePairRdd.join(myrdd.map(l=> (l(1),l(2)), (l(0),l(1),l(2),l(3)))
> Now I have to unzip away the join-key and remap the values into a seq
>
> instead I would rather write
> someDataFrame.join(myrdd , l=> l.entryTime && l.exitTime)
>
>
> The question is this:
> (1) I started writing a DataFrameRDD class that kept track of the column
> names and column values, and some optional attributes common to the entire
> dataframe. However I got a little muddled when trying to figure out what
> happens when a dataframRDD is chained with other operations and get
> transformed to other types of RDDs. The Value part of the RDD is obvious,
> but I didn't know the best way to pass on the "column and attribute"
> portions of the DataFrame class.
>
> I googled around for some documentation on how to write RDDs, but only
> found a pptx slide presentation with very vague info. Is there a better
> source of info on how to write RDDs?
>
> (2) Even better than info on how to write RDDs, has anyone written an RDD
> that functions as a DataFrame? :-)
>
> tks
> shay
>


Re: Not caching rdds, spark.storage.memoryFraction setting

2013-11-08 Thread Christopher Nguyen
Grega, the way to think about this setting is that it sets the maximum
amount of memory Spark is allowed to use for caching RDDs before it must
expire or spill them to disk. Spark in principle knows at all times how
many RDDs are kept in memory and their total sizes, so it can, for example,
persist and then free older RDDs when allocating space for new ones once
this limit is hit.

There's otherwise no partitioning of the heap to reserve for RDDs vs.
"normal" objects. The entire heap is still managed by the JVM, accessible
to your client code. Of course you do want to be careful with and minimize
your own memory use to avoid OOMEs.
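
If you do end up lowering the setting, a minimal sketch of how that might
look in a 2013-era driver program, where spark.* configuration is typically
passed as Java system properties before the SparkContext is created (the 0.1
value and the master URL are placeholders, not recommendations):

import org.apache.spark.SparkContext

// Shrink the cache fraction when little or nothing is cached, leaving more
// of the heap for ordinary objects; tune the value against your workload.
System.setProperty("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext("spark://master:7077", "memory-fraction-example")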

Sent while mobile. Pls excuse typos etc.
On Nov 8, 2013 8:02 AM, "Grega Kešpret"  wrote:

> Hi,
>
> The docs say: Fraction of Java heap to use for Spark's memory cache. This
> should not be larger than the "old" generation of objects in the JVM, which
> by default is given 2/3 of the heap, but you can increase it if you
> configure your own old generation size.
>
> if we are not caching any RDDs, does it mean that we only have
> 1-memoryFraction heap available for "normal" JVM objects? Would it make
> sense then to set memoryFraction to 0?
>
> Thanks,
>
> Grega
> --
> [image: Inline image 1]
> *Grega Kešpret*
> Analytics engineer
>
> Celtra — Rich Media Mobile Advertising
> celtra.com  | 
> @celtramobile
>

Re: almost sorted data

2013-10-28 Thread Christopher Nguyen
Nathan: why iterator semantics could be more appropriate than a
materialized list: the partition data could be sitting on disk, which could
be streamed into RAM upon access, and which could be left untouched if the
algo decided by some conditional logic that it didn't need it. And you
could always get to a list from an iterator.
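
A small illustration of the difference, with fcn and the timestamp field as
placeholders: the first call can stop consuming each partition after ten
elements (a real saving when the underlying iterator is itself lazy, e.g.
streaming from HDFS), while the second has to materialize the whole
partition before it can sort.

// Lazy: only the first 10 elements of each partition are ever pulled.
val sample = data.mapPartitions(iter => iter.take(10).map(fcn))

// Materialized: sorting needs the entire partition in memory at once.
val sortedWithin = data.mapPartitions(_.toList.sortBy(_.timestamp).iterator)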

Best,
--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Mon, Oct 28, 2013 at 7:44 AM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> I'm not sure what you're asking.
>
> At some level, all RDDs only do partition-wise operations - they all only
> operate on one partition at a time.
>
> I suspect what you're looking for is something where you could just write:
>
> data.mapPartitions(_.sortBy(...))
>
> If that's what you want, then no - but only because Iterator has no sortBy
> method.  I'm not sure why mapPartitions hands one an iterator rather than a
> list.  Presumably so one can avoid having to have the whole partition in
> memory at once - but equally presumably, one already has the whole
> partition in memory at once, so that seems odd to me.  Anyone know why?
> Perhaps to allow for worst-case scenarios?
>
>  -Nathan
>
>
>
> On Mon, Oct 28, 2013 at 4:54 AM, Arun Kumar  wrote:
>
>> I will try using per partition sorted data. Can I also use groupBy and
>> join per partition? Basically I want to restrict the computation per
>> partition like using this 
>> data.mapPartitions(_.toList.sortBy(...).toIterator).
>> Is there a more direct way to create an RDD that does partition-wise
>> operations?
>>
>>
>> On Sat, Oct 26, 2013 at 3:50 AM, Aaron Davidson wrote:
>>
>>> Currently, our sortByKey should be using Java's native Timsort
>>> implementation, which is an adaptive sort. That should also mean sorting is
>>> very fast for almost-sorted data. The overhead you're seeing might be
>>> caused by reshuffling everything during the range partitioning step
>>> *before* the sort, which has to serialize all your data.
>>>
>>> Nathan's solution might then work out nicely for you, as it will avoid
>>> shuffling the data.
>>>
>>>
>>> On Fri, Oct 25, 2013 at 9:18 AM, Josh Rosen wrote:
>>>
 Adaptive sorting algorithms (
 https://en.wikipedia.org/wiki/Adaptive_sort) can benefit from
 presortedness in their inputs, so that might be a helpful search term
 for researching this problem.


 On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
 nkronenf...@oculusinfo.com> wrote:

> I suspect from his description the difference is negligible for his
> case.  However, there are ways around that anyway.
>
> Assuming a fixed data set (as opposed to something like a streaming
> example, where there is no last element), one can take 3 passes to:
>
>1. get the last element of each partition
>2. take elements from each partition that fall before the last
>element of the previous partition, separate them from the rest of their
>partition
>3. and add them to the previous (whichever previous is
>appropriate, in really degenerate cases, which it sounds like he 
> doesn't
>have) in the right location
>
>
>
>
> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter 
> wrote:
>
>> Using a local sort per partition only gives a correct result if the
>> data
>> is already range partitioned.
>>
>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>> > Since no one else has answered...
>> > I assume:
>> >
>> > data.mapPartitions(_.toList.sortBy(...).toIterator)
>> >
>> > would work, but I also suspect there's a better way.
>> >
>> >
>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar 
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> I am trying to process some logs and the data is sorted(*almost*)
>> by
>> >> timestamp.
>> >> If I do a full sort it takes a lot of time. Is there some way to
>> sort more
>> >> efficiently (like restricting sort to per partition).
>> >>
>> >> Thanks in advance
>> >>
>> >
>> >
>> >
>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


>>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


Re: Visitor function to RDD elements

2013-10-22 Thread Christopher Nguyen
Ah, this is a slightly different problem statement, in that you may still
gain by taking advantage of Spark's parallelization for the data
transformation.

If you want to avoid the serdes+network overhead of sending the results
back to the driver, and instead have a consumer/sink to send the result set
to, you might consider having a single reducer that streams the rows out to
a local temporary file (or files), then have that final reduce send (or
trigger an external send of) the result set to your consumer. 2GB files are
fairly small relative to TB disk sizes, and stream in a few tens of seconds
at 100MB/s local disk or network bandwidth.

If the original transformation would have taken minutes or longer to run
sequentially, then this approach is still a net performance win.
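
A rough sketch of that shape, assuming the transformed RDD holds rows as
Seq[Any] and sendToConsumer stands in for whatever external push you have
(both names are placeholders):

import java.io.{File, PrintWriter}

// Coalesce to a single partition, stream its rows into a local temp file on
// that worker, then trigger the external send from the same task; only the
// file path travels back to the driver.
val sentFrom = transformed.coalesce(1).mapPartitions { rows =>
  val tmp = File.createTempFile("result-", ".tsv")
  val out = new PrintWriter(tmp)
  try rows.foreach(r => out.println(r.mkString("\t"))) finally out.close()
  sendToConsumer(tmp)                 // hypothetical push to your consumer
  Iterator(tmp.getAbsolutePath)
}.collect().head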

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 3:48 PM, Matt Cheah  wrote:

>  Thanks everyone – I think we're going to go with collect() and kick out
> things that attempt to obtain overly large sets.
>
>  However, I think my original concern still stands. Some reading online
> shows that Microsoft Excel, for example, supports displaying something on
> the order of 2-4 GB sized spreadsheets (
> http://social.technet.microsoft.com/Forums/office/en-US/60bf34fb-5f02-483a-a54b-645cc810b30f/excel-2013-file-size-limits-powerpivot?forum=officeitpro).
> If there is a 2GB RDD, however, streaming it all back to the driver seems
> wasteful, when in reality we could fetch chunks of it at a time and load
> only parts into driver memory, as opposed to using 2GB of RAM on the driver.
> In fact I don't know what the maximum frame size that can be set would be
> via spark.akka.framesize.
>
>  -Matt Cheah
>
>   From: Mark Hamstra 
> Reply-To: "user@spark.incubator.apache.org" <
> user@spark.incubator.apache.org>
> Date: Tuesday, October 22, 2013 3:32 PM
> To: user 
>
> Subject: Re: Visitor function to RDD elements
>
>   Correct; that's the completely degenerate case where you can't do
> anything in parallel.  Often you'll also want your iterator function to
> send back some information to an accumulator (perhaps just the result
> calculated with the last element of the partition) which is then fed back
> into the operation on the next partition as either a broadcast variable or
> part of the closure.
>
>
>
> On Tue, Oct 22, 2013 at 3:25 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> You shouldn't have to fly data around
>>
>>  You can just run it first on partition 0, then on partition 1, etc...
>>  I may have the name slightly off, but something approximately like:
>>
>>  for (p <- 0 until numPartitions)
>>   data.mapPartitionsWithIndex((i, iter) => if (i == p) iter.map(fcn) else
>> List().iterator)
>>
>>  should work... BUT that being said, you've now really lost the point of
>> using Spark to begin with.
>>
>>
>


Re: Visitor function to RDD elements

2013-10-22 Thread Christopher Nguyen
For better precision,

s/Or to be able to handle very large data sets ("big memory")/Or to be able
to hold very large data sets in one place ("big memory")/g

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen  wrote:

> Matt, it would be useful to back up one level to your problem statement.
> If it is strictly restricted as described, then you have a sequential
> problem that's not parallelizable. What is the primary design goal here? To
> complete the operation in the shortest time possible ("big compute")? Or to
> be able to handle very large data sets ("big memory")?  Or to ensure that
> the operation completes in a fault-tolerant manner ("reliability")?
>
> There are two paths from here:
>
>1. Finding parallelizable opportunities: there may be ways to squint
>at the problem in just the right way that provides a way to parallelize it:
>   - Maybe you can come up with some algebra or approximations that
>   allows for associativity, so that different partitions of the data can 
> be
>   operated on in parallel.
>   - Perhaps the data is a time series where weekly or monthly chunks
>   can be summarized in parallel and the sequential logic can be brought up
>   several hierarchical levels.
>   - Perhaps the statefulness of the visitor has a finite memory of
>   past visits that you can take advantage of.
>   2. Finding alternatives: it's important to realize that Spark's
>strength is in "big compute" and not in "big memory". It's only 1 of the 13
>dwarfs of parallel computing patterns, the map-reduce, shared-nothing model
>(cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo").
>It's a very successful model, but one that sometimes requires a refactoring
>of the algorithm/data to make it applicable. So if #1 above isn't at all
>possible, you might look into a "big memory" approach, such as Tachyon, or
>memcached, or even just reading a big file sequentially and applying your
>visitor to each data row, depending critically on what bottleneck you are
>engineering against.
>
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah  wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.s

Re: Visitor function to RDD elements

2013-10-22 Thread Christopher Nguyen
Matt, it would be useful to back up one level to your problem statement. If
it is strictly restricted as described, then you have a sequential problem
that's not parallelizable. What is the primary design goal here? To
complete the operation in the shortest time possible ("big compute")? Or to
be able to handle very large data sets ("big memory")?  Or to ensure that
the operation completes in a fault-tolerant manner ("reliability")?

There are two paths from here:

   1. Finding parallelizable opportunities: there may be ways to squint at
   the problem in just the right way that provides a way to parallelize it:
  - Maybe you can come up with some algebra or approximations that
  allows for associativity, so that different partitions of the data can be
  operated on in parallel.
   - Perhaps the data is a time series where weekly or monthly chunks
   can be summarized in parallel and the sequential logic can be brought up
   several hierarchical levels (a rough sketch of this follows the list).
  - Perhaps the statefulness of the visitor has a finite memory of past
  visits that you can take advantage of.
  2. Finding alternatives: it's important to realize that Spark's
   strength is in "big compute" and not in "big memory". It's only 1 of the 13
   dwarfs of parallel computing patterns, the map-reduce, shared-nothing model
   (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo").
   It's a very successful model, but one that sometimes requires a refactoring
   of the algorithm/data to make it applicable. So if #1 above isn't at all
   possible, you might look into a "big memory" approach, such as Tachyon, or
   memcached, or even just reading a big file sequentially and applying your
   visitor to each data row, depending critically on what bottleneck you are
   engineering against.
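
To make the time-series idea under #1 concrete, a rough sketch (the Event
fields, the WEEK_MS bucketing, and initialState/visit are all placeholders
for your own logic): each weekly chunk is summarized in parallel, and only
the small per-week summaries are then visited sequentially, in order, on the
driver.

import org.apache.spark.SparkContext._   // pair-RDD operations

case class Summary(week: Long, count: Long, total: Double)
val WEEK_MS = 7L * 24 * 60 * 60 * 1000

// Parallel part: one summary per weekly bucket.
val weekly = events
  .map(e => (e.timestamp / WEEK_MS, e))
  .groupByKey()
  .map { case (week, es) => Summary(week, es.size, es.map(_.cost).sum) }
  .collect()
  .sortBy(_.week)

// Sequential part: the order-dependent visitor now runs over a small array.
val finalState = weekly.foldLeft(initialState)((state, s) => visit(state, s))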


--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah  wrote:

>  Hi everyone,
>
>  I have a driver holding a reference to an RDD. The driver would like to
> "visit" each item in the RDD in order, say with a visitor object that
> invokes visit(item) to modify that visitor's internal state. The visiting
> is not commutative (e.g. Visiting item A then B makes a different internal
> state from visiting item B then item A). Items in the RDD also are not
> necessarily distinct.
>
>  I've looked into accumulators which don't work because they require the
> operation to be commutative. Collect() will not work because the RDD is too
> large; in general, bringing the whole RDD into one partition won't work
> since the RDD is too large.
>
>  Is it possible to iterate over the items in an RDD in order without
> bringing the entire dataset into a single JVM at a time, and/or obtain
> chunks of the RDD in order on the driver? We've tried using the internal
> iterator() method. In some cases, we get a stack trace (running locally
> with 3 threads). I've included the stack trace below.
>
>  Thanks,
>
>  -Matt Cheah
>
>  org.apache.spark.SparkException: Error communicating with
> MapOutputTracker
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
> at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
> at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at
> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
> at
> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
> at
> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
> at
> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> 

Re: A program that works in local mode but fails in distributed mode

2013-10-16 Thread Christopher Nguyen
Markus, I hear you. Sometimes things just behave in ways we don't expect.

http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4018748

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Wed, Oct 16, 2013 at 12:30 PM, Markus Losoi wrote:

> > Markus, I'm guessing at your config which is probably one or more workers
> > running on the same node as your master, and the same node as your
> > spark-shell. Which is why you're expecting the workers to be able to read
> > the same relative path?
>
> > If that's the case, the reason it doesn't work out as you expect is
> > because the workers have different working directories.
>
> Yes, you are right. The problem was caused by the use of relative paths.
> (I remember starting the worker and the driver program from the same path
> though.) However, the system could have been a bit more explicit that it
> didn't find the files. As it was, I got the impression that everything went
> fine until I got unexpected results. Perhaps there was something alarming
> in the flood of log messages.
>
> Best regards,
> Markus Losoi (markus.lo...@gmail.com)
>
>


Re: A program that works in local mode but fails in distributed mode

2013-10-15 Thread Christopher Nguyen
Markus, I'm guessing at your config which is probably one or more workers
running on the same node as your master, and the same node as your
spark-shell. Which is why you're expecting the workers to be able to read
the same relative path?

If that's the case, the reason it doesn't work out as you expect is because
the workers have different working directories.

Instead, this should work:

> val fileSizes = fileList.map(file => new
>   File("/the/absolute/path/to/spark/data/" + file).length)

This would also be interesting to you:

> val workingDirs = fileList.map(x => System.getProperty("user.dir"))
> workingDirs.collect



--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, Oct 15, 2013 at 11:17 AM, Markus Losoi wrote:

> > You need to put your data files on a distributed file system (e.g.
> > hdfs/s3) for the distributed spark to work. Otherwise, the workers cannot
> > read files from a single node.
>
> I'm not sure if it makes a difference, but the same files were in the same
> folder on both the master and the worker. However, next time I'll try with
> HDFS. In fact, I already tried it once but faced a Hadoop configuration
> problem. I should be able to solve this problem though with the plentiful
> amount of Hadoop related material available.
>
> > The reason first works is because for very short actions like first /
> > take, Spark alternatively launches the action on the master.
>
>


Re: Drawback of Spark memory model as compared to Hadoop?

2013-10-13 Thread Christopher Nguyen
Howard,

> - node failure?
> - not able to handle if intermediate data > memory size of a node
> - cost

Spark uses recomputation, aka "journaling", to provide resiliency in case of
node failure, giving node-level recovery behavior much like Hadoop
MapReduce, except with much faster recovery in most cases. Spark is also
designed to allow spill-to-disk if a given node doesn't have enough RAM to
hold its data partitions, thus degrading gracefully to disk-based data
handling. As for cost, at $5/GB street RAM prices you can have up to 1TB of
RAM for about $5K, so memory is becoming a smaller fraction of total node
cost.

If your larger question is "Hadoop MR or Spark?", or more generally,
"disk-based or RAM-based distributed computing?", the correct answer is "It
depends." And the variables "it" depends on are dynamically changing over
time.

A way to think about this is to see that there is a cost-benefit crossover
point for every unique organization/business-use-case combination, before
which disk is preferred, and beyond which, RAM is preferred. For many
Wall-Street mission critical apps, where milliseconds can mean millions of
dollars, many of these crossover points were passed in the mid-2000's. At
Google, a large organization with large datasets and high productivity
($1.2M/employee-year), you can see similar crossovers in the late
2000's/early 2010's (cf. PowerDrill). The general industry is undergoing
similar evaluations.

The next question to ask is "how are the underlying variables changing?"
Consider for example how latencies are evolving across technologies in your
compute path, even as each is getting cheaper per Moore's Law. For RAM
outside the L1/L2 caches, we're in the 60ns regime going down to 30-40ns.
Network latencies are 100us going down to the 10us range. In contrast, disk
latencies have bottomed out at 4-5ms, and the trend of SSD reads is
actually going up from 20us to 30-40us (to get higher densities). You could
do similar projections of bandwidths. Certainly, these storage technologies
have their place, but the point is that whatever your cost-benefit equation
for in-memory vs disk-based use cases is this year, next year it will shift
more in favor of memory, and inexorably so the year after that.

So trends clearly favor in-memory techniques like Spark. These industry
trends have reinforcing positive feedback: as more organizations adopt
in-memory technologies, it will become uncompetitive for laggards to sit on
the sidelines for the same use cases. A final thing to keep in mind is that
having affordable high performance enables use cases that were not at all
possible before, such as interactive data science with huge datasets.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Sat, Oct 12, 2013 at 8:24 PM, howard chen  wrote:

> Hello,
>
> I am new to Spark and have only used Hadoop in the past.
>
> I understand Spark is in memory as compare to Hadoop who use disk for
> intermediate storage. From the practical term, the benefit must be
> performance, but what would be the drawbacks?
>
> e.g.
> - node failure?
> - not able to handle if intermediate data > memory size of a node
> - cost
>
> I would like to hear your experience when using Spark to handle big data,
> and what is the work around in the above cases?
>
> Thanks.
>
>
>


Re: Output to a single directory with multiple files rather multiple directories ?

2013-10-10 Thread Christopher Nguyen
Ramkumar, it sounds like you can consider a file-parallel approach rather
than a strictly data-parallel parsing of the problem. In other words,
separate the file copying task from the file parsing task. Have the driver
program D handle the directory scan, then parallelize the file list across
N slaves S[1 .. N]. The file contents themselves can either be (a) passed
from driver D to the slaves S as a serialized data structure, (b) copied by
driver D into HDFS, or (c) shared via another distributed filesystem such
as NFS. When a slave finishes processing, it writes its result back out to
HDFS, which is then picked up by D and copied into your desired output
directory structure.

This is admittedly a bit of file copying back and forth over the network,
but if your input structure is some file system, and output structure is
the same, then you'd incur that cost at some point anyway. And if the file
parsing is much more expensive than file transfer, then you do get
significant speed gains in parallelizing the parsing task.

It's also quite conducive to getting to code-complete in an hour or less.
KISS.
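
A rough sketch of that split, assuming every worker can see the same
filesystem paths (option (c) above), a flat input directory for simplicity,
and parseAndEncrypt standing in for your per-file transformation (inRoot,
outRoot and parseAndEncrypt are all placeholders):

import java.io.{File, PrintWriter}
import scala.io.Source

// Parallelize the file *names*; each task reads one input file, transforms
// it, and writes one output file of the same name under outRoot.
val names = new File(inRoot).listFiles.filter(_.isFile).map(_.getName).toSeq
sc.parallelize(names, names.size).foreach { name =>
  val src = Source.fromFile(new File(inRoot, name))
  val out = new PrintWriter(new File(outRoot, name))
  try src.getLines().map(parseAndEncrypt).foreach(out.println)
  finally { src.close(); out.close() }
}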

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Oct 10, 2013 at 4:30 PM, Ramkumar Chokkalingam <
ramkumar...@gmail.com> wrote:

> Hey,
>
> Thanks for the mail, Matei. I need the output directory structure to be the
> same as the input directory structure, with some changes made to the
> content of those files while parsing [replacing certain fields with their
> encrypted values], so I wouldn't want the union to combine a few of the
> input files into a single file.
>
> Is there some API which would treat each file as independent and write to
> an output file? That would've been great.
>
> If it doesn't work, then I have to write them each to a folder and process
> each of them (using some script) to match my input directory structure.
>
>
>
>