Re: Shuffle on joining two RDDs

2015-02-13 Thread Imran Rashid
Rashid iras...@cloudera.com wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran
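
The lines in question are PySpark, but the same flag exists in the Scala API. A minimal sketch (assuming an existing SparkContext sc; the data and partition count are made up) of how preservesPartitioning = true lets a downstream join skip a shuffle:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD implicits, needed before Spark 1.3

    val pairs = sc.parallelize(1 to 1000).map(i => (i, i * 2))
    val partitioned = pairs.partitionBy(new HashPartitioner(8))

    // transform the values but keep the existing partitioner, so a later
    // join on the same keys does not have to reshuffle this RDD
    val transformed = partitioned.mapPartitions(
      iter => iter.map { case (k, v) => (k, v + 1) },
      preservesPartitioning = true)

    println(transformed.partitioner)   // still Some(HashPartitioner)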

Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Imran Rashid
unfortunately this is a known issue: https://issues.apache.org/jira/browse/SPARK-1476 as Sean suggested, you need to think of some other way of doing the same thing, even if it's just breaking your one big broadcast var into a few smaller ones On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen
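
A rough sketch of the workaround described here, splitting one oversized broadcast into several smaller ones. The names (bigLookup, data) and the four-way chunking are purely illustrative:

    // bigLookup: a hypothetical large Map that is too big to broadcast in one piece
    val chunks = bigLookup.toSeq.grouped(bigLookup.size / 4 + 1).map(_.toMap).toArray

    // broadcast each chunk separately so no single block hits the 2GB limit
    val broadcasts = chunks.map(chunk => sc.broadcast(chunk))

    // data: a hypothetical RDD of keys; check each chunk in turn for the key
    val lookedUp = data.map { key =>
      broadcasts.flatMap(_.value.get(key)).headOption
    }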

Re: Counters in Spark

2015-02-13 Thread Imran Rashid
this is more-or-less the best you can do now, but as has been pointed out, accumulators don't quite fit the bill for counters. There is an open issue to do something better, but no progress on that so far https://issues.apache.org/jira/browse/SPARK-603 On Fri, Feb 13, 2015 at 11:12 AM, Mark
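
For reference, the basic accumulator-as-counter pattern being discussed, as a minimal sketch (lines is a hypothetical RDD[String]); as the thread notes, re-run or speculative tasks can make counts from transformations unreliable, so only trust values updated inside an action:

    val blankLines = sc.accumulator(0)

    // updated on the executors as a side effect of the action
    lines.foreach { line =>
      if (line.trim.isEmpty) blankLines += 1
    }

    // the merged value is only readable on the driver, after the action
    println("blank lines: " + blankLines.value)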

Re: Master dies after program finishes normally

2015-02-12 Thread Imran Rashid
The important thing here is the master's memory; that's where you're getting the GC overhead limit. The master is updating its UI to include your finished app when your app finishes, which would cause a spike in memory usage. I wouldn't expect the master to need a ton of memory just to serve the

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: ah

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized and cached somewhere? E.g., if you just did
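
A sketch of the pattern being described: partition both sides the same way, force materialization, then join. The input RDDs (leftRaw, rightRaw) and the partition count are placeholders:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD implicits, needed before Spark 1.3

    val part = new HashPartitioner(16)

    // partitionBy is lazy; cache() plus an action actually materializes
    // the partitioned data before the join runs
    val left  = leftRaw.partitionBy(part).cache()
    val right = rightRaw.partitionBy(part).cache()
    left.count()
    right.count()

    // both sides now share the same partitioner, so the join itself needs no shuffle
    val joined = left.join(right)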

Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Imran Rashid
You need to import the implicit conversions to PairRDDFunctions with import org.apache.spark.SparkContext._ (note that this requirement will go away in 1.3: https://issues.apache.org/jira/browse/SPARK-4397) On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com wrote: Hi. I
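
For illustration, a small sketch of the fix on a pre-1.3 Spark; the Writable types and output path are placeholders, not from the original thread:

    import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.SequenceFileOutputFormat

    // with the import above, pair-only methods such as saveAsHadoopFile resolve
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    pairs.saveAsHadoopFile[SequenceFileOutputFormat[Text, IntWritable]]("/tmp/example-output")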

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
to the mailing list first. On 2015-02-12 16:48, Imran Rashid wrote: Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those

Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Imran Rashid
Hi Michael, judging from the logs, it seems that those tasks are just working for a really long time. If you have long-running tasks, then you wouldn't expect the driver to output anything while those tasks are working. What is unusual is that there is no activity during all that time the tasks are

Re: Spark SQL taking long time to print records from a table

2015-02-04 Thread Imran Rashid
Many operations in Spark are lazy -- most likely your collect() statement is actually forcing evaluation of several steps earlier in the pipeline. The logs and the UI might give you some info about all the stages that are being run when you get to collect(). I think collect() is just fine if you

Re: 2GB limit for partitions?

2015-02-04 Thread Imran Rashid
used to run some of our jobs on it ... But that is forked off 1.1 actually). Regards Mridul On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote: Thanks for the explanations, makes sense. For the record looks like this was worked on a while back (and maybe the work

Re: Sort based shuffle not working properly?

2015-02-04 Thread Imran Rashid
I think you are interested in secondary sort, which is still being worked on: https://issues.apache.org/jira/browse/SPARK-3655 On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did, sort the keys going to the same partition. I
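
Until SPARK-3655 lands, a common manual workaround (not what the JIRA will eventually provide) is to build a composite key and use repartitionAndSortWithinPartitions, available since Spark 1.2. A sketch, assuming a hypothetical events: RDD[(String, Int)]:

    import org.apache.spark.{HashPartitioner, Partitioner}
    import org.apache.spark.SparkContext._   // ordered-RDD implicits, needed before Spark 1.3

    // partition on the real key only, even though the composite key is (key, value)
    class KeyOnlyPartitioner(partitions: Int) extends Partitioner {
      private val delegate = new HashPartitioner(partitions)
      def numPartitions: Int = partitions
      def getPartition(key: Any): Int = key match {
        case (k, _) => delegate.getPartition(k)
      }
    }

    // within each partition the data comes back sorted by (key, value), so each
    // key's values appear together and in order, without collecting any single
    // key's values into memory
    val secondarySorted = events
      .map { case (k, v) => ((k, v), ()) }
      .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(8))
      .map { case ((k, v), _) => (k, v) }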

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
into multiple smaller blocks. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same
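
The example from that message, restored to multi-line form for readability (it builds a single partition of roughly 5GB and persists it to disk, which trips the limit):

    import org.apache.spark.storage.StorageLevel

    // one partition holding 1e6 arrays of 5e3 bytes each, about 5GB
    val d = sc.parallelize(1 to 1e6.toInt, 1)
      .map { i => new Array[Byte](5e3.toInt) }
      .persist(StorageLevel.DISK_ONLY)

    d.count()   // fails once a single block exceeds 2GB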

Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Imran Rashid
I'm not an expert on streaming, but I think you can't do anything like this right now. It seems like a very sensible use case, though, so I've created a jira for it: https://issues.apache.org/jira/browse/SPARK-5467 On Wed, Jan 28, 2015 at 8:54 AM, YaoPau jonrgr...@gmail.com wrote: The

Re: Aggregations based on sort order

2015-01-23 Thread Imran Rashid
I'm not sure about this, but I suspect the answer is: spark doesn't guarantee a stable sort, nor does it plan to in the future, so the implementation has more flexibility. But you might be interested in the work being done on secondary sort, which could give you the guarantees you want:

Re: reading a csv dynamically

2015-01-22 Thread Imran Rashid
Spark can definitely process data with optional fields. It kinda depends on what you want to do with the results -- it's more of an object design / knowing Scala types question. E.g., Scala has a built-in type, Option, specifically for handling optional data, which works nicely in pattern matching
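
A small sketch of the Option / pattern-matching idea; the CSV layout (a name with an optional age) and the file path are invented for illustration:

    // each line is either "name" or "name,age"
    case class Person(name: String, age: Option[Int])

    val people = sc.textFile("people.csv").map { line =>
      line.split(",", -1) match {
        case Array(name)      => Person(name, None)
        case Array(name, age) => Person(name, Some(age.trim.toInt))
        case _                => Person(line, None)   // fallback for malformed rows
      }
    }

    // pattern matching handles both shapes downstream
    people.collect().foreach {
      case Person(n, Some(a)) => println(s"$n is $a")
      case Person(n, None)    => println(s"$n, age unknown")
    }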

Re: sparkcontext.objectFile return thousands of partitions

2015-01-22 Thread Imran Rashid
I think you should also just be able to provide an input format that never splits the input data. This has come up before on the list, but I couldn't find it.* I think this should work, but I can't try it out at the moment. Can you please try and let us know if it works? class
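
The message is cut off right at the class definition, so here is a sketch of what a never-split input format typically looks like (objectFile reads sequence files of (NullWritable, BytesWritable); deserializing the values back into objects, which objectFile normally does internally, is left out):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.{BytesWritable, NullWritable}
    import org.apache.hadoop.mapred.SequenceFileInputFormat

    // never split a file, so each file becomes exactly one partition
    class NonSplittableSequenceFileInputFormat
        extends SequenceFileInputFormat[NullWritable, BytesWritable] {
      override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
    }

    // hypothetical usage against the directory the objects were saved to
    val raw = sc.hadoopFile(
      "/path/to/saved/objects",
      classOf[NonSplittableSequenceFileInputFormat],
      classOf[NullWritable],
      classOf[BytesWritable])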

Re: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?

2015-01-16 Thread Imran Rashid
I'm not positive, but I think this is very unlikely to work. First, when you call sc.objectFile(...), I think the *driver* will need to know something about the file, eg to know how many tasks to create. But it won't even be able to see the file, since it only lives on the local filesystem of

Re: Accumulators

2015-01-15 Thread Imran Rashid
Your understanding is basically correct. Each task creates its own local accumulator, and just those results get merged together. However, there are some performance limitations to be aware of. First, you need enough memory on the executors to build up whatever those intermediate results are.

Re: Reading one partition at a time

2015-01-13 Thread Imran Rashid
This looks reasonable to me. As you've done, the important thing is just to make isSplittable return false. This shares a bit in common with the sc.wholeTextFiles method. It sounds like you really want something much simpler than what that is doing, but you might be interested in looking at that

Re: Multiple Filter Effiency

2014-12-16 Thread Imran Rashid
I think accumulators do exactly what you want. (Scala syntax below, I'm just not familiar with the Java equivalent ...) val f1counts = sc.accumulator(0) val f2counts = sc.accumulator(0) val f3counts = sc.accumulator(0) textfile.foreach { s => if (f1matches) f1counts += 1 ... } Note that
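
Filling out the truncated sketch above, with obvious stand-ins for the real f1/f2/f3 predicates; all three counts come out of a single pass over the data:

    val f1counts = sc.accumulator(0)
    val f2counts = sc.accumulator(0)
    val f3counts = sc.accumulator(0)

    textfile.foreach { s =>
      if (s.contains("ERROR")) f1counts += 1   // placeholder predicates
      if (s.contains("WARN"))  f2counts += 1
      if (s.startsWith("#"))   f3counts += 1
    }

    // read the merged totals on the driver once the action has finished
    println(s"${f1counts.value} / ${f2counts.value} / ${f3counts.value}")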

Re: NumberFormatException

2014-12-16 Thread Imran Rashid
wow, really weird. My intuition is the same as everyone else's, some unprintable character. Here's a couple more debugging tricks I've used in the past: //set up an accumulator to catch the bad rows as a side-effect val nBadRows = sc.accumulator(0) val nGoodRows = sc.accumulator(0) val badRows
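
The snippet is cut off, so as one illustration of the accumulator-as-debugging-aid idea it describes (the toInt parse is a stand-in for whatever conversion is throwing the NumberFormatException):

    val nBadRows  = sc.accumulator(0)
    val nGoodRows = sc.accumulator(0)

    val parsed = textfile.flatMap { line =>
      try {
        val v = line.trim.toInt   // stand-in for the real parsing
        nGoodRows += 1
        Some(v)
      } catch {
        case _: NumberFormatException =>
          nBadRows += 1
          None
      }
    }

    parsed.count()   // force evaluation so the accumulators are populated
    println(s"good: ${nGoodRows.value}, bad: ${nBadRows.value}")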

Re: what is the best way to implement mini batches?

2014-12-15 Thread Imran Rashid
I'm a little confused by some of the responses. It seems like there are two different issues being discussed here: 1. How to turn a sequential algorithm into something that works on Spark, e.g. deal with the fact that data is split into partitions which are processed in parallel (though within a

Re: what is the best way to implement mini batches?

2014-12-11 Thread Imran Rashid
Minor correction: I think you want iterator.grouped(10) for non-overlapping mini batches. On Dec 11, 2014 1:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You can just do mapPartitions on the whole RDD, and then call sliding() on the iterator in each one to get a sliding window. One
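
Putting the suggestion and the correction together, a minimal sketch (batch size 10; rdd and processBatch are placeholders for the real data and per-batch work):

    // non-overlapping mini batches of up to 10 elements within each partition
    val batched = rdd.mapPartitions { iter =>
      iter.grouped(10).map(batch => processBatch(batch))
    }

    // for overlapping windows, use iter.sliding(10) instead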

Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-05 Thread Imran Rashid
It's an easy mistake to make... I wonder if an assertion could be implemented that makes sure the type parameter is present. We could use the NotNothing pattern http://blog.evilmonkeylabs.com/2012/05/31/Forcing_Compiler_Nothing_checks/ but I wonder if it would just make the method signature

Re: optimize multiple filter operations

2014-11-29 Thread Imran Rashid
Rishi's approach will work, but it's worth mentioning that because all of the data goes into only two groups, you will only process the resulting data with two tasks and so you're losing almost all parallelism. Presumably you're processing a lot of data, since you only want to do one pass, so I

Re: Percentile

2014-11-29 Thread Imran Rashid
Hi Franco, As a fast approximate way to get probability distributions, you might be interested in t-digests: https://github.com/tdunning/t-digest In one pass, you could make a t-digest for each variable, to get its distribution. And after that, you could make another pass to map each data

Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
Hi Art, I have some advice that isn't spark-specific at all, so it doesn't *exactly* address your questions, but you might still find it helpful. I think using an implicit to add your retrying behavior might be useful. I can think of two options: 1. enriching RDD itself, e.g. to add a
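
A sketch of the first option, enriching RDD itself; the method name, retry count, and implicit-class wrapper are illustrative, not the code from this thread (paste into the spark-shell or nest inside an object):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // adds a mapWithRetry that retries the per-element function a few
    // times and drops the element if it still fails
    implicit class RetryingRDD[T](rdd: RDD[T]) {
      def mapWithRetry[U: ClassTag](maxTries: Int)(f: T => U): RDD[U] =
        rdd.flatMap { elem =>
          def attempt(n: Int): Option[U] =
            try Some(f(elem))
            catch {
              case _: Exception if n < maxTries => attempt(n + 1)
              case _: Exception                 => None   // give up on this element
            }
          attempt(1)
        }
    }

    // e.g. numbers.mapWithRetry(3)(x => 10 / x) skips x == 0 after three failed tries,
    // which matches the output quoted in the next message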

Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
on input 0, try 2 with java.lang.ArithmeticException: / by zero failed on input 0, try 3 with java.lang.ArithmeticException: / by zero 1 --- 10 2 --- 5 On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid im...@therashids.com wrote: Hi Art, I have some advice that isn't spark-specific at all, so
