Re: Forcing RDD computation with something else than count() ?

2014-01-22 Thread Reynold Xin
You can also do rdd.foreach(a => Unit). I actually suspect count is even cheaper than this. On Tue, Jan 21, 2014 at 5:05 AM, Guillaume Pitel wrote: > Thanks. So you mean that first() triggers the computation of the WHOLE > RDD? That does not sound right, I thought it was lazy. > > Guillaume
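
A minimal sketch of the two options, assuming a live SparkContext sc (the pipeline is a stand-in); rdd.foreach(_ => ()) is the more idiomatic spelling of the no-op:

    val rdd = sc.parallelize(1 to 1000).map(_ * 2)  // stand-in for an expensive pipeline
    rdd.foreach(_ => ())  // forces full evaluation; results stay on the executors
    rdd.count()           // also forces full evaluation, and may be even cheaper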

Re: Lazy evaluation of RDD data transformation

2014-01-22 Thread Reynold Xin
The map computation output is never fully materialized in memory. Internally, it is simply an iterator interface that streams through the input and produces an iterator that can be consumed in a similar streaming fashion. Only when .cache/persist is set on an RDD is the produced content materialized in memory.
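
A small illustration of that pipelining, assuming a live SparkContext sc and a hypothetical input path:

    val lines = sc.textFile("hdfs:///input")   // nothing is read yet
    val fields = lines.map(_.split("\t"))      // lazy: just extends the iterator pipeline
    val cached = fields.cache()                // marks the RDD for materialization
    cached.count()                             // first action: streams the input once, caching each partition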

Re: Spark streaming vs. spark usage

2013-12-18 Thread Reynold Xin
On Wed, Dec 18, 2013 at 12:17 PM, Nathan Kronenfeld < nkronenf...@oculusinfo.com> wrote: > > > Since many of the functions exist in parallel between the two, I guess I > would expect something like: > > trait BasicRDDFunctions { > def map... > def reduce... > def filter... > def foreach... > } > >

Re: GroupingComparator in Spark.

2013-12-04 Thread Reynold Xin
Spark's expressiveness allows you to do this fairly easily on your own. sortByKey is implemented in a few lines of code. It would be fairly easy to implement your own sortByKey to do that. Replace the partitioner in sortByKey with a hash partitioner on the key, and then define a separate way t
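
A hedged sketch of that suggestion, assuming a live SparkContext sc (data and partition count are illustrative): hash-partition on the key so that all records for a key land in the same partition, then sort each partition locally.

    import org.apache.spark.SparkContext._
    import org.apache.spark.HashPartitioner

    val records = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3)))
    val groupedSorted = records
      .partitionBy(new HashPartitioner(4))  // co-locate all records for each key
      .mapPartitions(iter => iter.toSeq.sortBy(_._1).iterator,  // sort within the partition
                     preservesPartitioning = true)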

Re: suppressing logging in REPL

2013-11-06 Thread Reynold Xin
Are you sure you put the log4j file in the right place? I just tried this with your configuration file, and this is what I see: rxin@rxin-air:/scratch/rxin/incubator-spark > ./spark-shell Welcome to [Spark shell ASCII-art banner]

Re: JdbcRDD usage

2013-11-06 Thread Reynold Xin
The RDD actually takes care of closing the JDBC connection at the end of the iterator. See the code here: https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala#L107 The explicit close you saw in the JDBCSuite is to close the test program's own
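
A hedged usage sketch (URL, credentials, and query are hypothetical); the SQL must carry the two "?" bound-parameter placeholders that JdbcRDD uses to split the key range across partitions:

    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD

    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass"),
      "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
      1L, 1000000L, 10,                        // key range and number of partitions
      rs => (rs.getLong(1), rs.getString(2)))  // row mapper
    // No explicit close needed: the RDD closes each connection
    // once its partition's iterator is exhausted.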

Re: Performance drop / unstable in 0.8 release

2013-11-06 Thread Reynold Xin
I don't even think task stealing / speculative execution is turned on by default. Do you know what snapshot version you used for 0.8 previously? On Mon, Nov 4, 2013 at 12:03 PM, Wenlei Xie wrote: > Hi, > > I have an iterative program written in Spark, which has been tested under > a snapshot ve

Re: java.io.NotSerializableException on RDD count() in Java

2013-11-03 Thread Reynold Xin
Yea, so every inner class actually contains a field referencing the outer class. In your case, the anonymous class DoubleFlatMapFunction actually has a this$0 field referencing the outer class AnalyticsEngine, which is why Spark is trying to serialize AnalyticsEngine. In the Scala API, the closure
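
The same trap exists in Scala when a method closes over a field of a non-serializable class; a minimal sketch (class and field names are mine):

    import org.apache.spark.rdd.RDD

    class Engine(factor: Int) {         // not Serializable, like AnalyticsEngine
      def bad(rdd: RDD[Int]): RDD[Int] =
        rdd.map(_ * factor)             // `factor` is really this.factor: captures `this`
      def good(rdd: RDD[Int]): RDD[Int] = {
        val f = factor                  // copy into a local val first
        rdd.map(_ * f)                  // closure captures only the Int
      }
    }

In Java, the usual fix is to declare the function as a static nested (or top-level) class, so no this$0 field exists.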

Re: examples of map-side join of two hadoop sequence files

2013-10-21 Thread Reynold Xin
ing HDFS file corresponds to the > partition being iterated upon in mapPartitions? > > Ameet > > > > > On Mon, Oct 21, 2013 at 12:54 AM, Reynold Xin wrote: > >> How about the following: >> >> val smallFile = sc.sequenceFile().collect() >> val la

Re: examples of map-side join of two hadoop sequence files

2013-10-20 Thread Reynold Xin
How about the following:

    val smallFile = sc.sequenceFile().collect()
    val largeFile = sc.sequenceFile(...)
    val small = sc.broadcast(smallFile)
    largeFile.mapPartitions { iter =>
      // build up a hash table for small; call it smallTable
      iter.filter(row => smallTable.contains(row.joinKey)).map
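
A fleshed-out sketch of the same idea, with hypothetical paths and assuming key/value types that Spark's sequence-file converters handle (import org.apache.spark.SparkContext._ on 0.x):

    val small = sc.sequenceFile[Long, String]("hdfs:///small").collect().toMap
    val smallTable = sc.broadcast(small)
    val largeFile = sc.sequenceFile[Long, String]("hdfs:///large")
    val joined = largeFile.mapPartitions { iter =>
      val table = smallTable.value  // one deserialized copy per executor
      iter.filter { case (k, _) => table.contains(k) }
          .map { case (k, v) => (k, (v, table(k))) }  // map-side join output
    }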

Shark 0.8.0 release

2013-10-18 Thread Reynold Xin
We are happy to announce Shark 0.8.0, a major release that brings many new capabilities and performance improvements. You can download the release here: https://github.com/amplab/shark/releases Shuffle Performance for Large Aggregations and Joins We’ve implemented a new data serialization

Re: Another instance of Derby may have already booted the database /opt/spark/shark/bin/metastore_db.

2013-10-17 Thread Reynold Xin
Yes - you can configure MySQL as the metastore in Hive and Shark should pick it up: https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin Make sure you have the MySQL connector jar in hive/lib. On Tue, Oct 15, 2013 at 3:05 AM, vinayak navale wrote: > Hi, > > I am getting

Re: A program that works in local mode but fails in distributed mode

2013-10-15 Thread Reynold Xin
You need to put your data files on a distributed file system (e.g. HDFS/S3) for distributed Spark to work. Otherwise, the workers cannot read files that live on a single node. The reason first() works is that for very short actions like first / take, Spark alternatively launches the action on the mas

Re: Spark REPL produces error on a piece of scala code that works in pure Scala REPL

2013-10-11 Thread Reynold Xin
This is a known problem and has to do with a peculiarity of the Scala shell: https://groups.google.com/forum/#!searchin/spark-users/error$3A$20type$20mismatch|sort:relevance/spark-users/bwAmbUgxWrA/HwP4Nv4adfEJ On Fri, Oct 11, 2013 at 6:01 PM, Aaron Davidson wrote: > Playing around with this a l

Re: Loss was due to com.esotericsoftware.kryo.KryoException: Buffer overflow.

2013-10-05 Thread Reynold Xin
r the design of this program. If you are just using Spark to get 10G of data to a single node, maybe you can also try running the whole thing on a single node. On Sat, Oct 5, 2013 at 5:05 PM, Ryan Compton wrote: > I have 128g for each node > > On Sat, Oct 5, 2013 at 4:58 PM, Reynold Xin

Re: Loss was due to com.esotericsoftware.kryo.KryoException: Buffer overflow.

2013-10-05 Thread Reynold Xin
You probably shouldn't be collecting a 10g dataset, because that is going to pull all 10g onto the driver node ... On Fri, Oct 4, 2013 at 6:53 PM, Ryan Compton wrote: > Some hints: I'm doing collect() on a large (~10g??) dataset. If I > shrink that down, I have no problems. I've tried > > Syste
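
Hedged alternatives that avoid funneling the whole dataset through the driver (output path is hypothetical):

    val rdd = sc.parallelize(1 to 1000000)  // stand-in for the ~10g dataset
    rdd.saveAsTextFile("hdfs:///out")       // write in parallel from the executors
    val peek = rdd.take(1000)               // or bring back only a small sample to inspect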

Re: Naive Bayes Classifier with mllib

2013-10-04 Thread Reynold Xin
It does not yet support naive Bayes. On Friday, October 4, 2013, Aslan Bekirov wrote: > Hi All, > > While I was examining MLBase documentation, I could not see whether it > supports a naive Bayes classifier or not. > > Does it support a naive Bayes classifier? If yes, any example will be very > help

Re: Accessing broadcast variables by name

2013-10-02 Thread Reynold Xin
I still don't fully understand your use case, but how about extending SparkContext yourself and adding a hash map from string to broadcast variable? Then you can change the broadcast function to return the name. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Wed, Oct 2, 2013 at 9:
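
A hedged sketch of that idea (all names are mine; the ClassTag bound follows the current sc.broadcast signature), keeping a name-to-broadcast registry next to the SparkContext rather than subclassing it:

    import scala.collection.mutable
    import scala.reflect.ClassTag
    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    class BroadcastRegistry(sc: SparkContext) {
      private val byName = mutable.Map.empty[String, Broadcast[_]]
      def broadcastNamed[T: ClassTag](name: String, value: T): Broadcast[T] = {
        val b = sc.broadcast(value)
        byName(name) = b              // remember the variable under its name
        b
      }
      def lookup[T](name: String): Broadcast[T] =
        byName(name).asInstanceOf[Broadcast[T]]  // caller asserts the type
    }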

Re: Accessing broadcast variables by name

2013-10-02 Thread Reynold Xin
t; > // On the main method > val mMap = sc.broadcast(getMap(...)) > val bname = mMap.name() > > ... > > // On the external resource > val mMap = sc.broadcastVariable(bname) > > > Thanks, > > Elmer > > > > > > > > > > -- -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org

Re: Wrong result with mapPartitions example

2013-09-27 Thread Reynold Xin
0.172441 s [625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625] -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Thu, Sep 26, 2013 at 10:08 PM, Shangyu Luo wrote: > I can see the test for ParallelCollectionRDD.slice(). > But how to explain the result of m

Re: No access to pairRDDFunctions

2013-09-26 Thread Reynold Xin
You can do a cast:

    val rdd = ... // some RDD[SomeData]
    rdd.asInstanceOf[RDD[Tuple2[Int, Data]]].reduceByKey(...)

It's invariant because of historic reasons, I think. It is fairly hard to change it now. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Thu, Sep 26, 2013 at 6:25 AM, H

Re: how to avoid reading the first line of dataframe?

2013-09-24 Thread Reynold Xin
Note that this drops the first line of every partition. You probably want to use the partition index to drop only the first partition's first line, i.e.

    data.mapPartitionsWithIndex { case (index, iter) =>
      if (index == 0) iter.drop(1) else iter
    }

-- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Tue, Sep 24, 20

Re: How set how many cpu can be used by spark

2013-09-23 Thread Reynold Xin
Part of it can be that the disk cannot keep up with your CPU, and the other part is stragglers: some partitions might be bigger, etc. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Mon, Sep 23, 2013 at 8:50 PM, Xiang Huo wrote: > What I am doing is splitting a large RDD into several sm

Re: A chain of lazy operations starts running tasks

2013-09-23 Thread Reynold Xin
The reason is that sortByKey triggers a sample operation to determine the range partitioner. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Mon, Sep 23, 2013 at 5:47 PM, Mahdi Namazifar wrote: > Hi, > > I think I might be missing something but here is what I observe
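
A small illustration of this, assuming a live SparkContext sc (data hypothetical): a job appears as soon as sortByKey is called, before any explicit action.

    val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, i))
    val sorted = pairs.sortByKey()  // already launches the sampling job that builds the range partitioner
    sorted.count()                  // the "real" sort job runs here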

Re: StackOverflow still after implementing custom serializers when working with large data set

2013-09-23 Thread Reynold Xin
Hi Gary, I am really confused here - what does your custom serializer do? Do you have some data structure that is having a giant nested structure? -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Tue, Sep 17, 2013 at 1:40 PM, Gary Malouf wrote: > We ultimately solved this by putt

Re: RemoteClientError@akka://spark@10.232.35.179:44283: Error[java.net.ConnectException:Connection refused

2013-09-23 Thread Reynold Xin
You will need to look into the worker's log. You can ssh to the worker machine, and look at the work folder in Spark. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Sat, Sep 21, 2013 at 12:30 PM, Shay Seng wrote: > Hey, > I've been struggling to set up a work flow

Re: How set how many cpu can be used by spark

2013-09-23 Thread Reynold Xin
leep(1000); iter }.count And see if you have 20 tasks being launched. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Sun, Sep 22, 2013 at 9:48 PM, Xiang Huo wrote: > Hi all, > > I am trying to run a spark program on a server. It is not a cluster but > only a server. I w
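
The snippet above is truncated; a plausible reconstruction of the suggested test, with an illustrative RDD and partition count:

    val rdd = sc.parallelize(1 to 20, 20)  // 20 partitions => 20 tasks
    rdd.mapPartitions { iter => Thread.sleep(1000); iter }.count()
    // Each task sleeps for a second, so the number of tasks running
    // at once in the UI/logs shows how many cores Spark really uses.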

Re: java.lang.OutOfMemoryError: Map failed

2013-09-20 Thread Reynold Xin
It looks like it is actually legitimately running out of memory. What does the application's job do? -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Fri, Sep 20, 2013 at 11:57 AM, Reynold Xin wrote: > > Maybe you have a single key or a single partition that is extremely large? >

Re: java.lang.OutOfMemoryError: Map failed

2013-09-20 Thread Reynold Xin
Maybe you have a single key or a single partition that is extremely large? Can you try logging the GC information to see what else is going on? -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Thu, Sep 19, 2013 at 8:43 AM, Grega Kešpret wrote: > Hi! > > I have a simple job,

Re: N00b alert - spark-ec2 script failure

2013-09-13 Thread Reynold Xin
You will need to do sbt/sbt publish-local in Spark for this to work. The reason is that the Spark packages haven't been published to Maven yet. They will be when they are released. -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Fri, Sep 13, 2013 at 1:09 PM, Venkat Krishnam

Spark meetup: Spark 0.8 update and Spark at Bizo on Mon Sep 30 in SF

2013-09-13 Thread Reynold Xin
neering sub-teams at Bizo excited about using Spark. Finally I'll cover some common pitfalls & caveats we've encountered - especially with regard to translating some of our older Hive jobs to Spark & how we go about debugging failed Spark jobs. We would like to thank Tagged for p

Re: File broadcasting

2013-09-11 Thread Reynold Xin
Spark provides an abstraction called broadcast variables. It has multiple underlying implementations, and can be much more convenient than Hadoop's distributed cache. http://spark.incubator.apache.org/docs/0.7.3/scala-programming-guide.html#broadcast-variables -- Reynold Xin, AMPLab, UC Berkeley
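
A minimal usage sketch, assuming a live SparkContext sc (data illustrative):

    val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))  // shipped to each node once
    val resolved = sc.parallelize(Seq(1, 2, 3))
      .map(i => lookup.value.getOrElse(i, "unknown"))   // read on the executors
    resolved.collect()                                  // Array("a", "b", "unknown")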

Re: Bagel programming guide inconsistency

2013-09-10 Thread Reynold Xin
Thanks. Looks like the documentation is outdated. Do you mind filing a jira ticket about this at https://spark-project.atlassian.net/ ? -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Mon, Sep 9, 2013 at 9:18 PM, Matteo Ceccarello < matteo.ceccare...@gmail.com> wrote: > Hi a

Re: Getting the partition position of cached RDD?

2013-09-02 Thread Reynold Xin
cify those constraints. Some semi-pseudocode:

    class LocalityConstraintRDD[T: ClassManifest](prev: RDD[T], locs: Array[String]) {
      override def compute = prev.compute _
      override def getPreferredLocations(split: Partition): Seq[String] = List(locs(split.index))
    }

-- Reynold X
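
A fleshed-out version of that semi-pseudocode, hedged: written against the current RDD API (ClassTag instead of the ClassManifest of that era), with the names kept from the sketch:

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    class LocalityConstraintRDD[T: ClassTag](prev: RDD[T], locs: Array[String])
      extends RDD[T](prev) {
      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        prev.iterator(split, context)    // delegate computation to the parent
      override protected def getPartitions: Array[Partition] =
        prev.partitions                  // same partitioning as the parent
      override protected def getPreferredLocations(split: Partition): Seq[String] =
        Seq(locs(split.index))           // pin each partition to a chosen host
    }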

Re: Getting the partition position of cached RDD?

2013-09-02 Thread Reynold Xin
Does this help you? https://github.com/mesos/spark/pull/832 -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Mon, Sep 2, 2013 at 3:24 PM, Wenlei Xie wrote: > Hi, > > I am wondering if it is possible to get the partition position of cached > RDD? I am asking this because I

Re: Saving compressed sequence files

2013-08-28 Thread Reynold Xin
I don't think it's a system property. There is support for adding compression to the save function in the latest 0.8 code: https://github.com/mesos/spark/blob/master/core/src/main/scala/spark/PairRDDFunctions.scala#L609 You can take a look at how that is done. -- Reynold Xin,
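
A hedged sketch against that API (paths hypothetical; the codec-taking overloads shown follow the current method signatures):

    import org.apache.hadoop.io.compress.GzipCodec

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
    pairs.saveAsSequenceFile("hdfs:///out/seq", Some(classOf[GzipCodec]))  // compressed sequence file
    pairs.saveAsTextFile("hdfs:///out/text", classOf[GzipCodec])           // compressed text, for comparison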

Re: Shark Queries on Streams?

2013-08-27 Thread Reynold Xin
://github.com/amplab/shark/pull/136 -- Reynold Xin, AMPLab, UC Berkeley http://rxin.org On Tue, Aug 27, 2013 at 11:03 AM, Paul Snively wrote: > Hi everyone! > > I'm continuing to investigate the Spark/Shark ecosystem and am fascinated > by the potential. In noticing that I ca

Next Spark user meetup: Shark in Yahoo's Advertising Data Platforms on Tue, Aug 27

2013-08-15 Thread Reynold Xin
ome of our use cases that involve advanced algorithms and how we implement these algorithms on top of Spark and Shark to provide interactive, insightful analytics to our data scientists. You can sign up for the meetup at http://www.meetup.com/spark-users/events/134582432/ -- Reynold Xin,