[ANNOUNCE] Announcing Spark 1.5.1

2015-10-01 Thread Reynold Xin
Hi All,

Spark 1.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-1.5 maintenance branch of Spark. We
*strongly recommend* that all 1.5.0 users upgrade to this release.

The full list of bug fixes is here: http://s.apache.org/spark-1.5.1

http://spark.apache.org/releases/spark-release-1-5-1.html


(note: it can take a few hours for everything to be propagated, so you
might get 404s on some download links, but everything should already be in
Maven Central)


Re: Null Value in DecimalType column of DataFrame

2015-09-21 Thread Reynold Xin
+dev list

Hi Dirceu,

Whether throwing an exception or returning null is better depends on your
use case. If you are debugging and want to find bugs in your program, you
might prefer throwing an exception. However, if you are running on a large
real-world dataset (i.e. the data is dirty) and your query can take a while
(e.g. 30 mins), then you might prefer the system to just assign null values
to the dirty data that would otherwise lead to runtime exceptions, because
otherwise you could spend days just cleaning your data.

Postgres throws exceptions here, but I think that's mainly because it is
used for OLTP, where queries are short-running. Most other analytic
databases, I believe, just return null. The best we can do is to provide a
config option to control the exception-handling behavior.
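
For what it's worth, a minimal sketch of how the null behavior shows up
(assuming a DataFrame df with a column named "value"; the names are
illustrative, not from this thread):

  // out-of-range values come back as null instead of throwing
  val casted = df.selectExpr("value", "cast(value as decimal(10, 10)) as d")

  // one way to surface the rows that were silently nulled out
  casted.filter("value is not null and d is null").show()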


On Fri, Sep 18, 2015 at 8:15 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Yin, I got that part.
> I just think that instead of returning null, throwing an exception would
> be better. In the exception message we can explain that the DecimalType
> used can't fit the number that is being converted, due to the precision and
> scale values used to create it.
> It would be easier for the user to find the reason why that error is
> happening, instead of getting a NullPointerException in another part of
> his code. We could also improve the documentation of the DecimalType
> classes to explain this behavior. What do you think?
>
>
>
>
> 2015-09-17 18:52 GMT-03:00 Yin Huai :
>
>> As I mentioned before, the range of values of DecimalType(10, 10) is [0,
>> 1). If you have a value 10.5 and you want to cast it to DecimalType(10,
>> 10), I do not think there is any better return value than null.
>> It looks like DecimalType(10, 10) is not the right type for your use case.
>> You need a decimal type that has precision - scale >= 2.
>>
>> On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>>>
>>> Hi Yin, I posted here because I think it's a bug.
>>> So, it will return null and I can get a NullPointerException, as I was
>>> getting. Is this really the expected behavior? I've never seen something
>>> return null in the other Scala tools that I've used.
>>>
>>> Regards,
>>>
>>>
>>> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>>>
 btw, move it to user list.

 On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:

> A scale of 10 means that there are 10 digits at the right of the
> decimal point. If you also have precision 10, the range of your data will
> be [0, 1) and casting "10.5" to DecimalType(10, 10) will return null, 
> which
> is expected.
>
> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm moving from Spark 1.4 to 1.5, and one of my tests is failing.
>> It seems that there were some changes in org.apache.spark.sql.types.
>> DecimalType
>>
>> This ugly code is a little sample to reproduce the error; don't use
>> it in your project.
>>
>> test("spark test") {
>>   val file = 
>> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
>> Row.fromSeq({
>> val values = f.split(",")
>> 
>> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
>> values.tail.tail.tail.head)}))
>>
>>   val structType = StructType(Seq(StructField("id", IntegerType, false),
>> StructField("int2", IntegerType, false), StructField("double",
>>
>>  DecimalType(10,10), false),
>>
>>
>> StructField("str2", StringType, false)))
>>
>>   val df = context.sqlContext.createDataFrame(file,structType)
>>   df.first
>> }
>>
>> The content of the file is:
>>
>> 1,5,10.5,va
>> 2,1,0.1,vb
>> 3,8,10.0,vc
>>
>> The problem resides in DecimalType: before 1.5 the scale wasn't
>> required. Now, when using DecimalType(12,10) it works fine, but when
>> using DecimalType(10,10) the decimal value 10.5 becomes null, while
>> 0.1 works.
>>
>> Is there anybody working with DecimalType for 1.5.1?
>>
>> Regards,
>> Dirceu
>>
>>
>

>>>
>>>
>>
>


Re: in joins, does one side stream?

2015-09-20 Thread Reynold Xin
We do - but I don't think it is feasible to duplicate every single
algorithm in DF and in RDD.

The only way for this to work is to make one underlying implementation work
for both. Right now DataFrame knows how to serialize individual elements
well and can manage memory that way -- the RDD API doesn't give us enough
information for that.

https://issues.apache.org/jira/browse/SPARK-




On Sun, Sep 20, 2015 at 11:48 AM, Koert Kuipers  wrote:

> sorry that was a typo. i meant to say:
>
> why do we have these features (broadcast join and sort-merge join) in
> DataFrame but not in RDD?
>
> they don't seem specific to structured data analysis to me.
>
> thanks! koert
>
> On Sun, Sep 20, 2015 at 2:46 PM, Koert Kuipers  wrote:
>
>> why dont we want these (broadcast join and sort-merge join) in DataFrame
>> but not in RDD?
>>
>> they dont seem specific to structured data analysis to me.
>>
>> On Sun, Sep 20, 2015 at 2:41 AM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> Got it..thnx Reynold..
>>> On 20 Sep 2015 07:08, "Reynold Xin"  wrote:
>>>
>>>> The RDDs themselves are not materialized, but the implementations can
>>>> materialize.
>>>>
>>>> E.g. in cogroup (which is used by RDD.join), it materializes all the
>>>> data during grouping.
>>>>
>>>> In SQL/DataFrame join, depending on the join:
>>>>
>>>> 1. For broadcast join, only the smaller side is materialized in memory
>>>> as a hash table.
>>>>
>>>> 2. For sort-merge join, both sides are sorted & streamed through --
>>>> however, one of the sides needs to buffer all the rows having the same join
>>>> key in order to perform the join.
>>>>
>>>>
>>>>
>>>> On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra <
>>>> rishi80.mis...@gmail.com> wrote:
>>>>
>>>>> Hi Reynold,
>>>>> Can you please elaborate on this. I thought RDD also opens only an
>>>>> iterator. Does it get materialized for joins?
>>>>>
>>>>> Rishi
>>>>>
>>>>> On Saturday, September 19, 2015, Reynold Xin 
>>>>> wrote:
>>>>>
>>>>>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>>>>>> streams.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers 
>>>>>> wrote:
>>>>>>
>>>>>>> in scalding we join with the smaller side on the left, since the
>>>>>>> smaller side will get buffered while the bigger side streams through the
>>>>>>> join.
>>>>>>>
>>>>>>> looking at CoGroupedRDD i do not get the impression such a
>>>>>>> distinction is made. it seems both sides are put into a map that can
>>>>>>> spill to disk. is this correct?
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
>


Re: in joins, does one side stream?

2015-09-19 Thread Reynold Xin
The RDDs themselves are not materialized, but the implementations can
materialize.

E.g. in cogroup (which is used by RDD.join), it materializes all the data
during grouping.

In SQL/DataFrame join, depending on the join:

1. For broadcast join, only the smaller side is materialized in memory as a
hash table.

2. For sort-merge join, both sides are sorted & streamed through --
however, one of the sides needs to buffer all the rows having the same join
key in order to perform the join.
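
A minimal sketch of steering the planner toward each strategy (assuming two
DataFrames largeDF and smallDF sharing a column "key"; the names are
illustrative). If I remember correctly, the broadcast hint lives in the
functions object as of 1.5:

  import org.apache.spark.sql.functions.broadcast

  // broadcast join: only smallDF is materialized as a hash table on each node
  val bcJoined = largeDF.join(broadcast(smallDF), "key")

  // without the hint (and above spark.sql.autoBroadcastJoinThreshold), the
  // planner falls back to a shuffle-based join such as sort-merge join
  val smJoined = largeDF.join(smallDF, "key")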



On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra  wrote:

> Hi Reynold,
> Can you please elaborate on this. I thought RDD also opens only an
> iterator. Does it get materialized for joins?
>
> Rishi
>
> On Saturday, September 19, 2015, Reynold Xin  wrote:
>
>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>> streams.
>>
>>
>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers 
>> wrote:
>>
>>> in scalding we join with the smaller side on the left, since the smaller
>>> side will get buffered while the bigger side streams through the join.
>>>
>>> looking at CoGroupedRDD i do not get the impression such a distinction is
>>> made. it seems both sides are put into a map that can spill to disk. is
>>> this correct?
>>>
>>> thanks
>>>
>>
>>


Re: in joins, does one side stream?

2015-09-18 Thread Reynold Xin
Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
streams.


On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers  wrote:

> in scalding we join with the smaller side on the left, since the smaller
> side will get buffered while the bigger side streams through the join.
>
> looking at CoGroupedRDD i do not get the impression such a distinction is
> made. it seems both sides are put into a map that can spill to disk. is
> this correct?
>
> thanks
>


Re: How to avoid shuffle errors for a large join ?

2015-09-16 Thread Reynold Xin
Only SQL and DataFrame for now.

We are thinking about how to apply that to a more general distributed
collection based API, but it's not in 1.5.

On Sat, Sep 5, 2015 at 11:56 AM, Gurvinder Singh  wrote:

> On 09/05/2015 11:22 AM, Reynold Xin wrote:
> > Try increasing the shuffle memory fraction (by default it is only 16%).
> > Again, if you run Spark 1.5, this will probably run a lot faster,
> > especially if you increase the shuffle memory fraction ...
> Hi Reynold,
>
> Does 1.5 have better join/cogroup performance for the RDD case too, or
> only for SQL?
>
> - Gurvinder
> >
> > On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  > <mailto:tom...@gmail.com>> wrote:
> >
> > While it works with sort-merge-join, it takes about 12h to finish
> > (with 1 shuffle partitions). My hunch is that the reason for
> > that is this:
> >
> > INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB
> > to disk (62 times so far)
> >
> > (and lots more where this comes from).
> >
> > On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  > <mailto:r...@databricks.com>> wrote:
> >
> > Can you try 1.5? This should work much, much better in 1.5 out
> > of the box.
> >
> > For 1.4, I think you'd want to turn on sort-merge-join, which is
> > off by default. However, the sort-merge join in 1.4 can still
> > trigger a lot of garbage, making it slower. SMJ performance is
> > probably 5x - 1000x better in 1.5 for your case.
> >
> >
> > On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak
> > mailto:tom...@gmail.com>> wrote:
> >
> > I'm getting errors like "Removing executor with no recent
> > heartbeats" & "Missing an output location for shuffle"
> > errors for a large SparkSql join (1bn rows/2.5TB joined with
> > 1bn rows/30GB) and I'm not sure how to configure the job to
> > avoid them.
> >
> > The initial stage completes fine with some 30k tasks on a
> > cluster with 70 machines/10TB memory, generating about 6.5TB
> > of shuffle writes, but then the shuffle stage first waits
> > 30min in the scheduling phase according to the UI, and then
> > dies with the mentioned errors.
> >
> > I can see in the GC logs that the executors reach their
> > memory limits (32g per executor, 2 workers per machine) and
> > can't allocate any more stuff in the heap. Fwiw, the top 10
> > in the memory use histogram are:
> >
> > num #instances #bytes  class name
> > --
> >1: 249139595  11958700560
> >  scala.collection.immutable.HashMap$HashMap1
> >2: 251085327 8034730464 
> >  scala.Tuple2
> >3: 243694737 5848673688  java.lang.Float
> >4: 231198778 5548770672  java.lang.Integer
> >5:  72191585 4298521576
> >  [Lscala.collection.immutable.HashMap;
> >6:  72191582 2310130624
> >  scala.collection.immutable.HashMap$HashTrieMap
> >7:  74114058 1778737392  java.lang.Long
> >8:   6059103  779203840  [Ljava.lang.Object;
> >9:   5461096  174755072
> >  scala.collection.mutable.ArrayBuffer
> >   10: 34749   70122104  [B
> >
> > Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
> >
> > spark.core.connection.ack.wait.timeout 600
> > spark.executor.heartbeatInterval   60s
> > spark.executor.memory  32g
> > spark.mesos.coarse false
> > spark.network.timeout  600s
> > spark.shuffle.blockTransferService netty
> > spark.shuffle.consolidateFiles true
> > spark.shuffle.file.buffer  1m
> > spark.shuffle.io.maxRetries6
> > spark.shuffle.manager  sort
> >
> > The join is currently configured with
> > spark.sql.shuffle.partitions=1000 but that doesn't seem to
> > help. Would increasing the partitions help ? Is there a
> > formula to determine an approximate partitions number value
> > for a join ?
> > Any help with this job would be appreciated !
> >
> > cheers,
> > Tom
> >
> >
> >
> >
>
>


Re: Perf impact of BlockManager byte[] copies

2015-09-10 Thread Reynold Xin
This is one problem I'd like to address soon - providing a binary block
management interface for shuffle (and maybe other things) that avoids
serialization/copying.
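
For readers following along, a tiny illustrative sketch (plain JDK types, not
Spark's actual code) of the serialize-then-copy pattern described below:
ByteArrayOutputStream grows and re-copies its internal byte[] as it fills,
and toByteArray makes one more full copy before the result is wrapped:

  val bos = new java.io.ByteArrayOutputStream()
  val oos = new java.io.ObjectOutputStream(bos)
  oos.writeObject(Array.fill(1 << 20)(42.0))  // some largish value
  oos.close()
  val block = java.nio.ByteBuffer.wrap(bos.toByteArray)  // extra full copy here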


On Fri, Feb 27, 2015 at 3:39 PM, Paul Wais  wrote:

> Dear List,
>
> I'm investigating some problems related to native code integration
> with Spark, and while picking through BlockManager I noticed that data
> (de)serialization currently issues lots of array copies.
> Specifically:
>
> - Deserialization: BlockManager marshals all deserialized bytes
> through a spark.util. ByteBufferInputStream, which necessitates
> copying data into an intermediate temporary byte[] .  The temporary
> byte[] might be reused between deserialization of T instances, but
> nevertheless the bytes must be copied (and likely in a Java loop).
>
> - Serialization: BlockManager buffers all serialized bytes into a
> java.io.ByteArrayOutputStream, which maintains an internal byte[]
> buffer and grows/re-copies the buffer like a vector as the buffer
> fills.  BlockManager then retrieves the internal byte[] buffer, wraps
> it in a ByteBuffer, and sends it off to be stored (e.g. in
> MemoryStore, DiskStore, Tachyon, etc).
>
> When an individual T is somewhat large (e.g. a feature vector, an
> image, etc), or blocks are megabytes in size, these copies become
> expensive (especially for large written blocks), right?  Does anybody
> have any measurements of /how/ expensive they are?  If not, is there
> serialization benchmark code (e.g. for KryoSerializer ) that might be
> helpful here?
>
>
> As part of my investigation, I've found that one might be able to
> sidestep these issues by extending Spark's SerializerInstance API to
> offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
> extension including a ByteBuffer API would furthermore have many
> benefits for native code.  A major downside of this API addition is
> that it wouldn't interoperate (nontrivially) with compression, so
> shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
> deduce when use of this ByteBuffer API is possible and leverage it.
>
> Cheers,
> -Paul
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
It's likely that with codegen, you need a bigger PermGen space. Also, I have
found that Java 7 doesn't do very well w.r.t. flushing the code cache. As a
result, Spark SQL and DataFrames now run much better under Java 8, because
it flushes the code cache better.
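
A sketch of one way to bump PermGen on Java 7 (the -XX:MaxPermSize flag is
standard HotSpot; note that for the driver JVM the flag generally has to be
passed at launch time, e.g. via --driver-java-options, rather than set
programmatically):

  spark.driver.extraJavaOptions    -XX:MaxPermSize=256m
  spark.executor.extraJavaOptions  -XX:MaxPermSize=256m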


On Wed, Sep 9, 2015 at 2:12 PM, Sandy Ryza  wrote:

> Java 7.
>
> FWIW I was just able to get it to work by increasing MaxPermSize to 256m.
>
> -Sandy
>
> On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin  wrote:
>
>> Java 7 / 8?
>>
>> On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza 
>> wrote:
>>
>>> I just upgraded the spark-timeseries
>>> <https://github.com/cloudera/spark-timeseries> project to run on top of
>>> 1.5, and I'm noticing that tests are failing with OOMEs.
>>>
>>> I ran a jmap -histo on the process and discovered the top heap items to
>>> be:
>>>1:163428   22236064  
>>>2:163428   21112648  
>>>3: 12638   14459192  
>>>4: 12638   13455904  
>>>5: 105397642528  
>>>
>>> Not sure whether this is suspicious.  Any ideas?
>>>
>>> -Sandy
>>>
>>
>>
>


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
Java 7 / 8?

On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza  wrote:

> I just upgraded the spark-timeseries
>  project to run on top of
> 1.5, and I'm noticing that tests are failing with OOMEs.
>
> I ran a jmap -histo on the process and discovered the top heap items to be:
>1:163428   22236064  
>2:163428   21112648  
>3: 12638   14459192  
>4: 12638   13455904  
>5: 105397642528  
>
> Not sure whether this is suspicious.  Any ideas?
>
> -Sandy
>


[ANNOUNCE] Announcing Spark 1.5.0

2015-09-09 Thread Reynold Xin
Hi All,

Spark 1.5.0 is the sixth release on the 1.x line. This release represents
1400+ patches from 230+ contributors and 80+ institutions. To download
Spark 1.5.0 visit the downloads page.

A huge thanks goes to all of the individuals and organizations involved in
development and testing of this release.

Visit the release notes [1] to read about the new features, or download [2]
the release today.

For errata in the contributions or release notes, please e-mail me
*directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-5-0.html
[2] http://spark.apache.org/downloads.html


Re: Best way to import data from Oracle to Spark?

2015-09-09 Thread Reynold Xin
Using the JDBC data source is probably the best way.
http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#jdbc-to-other-databases
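
A minimal sketch with the 1.4 DataFrameReader API (the Oracle URL, table name
and credentials below are illustrative placeholders, and the Oracle JDBC jar
needs to be on the driver and executor classpath):

  val props = new java.util.Properties()
  props.setProperty("user", "scott")
  props.setProperty("password", "tiger")
  props.setProperty("driver", "oracle.jdbc.OracleDriver")

  val df = sqlContext.read.jdbc(
    "jdbc:oracle:thin:@//dbhost:1521/ORCL",  // hypothetical connection string
    "MY_SCHEMA.MY_TABLE",                    // hypothetical table
    props)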

On Tue, Sep 8, 2015 at 10:11 AM, Cui Lin  wrote:

> What's the best way to import data from Oracle to Spark? Thanks!
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-07 Thread Reynold Xin
On Wed, Sep 2, 2015 at 12:03 AM, Anders Arpteg  wrote:

>
> BTW, is it possible (or will it be) to use Tungsten with dynamic
> allocation and the external shuffle manager?
>
>
Yes - I think this already works. There isn't anything specific here
related to Tungsten.
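
For reference, the settings usually involved (dynamic allocation requires the
external shuffle service to be enabled on the workers; nothing here is
Tungsten-specific):

  spark.dynamicAllocation.enabled  true
  spark.shuffle.service.enabled    true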


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Reynold Xin
Try increasing the shuffle memory fraction (by default it is only 16%).
Again, if you run Spark 1.5, this will probably run a lot faster,
especially if you increase the shuffle memory fraction ...
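
For example, in spark-defaults.conf (0.2 is the default for this knob, so the
value below roughly doubles the heap fraction available to shuffle
aggregation):

  spark.shuffle.memoryFraction  0.4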

On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak  wrote:

> While it works with sort-merge-join, it takes about 12h to finish (with
> 1 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin  wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak  wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" &
>>> "Missing an output location for shuffle" errors for a large SparkSql join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>> num #instances #bytes  class name
>>> --
>>>1: 249139595  11958700560
>>>  scala.collection.immutable.HashMap$HashMap1
>>>2: 251085327 8034730464  scala.Tuple2
>>>3: 243694737 5848673688  java.lang.Float
>>>4: 231198778 5548770672  java.lang.Integer
>>>5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
>>>6:  72191582 2310130624
>>>  scala.collection.immutable.HashMap$HashTrieMap
>>>7:  74114058 1778737392  java.lang.Long
>>>8:   6059103  779203840  [Ljava.lang.Object;
>>>9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
>>>   10: 34749   70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval   60s
>>> spark.executor.memory  32g
>>> spark.mesos.coarse false
>>> spark.network.timeout  600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer  1m
>>> spark.shuffle.io.maxRetries6
>>> spark.shuffle.manager  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the partitions help ? Is
>>> there a formula to determine an approximate partitions number value for a
>>> join ?
>>> Any help with this job would be appreciated !
>>>
>>> cheers,
>>> Tom
>>>
>>
>>
>


Re: How to avoid shuffle errors for a large join ?

2015-08-29 Thread Reynold Xin
Can you try 1.5? This should work much, much better in 1.5 out of the box.

For 1.4, I think you'd want to turn on sort-merge-join, which is off by
default. However, the sort-merge join in 1.4 can still trigger a lot of
garbage, making it slower. SMJ performance is probably 5x - 1000x better in
1.5 for your case.
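
If memory serves, the 1.4 flag for turning that on is:

  spark.sql.planner.sortMergeJoin  true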


On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak  wrote:

> I'm getting errors like "Removing executor with no recent heartbeats" &
> "Missing an output location for shuffle" errors for a large SparkSql join
> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
> configure the job to avoid them.
>
> The initial stage completes fine with some 30k tasks on a cluster with 70
> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
> the shuffle stage first waits 30min in the scheduling phase according to
> the UI, and then dies with the mentioned errors.
>
> I can see in the GC logs that the executors reach their memory limits (32g
> per executor, 2 workers per machine) and can't allocate any more stuff in
> the heap. Fwiw, the top 10 in the memory use histogram are:
>
> num #instances #bytes  class name
> --
>1: 249139595  11958700560
>  scala.collection.immutable.HashMap$HashMap1
>2: 251085327 8034730464  scala.Tuple2
>3: 243694737 5848673688  java.lang.Float
>4: 231198778 5548770672  java.lang.Integer
>5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
>6:  72191582 2310130624
>  scala.collection.immutable.HashMap$HashTrieMap
>7:  74114058 1778737392  java.lang.Long
>8:   6059103  779203840  [Ljava.lang.Object;
>9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
>   10: 34749   70122104  [B
>
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval   60s
> spark.executor.memory  32g
> spark.mesos.coarse false
> spark.network.timeout  600s
> spark.shuffle.blockTransferService netty
> spark.shuffle.consolidateFiles true
> spark.shuffle.file.buffer  1m
> spark.shuffle.io.maxRetries6
> spark.shuffle.manager  sort
>
> The join is currently configured with spark.sql.shuffle.partitions=1000
> but that doesn't seem to help. Would increasing the partitions help ? Is
> there a formula to determine an approximate partitions number value for a
> join ?
> Any help with this job would be appreciated !
>
> cheers,
> Tom
>


Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Reynold Xin
You've probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-7180

It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to
false and see if it goes away.
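
i.e. something like:

  spark.serializer.extraDebugInfo  false

in spark-defaults.conf, or the equivalent --conf flag on spark-submit.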


On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov 
wrote:

> Hi,
>
> I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
> trying to save my data frame to parquet.
> The issue I'm stuck looks like serialization tries to do pretty weird
> thing: tries to write to an empty array.
>
> The last (through stack trace) line of spark code that leads to exception
> is in method SerializationDebugger.visitSerializable(o: Object, stack:
> List[String]): List[String].
> desc.getObjFieldValues(finalObj, objFieldValues)
>
> The reason it does so, is because finalObj is
> org.apache.spark.sql.execution.Project and objFieldValues is an empty
> array! As a result there are two fields to read from the Project instance
> object (happens in java.io.ObjectStreamClass), but there is an empty array
> to read into.
>
> A little bit of code with debug info:
> private def visitSerializable(o: Object, stack: List[String]):
> List[String] = {
> val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: "Project",
> "desc: "org.apache.spark.sql.execution.Project"
>   val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0:
> SparkPlan, 1: Project]
>   var i = 0 //i: 0
>   while (i < slotDescs.length) {
> val slotDesc = slotDescs(i) //slotDesc:
> "org.apache.spark.sql.execution.SparkPlan"
> if (slotDesc.hasWriteObjectMethod) {
>   // TODO: Handle classes that specify writeObject method.
> } else {
>   val fields: Array[ObjectStreamField] = slotDesc.getFields
> //fields: java.io.ObjectStreamField[1] [0: "Z codegenEnabled"]
>   val objFieldValues: Array[Object] = new
> Array[Object](slotDesc.getNumObjFields) //objFieldValues:
> java.lang.Object[0]
>   val numPrims = fields.length - objFieldValues.length //numPrims:
> 1
>   desc.getObjFieldValues(finalObj, objFieldValues) //leads to
> exception
>
> So it looks like it gets objFieldValues array from the SparkPlan object,
> but uses it to receive values from Project object.
>
> Here is the schema of my data frame
> root
>  |-- Id: long (nullable = true)
>  |-- explodes: struct (nullable = true)
>  ||-- Identifiers: array (nullable = true)
>  |||-- element: struct (containsNull = true)
>  ||||-- Type: array (nullable = true)
>  |||||-- element: string (containsNull = true)
>  |-- Identifiers: struct (nullable = true)
>  ||-- Type: array (nullable = true)
>  |||-- element: string (containsNull = true)
>  |-- Type2: string (nullable = true)
>  |-- Type: string (nullable = true)
>
> Actual stack trace is:
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
> at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
> at
> com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
> at com.intellij.rt.exe

Re: Tungsten and sun.misc.Unsafe

2015-08-21 Thread Reynold Xin
I'm actually somewhat involved with the Google Docs you linked to.

I don't think Oracle will remove Unsafe in JVM 9. As you said, JEP 260
already proposes making Unsafe available. Given the widespread use of
Unsafe for performance and advanced functionalities, I don't think Oracle
can just remove it in one release. If they do, there will be strong
backlash and the act would significantly undermine the credibility of the
JVM as a long-term platform.

Note that for Spark itself, we move pretty fast and can replace all the use
of Unsafe with a newer alternative in one release if absolutely necessary
(the actual coding takes only a day or two).



On Fri, Aug 21, 2015 at 5:29 AM, Marek Kolodziej 
wrote:

> Hello,
>
> I attended the Tungsten-related presentations at Spark Summit (by Josh
> Rosen) and at Big Data Scala (by Matei Zaharia). Needless to say, this
> project holds great promise for major performance improvements.
>
> At Josh's talk, I heard about the use of sun.misc.Unsafe as a way of
> achieving some of these optimizations (e.g. slides 11-17 of Josh's
> presentation:
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen).
> I have no problems with the use of Unsafe in the code itself (I've done it
> before myself, too), however I think there is a considerable risk
> associated with beginning the use of Unsafe now, because Oracle is
> determined to limit access to APIs such as Unsafe starting in Java 9.
>
> JEP 260  was filed specifically to
> limit access to internal JDK APIs that were "never intended for external
> use, including "sun.misc.*" The JEP does say that the functionality of
> sun.misc.Unsafe is to remain available even as other internal APIs are
> blocked for non-JDK use, however, it also says that "the functionality of
> many methods of this class is now available via *variable handles (JEP
> 193 ).*" If the direct access to
> sun.misc.Unsafe is blocked and only the variable handles access remains,
> this may mean more than just a need for code refactoring - functionality
> such as doing "malloc" from Spark core may be restricted.
>
> JEP 260 has evolved quite a bit over time and the wording available now
> (after the Aug. 4, 2015) seems more reasonable than before. Nevertheless,
> Hazelcast and other companies whose technologies depend on the availability
> of Unsafe started a Google doc here
> 
> .
>
> I doubt that Oracle would want to make life difficult for everyone. In
> addition to Spark's code base, projects such as Akka, Cassandra, Hibernate,
> Netty, Neo4j and Spring (among many others) depend on Unsafe. Still, there
> are tons of posts about this issue in the Java community (e.g. here
> 's a
> Hazelcast interview, also from Aug. 3, the day before the latest update to
> JEP 260). There are tons of concerned posts on the blogosphere, too (e.g.
> here
> 
> ).
>
> Have the leaders of the Spark community been following these
> Unsafe-related developments and if so, what's Spark's plan of handling
> whatever Oracle throws our way?
>
> Marek
>


Re: Memory allocation error with Spark 1.5

2015-08-05 Thread Reynold Xin
In Spark 1.5, we have a new way to manage memory (part of Project
Tungsten). The default unit of memory allocation is 64MB, which is way too
high when you have 1G of memory allocated in total and have more than 4
threads.

We will reduce the default page size before releasing 1.5. For now, you
can just set the spark.buffer.pageSize variable to a lower value (e.g. 16m).

https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125
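
i.e. something along these lines when building the context (16m is just the
example value from above, not a tuned recommendation):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("join-test")
    .set("spark.buffer.pageSize", "16m")
  val sc = new SparkContext(conf)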

On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin 
wrote:

> Hi,
>
> I'm receiving a memory allocation error with a recent build of Spark 1.5:
>
> java.io.IOException: Unable to acquire 67108864 bytes of memory
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
> at
> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
> at
> org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
>
>
> The issue appears when joining 2 datasets. One with 6084 records, the
> other one with 200 records. I'm expecting to receive 200 records in the
> result.
>
> I'm using a homemade build prepared from "branch-1.5" with commit ID
> "eedb996". I have run "mvn -DskipTests clean install" to generate that
> build.
>
> Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.
>
> I've prepared a test case that can be built and executed very easily (data
> files are included in the repo):
> https://github.com/aseigneurin/spark-testcase
>
> One thing to note is that the issue arises when the master is set to
> "local[*]" but not when set to "local". Both options work without problem
> with Spark 1.4, though.
>
> Any help will be greatly appreciated!
>
> Many thanks,
> Alexis
>


Re: Grouping runs of elements in a RDD

2015-06-30 Thread Reynold Xin
Try mapPartitions, which gives you an iterator, and you can produce an
iterator back.
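
A minimal sketch of what that can look like (sameRun is a hypothetical
predicate deciding whether an element continues the current run, and rdd is
an existing RDD; note this only groups runs *within* a partition -- runs that
straddle a partition boundary would still need to be stitched together
separately):

  def groupRuns[T](iter: Iterator[T])(sameRun: (T, T) => Boolean): Iterator[List[T]] =
    new Iterator[List[T]] {
      private val in = iter.buffered
      def hasNext: Boolean = in.hasNext
      def next(): List[T] = {
        // start a new run with the next element, then absorb its continuation
        val run = scala.collection.mutable.ListBuffer(in.next())
        while (in.hasNext && sameRun(run.last, in.head))
          run += in.next()
        run.toList
      }
    }

  val grouped = rdd.mapPartitions(iter => groupRuns(iter)((a, b) => a == b))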


On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling  wrote:

> Hi all,
>
> I have a problem where I have a RDD of elements:
>
> Item1 Item2 Item3 Item4 Item5 Item6 ...
>
> and I want to run a function over them to decide which runs of elements to
> group together:
>
> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>
> Technically, I could use aggregate to do this, but I would have to use a
> List of List of T which would produce a very large collection in memory.
>
> Is there an easy way to accomplish this?  e.g., it would be nice to have
> a version of aggregate where the combination function can return a complete
> group that is added to the new RDD and an incomplete group which is passed
> to the next call of the reduce function.
>
> Thanks,
> RJ
>


Re: Building scaladoc using "build/sbt unidoc" failure

2015-06-12 Thread Reynold Xin
Try build/sbt clean first.


On Tue, May 26, 2015 at 4:45 PM, Justin Yip  wrote:

> Hello,
>
> I am trying to build scala doc from the 1.4 branch. But it failed due to
> [error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
> List(object package$DebugNode, object package$DebugNode)
>
> I followed the instruction on github
>  and used the
> following command:
>
> $ build/sbt unidoc
>
> Please see attachment for detailed error. Did I miss anything?
>
> Thanks.
>
> Justin
>
>
> *unidoc_error.txt* (30K) Download Attachment
> 
>
> --
> View this message in context: Building scaladoc using "build/sbt unidoc"
> failure
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Exception when using CLUSTER BY or ORDER BY

2015-06-12 Thread Reynold Xin
Tom,

Can you file a JIRA and attach a small reproducible test case if possible?


On Tue, May 19, 2015 at 1:50 PM, Thomas Dudziak  wrote:

> Under certain circumstances that I haven't yet been able to isolate, I get
> the following error when doing a HQL query using HiveContext (Spark 1.3.1
> on Mesos, fine-grained mode). Is this a known problem or should I file a
> JIRA for it ?
>
>
> org.apache.spark.SparkException: Can only zip RDDs with same number of 
> elements in each partition
>   at 
> org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
>   at 
> org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
>   at 
> org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
>   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Reynold Xin
I'm not sure if it is possible to overload the map function twice, once for
just KV pairs, and another for K and V separately.


On Fri, May 22, 2015 at 10:26 AM, Justin Pihony 
wrote:

> This ticket  improved
> the RDD API, but it could be even more discoverable if made available via
> the API directly. I assume this was originally an omission that now needs
> to be kept for backwards compatibility, but would any of the repo owners be
> open to making this more discoverable to the point of API docs and tab
> completion (while keeping both binary and source compatibility)?
>
>
> class PairRDD extends RDD{
>   pair methods
> }
>
> RDD{
>   def map[K: ClassTag, V: ClassTag](f: T => (K,V)):PairRDD[K,V]
> }
>
> As long as the implicits remain, then compatibility remains, but now it is
> explicit in the docs on how to get a PairRDD and in tab completion.
>
> Thoughts?
>
> Justin Pihony
>


Re: rdd.sample() methods very slow

2015-05-21 Thread Reynold Xin
You can do something like this:

val myRdd = ...

val rddSampledByPartition =
  PartitionPruningRDD.create(myRdd, i => Random.nextDouble() < 0.1)  // this samples 10% of the partitions

// take the first 10 elements out of each partition
rddSampledByPartition.mapPartitions { iter => iter.take(10) }



On Thu, May 21, 2015 at 11:36 AM, Sean Owen  wrote:

> If sampling whole partitions is sufficient (or a part of a partition),
> sure you could mapPartitionsWithIndex and decide whether to process a
> partition at all based on its # and skip the rest. That's much faster.
>
> On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV)
>  wrote:
> > I don't need to be 100% random. How about randomly picking a few
> partitions and return all docs in those partitions? Is
> > rdd.mapPartitionsWithIndex() the right method to use to just process a
> small portion of partitions?
> >
> > Ningjun
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame Column Alias problem

2015-05-21 Thread Reynold Xin
In 1.4 it actually shows col1 by default.

In 1.3, you can add "col1" to the output, i.e.

df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()


On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu  wrote:

> However this returns a single column of c, without showing the original
> col1.
> ​
>
> On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha 
> wrote:
>
>> df.groupBy($"col1").agg(count($"col1").as("c")).show
>>
>> On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu 
>> wrote:
>>
>>> Hi Spark Users Group,
>>>
>>> I’m doing groupby operations on my DataFrame *df* as following, to get
>>> count for each value of col1:
>>>
>>> > df.groupBy("col1").agg("col1" -> "count").show // I don't know if I 
>>> > should write like this.
>>> col1   COUNT(col1#347)
>>> aaa2
>>> bbb4
>>> ccc4
>>> ...
>>> and more...
>>>
>>> As I ‘d like to sort by the resulting count, with
>>> .sort("COUNT(col1#347)"), but the column name of the count result
>>> obviously cannot be retrieved in advance. Intuitively one might consider
>>> acquiring the column name by column index, in the fashion of R’s DataFrame, except
>>> Spark doesn’t support. I have Googled *spark agg alias* and so forth,
>>> and checked DataFrame.as in Spark API, neither helped on this. Am I the
>>> only one who had ever got stuck on this issue or anything I have missed?
>>>
>>> REGARDS,
>>> Todd Leo
>>> ​
>>>
>>
>>


Re: Is the AMP lab done next February?

2015-05-11 Thread Reynold Xin
Relaying an answer from AMP director Mike Franklin:

"One year into the lab we got a 5 yr Expeditions in Computing Award as part
of the White House Big Data initiative in 2012, so we extended the lab for a
year.   We intend to start winding it down at the end of 2016, while
supporting existing projects and students who will be finishing up.   The
AMPLab faculty are starting discussions this summer about what research
challenges we'd like to tackle next, and how best to organize to do so.

An interesting thing to note is that the Spark project started at about
this point in the AMPLab predecessor project (RADLab) so we have a track
record of being able to make these transitions."


On Sat, May 9, 2015 at 8:43 PM, Justin Pihony 
wrote:

> From  my StackOverflow question
> <
> https://stackoverflow.com/questions/29593139/is-the-amp-lab-done-next-february
> >
> :
>
> Is there a way to track whether Berkeley's AMP lab will indeed shutdown
> next
> year?
>
> From their about site:
>
> The AMPLab is a five-year collaborative effort at UC Berkeley and it
> was
> started in February 2011.
>
> So, I was curious if this was a hard date, or if it will be extended (or
> has
> already been extended?)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-the-AMP-lab-done-next-February-tp22832.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Reynold Xin
Thanks for catching this. I didn't read carefully enough.

It'd make sense to have the udaf result be non-nullable, if the exprs are
indeed non-nullable.

On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot  wrote:

> Hi Haopu,
> actually here `key` is nullable because this is your input's schema :
>
> scala> result.printSchema
> root
> |-- key: string (nullable = true)
> |-- SUM(value): long (nullable = true)
>
> scala> df.printSchema
> root
> |-- key: string (nullable = true)
> |-- value: long (nullable = false)
>
> I tried it with a schema where the key is not flagged as nullable, and the
> schema is actually respected. What you can argue however is that SUM(value)
> should also be not nullable since value is not nullable.
>
> @rxin do you think it would be reasonable to flag the Sum aggregation
> function as nullable (or not) depending on the input expression's schema ?
>
> Regards,
>
> Olivier.
> Le lun. 11 mai 2015 à 22:07, Reynold Xin  a écrit :
>
>> Not by design. Would you be interested in submitting a pull request?
>>
>> On Mon, May 11, 2015 at 1:48 AM, Haopu Wang  wrote:
>>
>>> I try to get the result schema of aggregate functions using DataFrame
>>> API.
>>>
>>> However, I find the result field of groupBy columns are always nullable
>>> even the source field is not nullable.
>>>
>>> I want to know if this is by design, thank you! Below is the simple code
>>> to show the issue.
>>>
>>> ==
>>>
>>>   import sqlContext.implicits._
>>>   import org.apache.spark.sql.functions._
>>>   case class Test(key: String, value: Long)
>>>   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF
>>>
>>>   val result = df.groupBy("key").agg($"key", sum("value"))
>>>
>>>   // From the output, you can see the "key" column is nullable, why??
>>>   result.printSchema
>>> //root
>>> // |-- key: string (nullable = true)
>>> // |-- SUM(value): long (nullable = true)
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Reynold Xin
Not by design. Would you be interested in submitting a pull request?

On Mon, May 11, 2015 at 1:48 AM, Haopu Wang  wrote:

> I try to get the result schema of aggregate functions using DataFrame
> API.
>
> However, I find the result field of groupBy columns are always nullable
> even the source field is not nullable.
>
> I want to know if this is by design, thank you! Below is the simple code
> to show the issue.
>
> ==
>
>   import sqlContext.implicits._
>   import org.apache.spark.sql.functions._
>   case class Test(key: String, value: Long)
>   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF
>
>   val result = df.groupBy("key").agg($"key", sum("value"))
>
>   // From the output, you can see the "key" column is nullable, why??
>   result.printSchema
> //root
> // |-- key: string (nullable = true)
> // |-- SUM(value): long (nullable = true)
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Reynold Xin
Looks like it is spending a lot of time doing hash probing. It could be any
number of the following:

1. hash probing itself is inherently expensive compared with the rest of your
workload

2. murmur3 doesn't work well with this key distribution

3. quadratic probing (triangular sequence) with a power-of-2 hash table
works really badly for this workload.

One way to test this is to instrument the changeValue function to store the
total number of probes, and then log it. We added this probing capability
to the new BytesToBytes hash map we built. We should consider just having
it reported as a built-in metric to facilitate debugging.

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214






On Mon, May 11, 2015 at 4:21 AM, Michal Haris 
wrote:

> This is the stack trace of the worker thread:
>
>
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
>
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> org.apache.spark.scheduler.Task.run(Task.scala:64)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
> On 8 May 2015 at 22:12, Josh Rosen  wrote:
>
>> Do you have any more specific profiling data that you can share?  I'm
>> curious to know where AppendOnlyMap.changeValue is being called from.
>>
>> On Fri, May 8, 2015 at 1:26 PM, Michal Haris 
>> wrote:
>>
>>> +dev
>>> On 6 May 2015 10:45, "Michal Haris"  wrote:
>>>
>>> > Just wanted to check if somebody has seen similar behaviour or knows
>>> what
>>> > we might be doing wrong. We have a relatively complex spark application
>>> > which processes half a terabyte of data at various stages. We have
>>> profiled
>>> > it in several ways and everything seems to point to one place where
>>> 90% of
>>> > the time is spent:  AppendOnlyMap.changeValue. The job scales and is
>>> > relatively faster than its map-reduce alternative but it still feels
>>> slower
>>> > than it should be. I am suspecting too much spill but I haven't seen
>>> any
>>> > improvement by increasing number of partitions to 10k. Any idea would
>>> be
>>> > appreciated.
>>> >
>>> > --
>>> > Michal Haris
>>> > Technical Architect
>>> > direct line: +44 (0) 207 749 0229
>>> > www.visualdna.com | t: +44 (0) 207 734 7033,
>>> >
>>>
>>
>>
>
>
> --
> Michal Haris
> Technical Architect
> direct line: +44 (0) 207 749 0229
> www.visualdna.com | t: +44 (0) 207 734 7033,
>


[ANNOUNCE] Ending Java 6 support in Spark 1.5 (Sep 2015)

2015-05-05 Thread Reynold Xin
Hi all,

We will drop support for Java 6 starting with Spark 1.5, tentatively scheduled to
be released in Sep 2015. Spark 1.4, scheduled to be released in June 2015,
will be the last minor release that supports Java 6. That is to say:

Spark 1.4.x (~ Jun 2015): will work with Java 6, 7, 8.

Spark 1.5+ (~ Sep 2015): will NOT work with Java 6, but work with Java 7, 8.


PS: Oracle ended Java 6 updates in Feb 2013.


Re: How to distribute Spark computation recipes

2015-04-27 Thread Reynold Xin
The code itself is the "recipe", no?


On Mon, Apr 27, 2015 at 2:49 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> I know that any RDD is related to its SparkContext and the associated
> variables (broadcast, accumulators), but I'm looking for a way to
> serialize/deserialize full RDD computations ?
>
> @rxin Spark SQL is, in a way, already doing this but the parsers are
> private[sql], is there any way to reuse this work to get Logical/Physical
> Plans in & out of Spark ?
>
> Regards,
>
> Olivier.
>


Re: Updating a Column in a DataFrame

2015-04-21 Thread Reynold Xin
You can use

df.withColumn("a", df.b)

to make column a having the same value as column b.
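
A couple of small sketches of the usual workaround, i.e. deriving a new
DataFrame with the desired column (Scala syntax, 1.3-era API; the column
names are illustrative):

  // derive a new column from an existing one
  val df2 = df.withColumn("total_with_tax", df("total") * 1.1)

  // "replace" a column by selecting a recomputed version under the same name
  val df3 = df.select(df("id"), (df("total") + 1).as("total"))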


On Mon, Apr 20, 2015 at 3:38 PM, ARose  wrote:

> In my Java application, I want to update the values of a Column in a given
> DataFrame. However, I realize DataFrames are immutable, and therefore
> cannot
> be updated by conventional means. Is there a workaround for this sort of
> transformation? If so, can someone provide an example?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Column renaming after DataFrame.groupBy

2015-04-21 Thread Reynold Xin
You can use the more verbose syntax:

d.groupBy("_1").agg(d("_1"), sum("_1").as("sum_1"), sum("_2").as("sum_2"))

On Tue, Apr 21, 2015 at 1:06 AM, Justin Yip  wrote:

> Hello,
>
> I would like to rename a column after aggregation. In the following code, the
> column name is "SUM(_1#179)", is there a way to rename it to a more
> friendly name?
>
> scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
> scala> d.groupBy("_1").sum().printSchema
> root
>  |-- _1: integer (nullable = false)
>  |-- SUM(_1#179): long (nullable = true)
>  |-- SUM(_2#180): long (nullable = true)
>
> Thanks.
>
> Justin
>
> --
> View this message in context: Column renaming after DataFrame.groupBy
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: how to make a spark cluster ?

2015-04-20 Thread Reynold Xin
Actually if you only have one machine, just use the Spark local mode.

Just download the Spark tarball, untar it, set master to local[N], where N
= number of cores. You are good to go. There is no setup of job tracker or
Hadoop.


On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak  wrote:

> Thank you :)
>
> On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke  wrote:
>
>> Hi, if you have just one physical machine then I would try out Docker
>> instead of a full VM (a full VM would be a waste of memory and CPU).
>>
>> Best regards
>> Le 20 avr. 2015 00:11, "hnahak"  a écrit :
>>
>>> Hi All,
>>>
>>> I've got a big physical machine with 16 CPUs, 256 GB RAM and 20 TB of hard
>>> disk. I just need to know what the best way to set up a Spark cluster
>>> would be.
>>>
>>> If I need to process TBs of data then
>>> 1. Only one machine, which contains the driver, executor, job tracker and
>>> task tracker (everything).
>>> 2. create 4 VMs and each VM should consist 4 CPUs , 64 GB RAM
>>> 3. create 8 VMs and each VM should consist 2 CPUs , 32 GB RAM each
>>>
>>> please give me your views/suggestions
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>
>
> --
> {{{H2N}}}-(@:
>


Re: dataframe can not find fields after loading from hive

2015-04-17 Thread Reynold Xin
This is strange. cc the dev list since it might be a bug.



On Thu, Apr 16, 2015 at 3:18 PM, Cesar Flores  wrote:

> Never mind. I found the solution:
>
> val newDataFrame = hc.createDataFrame(hiveLoadedDataFrame.rdd,
> hiveLoadedDataFrame.schema)
>
> which translate to convert the data frame to rdd and back again to data
> frame. Not the prettiest solution, but at least it solves my problems.
>
>
> Thanks,
> Cesar Flores
>
>
>
> On Thu, Apr 16, 2015 at 11:17 AM, Cesar Flores  wrote:
>
>>
>> I have a data frame in which I load data from a hive table. And my issue
>> is that the data frame is missing the columns that I need to query.
>>
>> For example:
>>
>> val newdataset = dataset.where(dataset("label") === 1)
>>
>> gives me an error like the following:
>>
>> ERROR yarn.ApplicationMaster: User class threw exception: resolved
>> attributes label missing from label, user_id, ...(the rest of the fields of
>> my table
>> org.apache.spark.sql.AnalysisException: resolved attributes label missing
>> from label, user_id, ... (the rest of the fields of my table)
>>
>> where we can see that the label field actually exist. I manage to solve
>> this issue by updating my syntax to:
>>
>> val newdataset = dataset.where($"label" === 1)
>>
>> which works. However I can not make this trick in all my queries. For
>> example, when I try to do a unionAll from two subsets of the same data
>> frame the error I am getting is that all my fields are missing.
>>
>> Can someone tell me if I need to do some post processing after loading
>> from hive in order to avoid this kind of errors?
>>
>>
>> Thanks
>> --
>> Cesar Flores
>>
>
>
>
> --
> Cesar Flores
>


Re: Why does the HDFS parquet file generated by Spark SQL have different size with those on Tachyon?

2015-04-17 Thread Reynold Xin
It's because you did a repartition -- which rearranges all the data.

Parquet uses all kinds of compression techniques such as dictionary
encoding and run-length encoding, which would result in the size difference
when the data is ordered differently.
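
For what it's worth, a hedged sketch using the tfs DataFrame from the quoted
message below (the output path is illustrative): writing without the
repartition preserves the original row ordering, and with it the runs of
similar values that dictionary and run-length encoding exploit.

// skip the repartition so the row order (and hence the encodings) is preserved
tfs.saveAsParquetFile("/user/zhangxf/adClick-parquet-ordered")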

On Fri, Apr 17, 2015 at 4:51 AM, zhangxiongfei 
wrote:

> Hi,
> I did some tests on Parquet Files with Spark SQL DataFrame API.
> I generated 36 gzip compressed parquet files by Spark SQL and stored them
> on Tachyon,The size of each file is about  222M.Then read them with below
> code.
> val tfs
> =sqlContext.parquetFile("tachyon://datanode8.bitauto.dmp:19998/apps/tachyon/adClick");
> Next,I just save this DataFrame onto HDFS with below code.It will generate
> 36 parquet files too,but the size of each file is about 265M
>
> tfs.repartition(36).saveAsParquetFile("/user/zhangxf/adClick-parquet-tachyon");
> My question is Why the files on HDFS has different size with those on
> Tachyon even though they come from the same original data?
>
>
> Thanks
> Zhang Xiongfei
>
>


Re: [Spark1.3] UDF registration issue

2015-04-13 Thread Reynold Xin
You can do this:

import org.apache.spark.sql.functions.{col, udf}

val strLen = udf((s: String) => s.length())
cleanProcessDF.withColumn("dii", strLen(col("di")))

(You might need to play with the type signature a little bit to get it to
compile)
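
For the register-based route from the original snippet, a hedged sketch
(column and alias names are taken from the example): capture the
UserDefinedFunction that register returns, or refer to the registered name
through a SQL expression string.

import org.apache.spark.sql.functions.col

// register returns a UserDefinedFunction you can keep as a Scala val
val strLenUdf = sqlContext.udf.register("strLen", (s: String) => s.length())
cleanProcessDF.withColumn("dii", strLenUdf(col("di")))

// or use the registered name directly in an expression string
cleanProcessDF.selectExpr("*", "strLen(di) as dii")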


On Fri, Apr 10, 2015 at 11:30 AM, Yana Kadiyska 
wrote:

> Hi, I'm running into some trouble trying to register a UDF:
>
> scala> sqlContext.udf.register("strLen", (s: String) => s.length())
> res22: org.apache.spark.sql.UserDefinedFunction = 
> UserDefinedFunction(<function1>,IntegerType)
>
> scala> cleanProcessDF.withColumn("dii",strLen(col("di")))
> <console>:33: error: not found: value strLen
>   cleanProcessDF.withColumn("dii",strLen(col("di")))
>
> ​
>
> Where cleanProcessDF is a dataframe
> Is my syntax wrong? Or am I missing an import of some sort?
>


Re: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Reynold Xin
I think what happened was that the analyzer applied the narrowest common
type. Type widening is required here, and the narrowest type that can hold
both a string and an int is string.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144
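
If you want a particular result schema rather than whatever the widening rule
picks, a hedged sketch (using adf and bdf from the example below) is to align
the column types and order explicitly before the union:

import org.apache.spark.sql.types.StringType

// make both sides (a: string, b: string) explicitly, then union
val left  = adf.select(adf("a").cast(StringType).as("a"), adf("b"))
val right = bdf.select(bdf("a"), bdf("c").cast(StringType).as("b"))
val unioned = left.unionAll(right)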


On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip  wrote:

> Hello,
>
> I am experimenting with DataFrame. I tried to construct two DataFrames
> with:
> 1. case class A(a: Int, b: String)
> scala> adf.printSchema()
> root
>  |-- a: integer (nullable = false)
>  |-- b: string (nullable = true)
>
> 2. case class B(a: String, c: Int)
> scala> bdf.printSchema()
> root
>  |-- a: string (nullable = true)
>  |-- c: integer (nullable = false)
>
>
> Then I unioned the these two DataFrame with the unionAll function, and I
> get the following schema. It is kind of a mixture of A and B.
>
> scala> val udf = adf.unionAll(bdf)
> scala> udf.printSchema()
> root
>  |-- a: string (nullable = false)
>  |-- b: string (nullable = true)
>
> The unionAll documentation says it behaves like the SQL UNION ALL
> function. However, unioning incompatible types is not well defined for SQL.
> Is there any expected behavior for unioning incompatible data frames?
>
> Thanks.
>
> Justin
>


Manning looking for a co-author for the GraphX in Action book

2015-04-13 Thread Reynold Xin
Hi all,

Manning (the publisher) is looking for a co-author for the GraphX in Action
book. The book currently has one author (Michael Malak), but they are
looking for a co-author to work closely with Michael to improve the
writing and make it more consumable.

Early access page for the book: http://www.manning.com/malak/

Let me know if you are interested in that. Cheers.


Re: ArrayBuffer within a DataFrame

2015-04-03 Thread Reynold Xin
There is already an explode function on DataFrame btw

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L712

I think something like this would work. You might need to play with the
type.

df.explode("arrayBufferColumn") { x => x }
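
A slightly more concrete, hedged sketch of the two-argument form (the column
names are made up, and the element type may need adjusting to match your
actual schema):

// explode the array column "values" into one row per element, named "value"
val exploded = df.explode("values", "value") { values: Seq[String] => values }
exploded.select("month", "value").show()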



On Fri, Apr 3, 2015 at 6:43 AM, Denny Lee  wrote:

> Thanks Dean - fun hack :)
>
> On Fri, Apr 3, 2015 at 6:11 AM Dean Wampler  wrote:
>
>> A hack workaround is to use flatMap:
>>
>> rdd.flatMap{ case (date, array) => for (x <- array) yield (date, x) }
>>
>> For those of you who don't know Scala, the for comprehension iterates
>> through the ArrayBuffer, named "array" and yields new tuples with the date
>> and each element. The case expression to the left of the => pattern matches
>> on the input tuples.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Thu, Apr 2, 2015 at 10:45 PM, Denny Lee  wrote:
>>
>>> Thanks Michael - that was it!  I was drawing a blank on this one for
>>> some reason - much appreciated!
>>>
>>>
>>> On Thu, Apr 2, 2015 at 8:27 PM Michael Armbrust 
>>> wrote:
>>>
 A lateral view explode using HiveQL.  I'm hopping to add explode
 shorthand directly to the df API in 1.4.

 On Thu, Apr 2, 2015 at 7:10 PM, Denny Lee 
 wrote:

> Quick question - the output of a dataframe is in the format of:
>
> [2015-04, ArrayBuffer(A, B, C, D)]
>
> and I'd like to return it as:
>
> 2015-04, A
> 2015-04, B
> 2015-04, C
> 2015-04, D
>
> What's the best way to do this?
>
> Thanks in advance!
>
>
>

>>


Re: Can I call aggregate UDF in DataFrame?

2015-04-01 Thread Reynold Xin
You totally can.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L792

There is also an attempt at adding stddev here already:
https://github.com/apache/spark/pull/5228
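
Until that lands, a hedged sketch of computing a standard deviation from the
existing aggregates (the column names are made up, and this assumes avg and
sqrt are available in org.apache.spark.sql.functions in your version):

import org.apache.spark.sql.functions._

// stddev = sqrt(E[x^2] - E[x]^2), assembled from plain avg aggregates
val stats = df.groupBy("key").agg(
  avg(col("value")).as("mean"),
  avg(col("value") * col("value")).as("meanSq"))

val withStddev = stats.select(
  col("key"),
  sqrt(col("meanSq") - col("mean") * col("mean")).as("stddev"))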



On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang  wrote:

> Specifically there are only 5 aggregate functions in class
> org.apache.spark.sql.GroupedData: sum/max/min/mean/count.
>
> Can I plugin a function to calculate stddev?
>
> Thank you!
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Build fails on 1.3 Branch

2015-03-29 Thread Reynold Xin
I pushed a hotfix to the branch. Should work now.


On Sun, Mar 29, 2015 at 9:23 AM, Marty Bower  wrote:

> Yes, that worked - thank you very much.
>
>
>
> On Sun, Mar 29, 2015 at 9:05 AM Ted Yu  wrote:
>
>> Jenkins build failed too:
>>
>>
>> https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=centos/326/consoleFull
>>
>> For the moment, you can apply the following change:
>>
>> diff --git
>> a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
>> b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpression
>> index a53ae97..7ae4b38 100644
>> ---
>> a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
>> +++
>> b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
>> @@ -18,7 +18,7 @@
>>  package org.apache.spark.sql
>>
>>  import org.apache.spark.sql.catalyst.expressions.NamedExpression
>> -import org.apache.spark.sql.catalyst.plans.logical.{Project, NoRelation}
>> +import org.apache.spark.sql.catalyst.plans.logical.{Project}
>>  import org.apache.spark.sql.functions._
>>  import org.apache.spark.sql.test.TestSQLContext
>>  import org.apache.spark.sql.test.TestSQLContext.implicits._
>>
>> Cheers
>>
>> On Sun, Mar 29, 2015 at 8:48 AM, mjhb  wrote:
>>
>>> I tried pulling the source and building for the first time, but cannot
>>> get
>>> past the "object NoRelation is not a member of package
>>> org.apache.spark.sql.catalyst.plans.logical" error below on the 1.3
>>> branch.
>>> I can build the 1.2 branch.
>>>
>>> I have tried with both -Dscala-2.11 and 2.10 (after running the
>>> appropriate
>>> change-version-to-2.1#.sh), and with different combinations of hadoop
>>> flags.
>>>
>>> Relevant excerpts below - full build output available at
>>> http://mjhb.com/tmp/build-spark-1.3.out or
>>> http://mjhb.com/tmp/build-spark-1.3.out.gz
>>>
>>> $ git branch
>>> * branch-1.3
>>> $ build/mvn -e -X -DskipTests clean package
>>> Apache Maven 3.0.5
>>> Maven home: /usr/share/maven
>>> Java version: 1.7.0_76, vendor: Oracle Corporation
>>> Java home: /usr/lib/jvm/java-7-oracle/jre
>>> Default locale: en_US, platform encoding: UTF-8
>>> OS name: "linux", version: "3.16.0-33-generic", arch: "amd64", family:
>>> "unix"
>>>
>>> 
>>>
>>> [error]
>>>
>>> /home/marty/work/spark-1.3-maint/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala:21:
>>> object NoRelation is not a member of package
>>> org.apache.spark.sql.catalyst.plans.logical
>>> [error] import org.apache.spark.sql.catalyst.plans.logical.{Project,
>>> NoRelation}
>>> [error]^
>>> [error] one error found
>>> [debug] Compilation failed (CompilerInterface)
>>> [error] Compile failed at Mar 29, 2015 7:52:54 AM [5.793s]
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Build-fails-on-1-3-Branch-tp22275.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: spark disk-to-disk

2015-03-23 Thread Reynold Xin
Maybe implement a very simple function that uses the Hadoop API to read in
based on file names (i.e. parts)?

On Mon, Mar 23, 2015 at 10:55 AM, Koert Kuipers  wrote:

> there is a way to reinstate the partitioner, but that requires
> sc.objectFile to read exactly what i wrote, which means sc.objectFile
> should never split files on reading (a feature of hadoop file inputformat
> that gets in the way here).
>
> On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers  wrote:
>
>> i just realized the major limitation is that i lose partitioning info...
>>
>> On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin  wrote:
>>
>>>
>>> On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers 
>>> wrote:
>>>
>>>> so finally i can resort to:
>>>> rdd.saveAsObjectFile(...)
>>>> sc.objectFile(...)
>>>> but that seems like a rather broken abstraction.
>>>>
>>>>
>>> This seems like a fine solution to me.
>>>
>>>
>>
>


Re: spark disk-to-disk

2015-03-22 Thread Reynold Xin
On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers  wrote:

> so finally i can resort to:
> rdd.saveAsObjectFile(...)
> sc.objectFile(...)
> but that seems like a rather broken abstraction.
>
>
This seems like a fine solution to me.


Re: SchemaRDD: SQL Queries vs Language Integrated Queries

2015-03-10 Thread Reynold Xin
They should have the same performance, as they are compiled down to the
same execution plan.

Note that starting in Spark 1.3, SchemaRDD is renamed DataFrame:

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
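
For illustration, a hedged sketch of the two styles side by side (table and
column names are made up; the DSL form assumes the implicits brought in by
import sqlContext._, as in the 1.2 programming guide examples):

val people = sqlContext.jsonFile("people.json")
people.registerTempTable("people")

// SQL string query
val adults1 = sqlContext.sql("SELECT name FROM people WHERE age > 21")

// language-integrated query over the same SchemaRDD
import sqlContext._
val adults2 = people.where('age > 21).select('name)

// both compile down to the same logical plan, so they optimize identically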



On Tue, Mar 10, 2015 at 2:13 PM, Cesar Flores  wrote:

>
> I am new to the SchemaRDD class, and I am trying to decide in using SQL
> queries or Language Integrated Queries (
> https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
> ).
>
> Can someone tell me what is the main difference between the two
> approaches, besides using different syntax? Are they interchangeable? Which
> one has better performance?
>
>
> Thanks a lot
> --
> Cesar Flores
>


Help vote for Spark talks at the Hadoop Summit

2015-02-24 Thread Reynold Xin
Hi all,

The Hadoop Summit uses community choice voting to decide which talks to
feature. It would be great if the community could help vote for Spark talks
so that Spark has a good showing at this event. You can make three votes on
each track. Below I've listed 3 talks that are important to Spark's
roadmap. Please give 3 votes to each of the following talks.

Committer Track: Lessons from Running Ultra Large Scale Spark Workloads on
Hadoop
https://hadoopsummit.uservoice.com/forums/283260-committer-track/suggestions/7074016

Data Science track: DataFrames: large-scale data science on Hadoop data
with Spark
https://hadoopsummit.uservoice.com/forums/283261-data-science-and-hadoop/suggestions/7074147

Future of Hadoop track: Online Approximate OLAP in SparkSQL
https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074424


Thanks!


Re: Spark 1.3 dataframe documentation

2015-02-24 Thread Reynold Xin
The official documentation will be posted when 1.3 is released (early
March).

Right now, you can build the docs yourself by running "jekyll build" in
docs. Alternatively, just look at dataframe.py as Ted pointed out.


On Tue, Feb 24, 2015 at 6:56 AM, Ted Yu  wrote:

> Have you looked at python/pyspark/sql/dataframe.py ?
>
> Cheers
>
> On Tue, Feb 24, 2015 at 6:12 AM, poiuytrez 
> wrote:
>
>> Hello,
>>
>> I have built Spark 1.3. I can successfully use the dataframe api.
>> However, I
>> am not able to find its api documentation in Python. Do you know when the
>> documentation will be available?
>>
>> Best Regards,
>> poiuytrez
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-dataframe-documentation-tp21789.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: New guide on how to write a Spark job in Clojure

2015-02-24 Thread Reynold Xin
Thanks for sharing, Chris.

On Tue, Feb 24, 2015 at 4:39 AM, Christian Betz <
christian.b...@performance-media.de> wrote:

>  Hi all,
>
>  Maybe some of you are interested: I wrote a new guide on how to start
> using Spark from Clojure. The tutorial covers
>
>- setting up a project,
>- doing REPL- or Test Driven Development of Spark jobs
>- Running Spark jobs locally.
>
> Just read it on
> https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html.
>
>  Comments (and Pull requests) are very welcome.
>
>  Sincerly
>
>  Chris
>
>


Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Reynold Xin
BTW we merged this today: https://github.com/apache/spark/pull/4640

This should allow us in the future to address column by name in a Row.
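
Once that lands, usage might look something like the hedged sketch below
(this is an assumption about how the name-based lookup will surface on the
1.3 DataFrame API, not a description of the final API):

caper.select("ran_id").map { row =>
  row.getAs[String]("ran_id")  // look the field up by name instead of ordinal
}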


On Mon, Feb 16, 2015 at 11:39 AM, Michael Armbrust 
wrote:

> I can unpack the code snippet a bit:
>
> caper.select('ran_id) is the same as saying "SELECT ran_id FROM table" in
> SQL.  Its always a good idea to explicitly request the columns you need
> right before using them.  That way you are tolerant of any changes to the
> schema that might happen upstream.
>
> The next part .map { case Row(ranId: String) => ... } is doing an
> extraction to pull out the values of the row into typed variables.  This is
> the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
> row.getString(0)), but I find this syntax easier to read since it lines
> up nicely with the select clause that comes right before it.  It's also
> less verbose especially when pulling out a bunch of columns.
>
> Regarding the differences between python and java/scala, part of this is
> just due to the nature of these language.  Since java/scala are statically
> typed, you will always have to explicitly say the type of the column you
> are extracting (the bonus here is they are much faster than python due to
> optimizations this strictness allows).  However, since its already a little
> more verbose, we decided not to have the more expensive ability to look up
> columns in a row by name, and instead go with a faster ordinal based API.
> We could revisit this, but its not currently something we are planning to
> change.
>
> Michael
>
> On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell  wrote:
>
>>  I am just learning scala so I don't actually understand what your code
>> snippet is doing but thank you, I will learn more so I can figure it out.
>>
>> I am new to all of this and still trying to make the mental shift from
>> normal programming to distributed programming, but it seems to me that the
>> row object would know its own schema object that it came from and be able
>> to ask its schema to transform a name to a column number. Am I missing
>> something or is this just a matter of time constraints and this one just
>> hasn't gotten into the queue yet?
>>
>> Baring that, do the schema classes provide methods for doing this? I've
>> looked and didn't see anything.
>>
>> I've just discovered that the python implementation for SchemaRDD does in
>> fact allow for referencing by name and column. Why is this provided in the
>> python implementation but not scala or java implementations?
>>
>> Thanks,
>>
>> --eric
>>
>>
>>
>> On 02/16/2015 10:46 AM, Michael Armbrust wrote:
>>
>> For efficiency the row objects don't contain the schema so you can't get
>> the column by name directly.  I usually do a select followed by pattern
>> matching. Something like the following:
>>
>>  caper.select('ran_id).map { case Row(ranId: String) => }
>>
>> On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell  wrote:
>>
>>> Is it possible to reference a column from a SchemaRDD using the column's
>>> name instead of its number?
>>>
>>> For example, let's say I've created a SchemaRDD from an avro file:
>>>
>>> val sqlContext = new SQLContext(sc)
>>> import sqlContext._
>>> val caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
>>> caper.registerTempTable("caper")
>>>
>>> scala> caper
>>> res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
>>> SchemaRDD.scala:108
>>> == Query Plan ==
>>> == Physical Plan ==
>>> PhysicalRDD
>>> [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
>>> scala>
>>>
>>> Now I want to access fields, and of course the normal thing to do is to
>>> use a field name, not a field number.
>>>
>>> scala> val kv = caper.map(r => (r.ran_id, r))
>>> <console>:23: error: value ran_id is not a member of
>>> org.apache.spark.sql.Row
>>>val kv = caper.map(r => (r.ran_id, r))
>>>
>>> How do I do this?
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>


Re: Spark ML pipeline

2015-02-11 Thread Reynold Xin
Yes. Next release (Spark 1.3) is coming out end of Feb / early Mar.

On Wed, Feb 11, 2015 at 7:22 AM, Jianguo Li 
wrote:

> Hi,
>
> I really like the pipeline in the spark.ml in Spark1.2 release. Will
> there be more machine learning algorithms implemented for the pipeline
> framework in the next major release? Any idea when the next major release
> comes out?
>
> Thanks,
>
> Jianguo
>


Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark

2015-02-10 Thread Reynold Xin
I think we made the binary protocol compatible across all versions, so you
should be fine with using any one of them. 1.2.1 is probably the best since
it is the most recent stable release.

On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang 
wrote:

> Hi,
>
> I need to use branch-1.2 and sometimes master builds of Spark for my
> project. However the officially supported Spark version by our Hadoop admin
> is only 1.2.0.
>
> So, my question is which version/build of spark-yarn-shuffle.jar should I
> use that works for all four versions? (1.2.0, 1.2.1, 1.2,2, 1.3.0)
>
> Thanks,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>


Re: 2GB limit for partitions?

2015-02-03 Thread Reynold Xin
cc dev list


How are you saving the data? There are two relevant 2GB limits:

1. Caching

2. Shuffle


For caching, a partition is turned into a single block.

For shuffle, each map partition is partitioned into R blocks, where R =
number of reduce tasks. It is unlikely a shuffle block > 2G, although it
can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can
handle that in the network transport layer. It'd require us to divide the
transfer of a very large block into multiple smaller blocks.
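
In the meantime, a minimal sketch of the usual mitigation: raise the number
of partitions so each partition (and hence each cached block) stays well
under 2GB. The factor of 16 here is just an example.

import org.apache.spark.storage.StorageLevel

val repartitioned = rdd.repartition(rdd.partitions.length * 16)
repartitioned.persist(StorageLevel.DISK_ONLY)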



On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid  wrote:

> Michael,
>
> you are right, there is definitely some limit at 2GB.  Here is a trivial
> example to demonstrate it:
>
> import org.apache.spark.storage.StorageLevel
> val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
> Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
> d.count()
>
> It gives the same error you are observing.  I was under the same
> impression as Sean about the limits only being on blocks, not partitions --
> but clearly that isn't the case here.
>
> I don't know the whole story yet, but I just wanted to at least let you
> know you aren't crazy :)
> At the very least this suggests that you might need to make smaller
> partitions for now.
>
> Imran
>
>
> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
> m_albert...@yahoo.com.invalid> wrote:
>
>> Greetings!
>>
>> Thanks for the response.
>>
>> Below is an example of the exception I saw.
>> I'd rather not post code at the moment, so I realize it is completely
>> unreasonable to ask for a diagnosis.
>> However, I will say that adding a "partitionBy()" was the last change
>> before this error was created.
>>
>>
>> Thanks for your time and any thoughts you might have.
>>
>> Sincerely,
>>  Mike
>>
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
>> failure: Lost task 4.3 in stage 5.0 (TID 6012,
>> ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>> at
>> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>> at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>
>>   --
>>  *From:* Sean Owen 
>> *To:* Michael Albert 
>> *Cc:* "user@spark.apache.org" 
>> *Sent:* Monday, February 2, 2015 10:13 PM
>> *Subject:* Re: 2GB limit for partitions?
>>
>> The limit is on blocks, not partitions. Partitions have many blocks.
>>
>> It sounds like you are creating very large values in memory, but I'm
>> not sure given your description. You will run into problems if a
>> single object is more than 2GB, of course. More of the stack trace
>> might show what is mapping that much memory.
>>
>> If you simply want data into 1000 files it's a lot simpler. Just
>> repartition into 1000 partitions and save the data. If you need more
>> control over what goes into which partition, use a Partitioner, yes.
>>
>>
>>
>> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
>>  wrote:
>> > Greetings!
>> >
>> > SPARK-1476 says that there is a 2G limit for "blocks".
>> > Is this the same as a 2G limit for partitions (or approximately so?)?
>> >
>> >
>> > What I had been attempting to do is the following.
>> > 1) Start with a moderately large data set (currently about 100GB, but
>> > growing).
>> > 2) Create about 1,000 files (yes, files) each representing a subset of
>> the
>> > data.
>> >
>> > The current attempt I am working on is something like this.
>> > 1) Do a "map" whose output key indicates which of the 1,000 files it
>> will go
>> > into and whose value is what I will want to stick into the file.
>> > 2) Partition the data and use the body of mapPartition to open a file
>> and
>> > save the data.
>> >
>> > My apologies, this is actually embedded in a bigger mess, so I won't
>> post
>> >

Re: How to access OpenHashSet in my standalone program?

2015-01-14 Thread Reynold Xin
The reason is fairly simple actually - we don't want to commit to
maintaining the specific APIs exposed. If we expose OpenHashSet, we will
have to always keep that in Spark and not change the API.

On Tue, Jan 13, 2015 at 12:39 PM, Tae-Hyuk Ahn  wrote:

> Thank, Josh and Reynold. Yes, I can incorporate it to my package and
> use it. But I am still wondering why you designed such useful
> functions as private.
>
> On Tue, Jan 13, 2015 at 3:33 PM, Reynold Xin  wrote:
> > It is not meant to be a public API. If you want to use it, maybe copy the
> > code out of the package and put it in your own project.
> >
> > On Fri, Jan 9, 2015 at 7:19 AM, Tae-Hyuk Ahn  wrote:
> >>
> >> Hi,
> >>
> >> I would like to use OpenHashSet
> >> (org.apache.spark.util.collection.OpenHashSet) in my standalone
> program. I
> >> can import it without error as:
> >>
> >> import org.apache.spark.util.collection.OpenHashSet
> >>
> >> However, when I try to access it, I am getting an error as:
> >>
> >> object OpenHashSet in package collection cannot be accessed in package
> >> org.apache.spark.util.collection
> >>
> >> I suspect this error is caused by private object. I am wondering how I
> can
> >> use this object in my standalone program.
> >>
> >> Thanks,
> >>
> >> Ted
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>


Re: How to access OpenHashSet in my standalone program?

2015-01-13 Thread Reynold Xin
It is not meant to be a public API. If you want to use it, maybe copy the
code out of the package and put it in your own project.

On Fri, Jan 9, 2015 at 7:19 AM, Tae-Hyuk Ahn  wrote:

> Hi,
>
> I would like to use OpenHashSet
> (org.apache.spark.util.collection.OpenHashSet) in my standalone program. I
> can import it without error as:
>
> import org.apache.spark.util.collection.OpenHashSet
>
> However, when I try to access it, I am getting an error as:
>
> object OpenHashSet in package collection cannot be accessed in package
> org.apache.spark.util.collection
>
> I suspect this error is caused by private object. I am wondering how I can
> use this object in my standalone program.
>
> Thanks,
>
> Ted
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Creating RDD from only few columns of a Parquet file

2015-01-13 Thread Reynold Xin
What query did you run? Parquet should have predicate and column pushdown,
i.e. if your query only needs to read 3 columns, then only 3 will be read.
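
For example, a hedged sketch (table and column names are made up): project
only the columns you need, and the Parquet reader will skip the other column
chunks on disk.

val parquetFile = sqlContext.parquetFile("people.parquet")
parquetFile.registerTempTable("people")

// only columns a, b, c are read from disk; the other columns are pruned
val threeCols = sqlContext.sql("SELECT a, b, c FROM people WHERE a > 0")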

On Mon, Jan 12, 2015 at 10:20 PM, Ajay Srivastava <
a_k_srivast...@yahoo.com.invalid> wrote:

> Hi,
> I am trying to read a parquet file using -
>
> val parquetFile = sqlContext.parquetFile("people.parquet")
>
> There is no way to specify that I am interested in reading only some columns 
> from disk. For example, If the parquet file has 10 columns and want to read 
> only 3 columns from disk.
>
> We have done an experiment -
> Table1 - Parquet file containing 10 columns
> Table2 - Parquet file containing only 3 columns which were used in query
>
> The time taken by query on table1 and table2 shows huge difference. Query on 
> Table1 takes more than double of time taken on table2 which makes me think 
> that spark is reading all the columns from disk in case of table1 when it 
> needs only 3 columns.
>
> How should I make sure that it reads only 3 of 10 columns from disk ?
>
>
> Regards,
> Ajay
>
>


Re: saveAsTextFile just uses toString and Row@37f108

2015-01-13 Thread Reynold Xin
It is just calling RDD's saveAsTextFile. I guess we should really override
the saveAsTextFile in SchemaRDD (or make Row.toString comma separated).
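
As a workaround, a hedged sketch (the output path is illustrative; this
assumes the Scala API, where a Row behaves like a Seq): format each Row
yourself before saving.

// one comma-separated line per Row instead of the default toString
schemaRDD.map(row => row.mkString(",")).saveAsTextFile("hdfs:///tmp/query-output")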

Do you mind filing a JIRA ticket and copy me?


On Tue, Jan 13, 2015 at 12:03 AM, Kevin Burton  wrote:

> This is almost funny.
>
> I want to dump a computation to the filesystem.  It’s just the result of a
> Spark SQL call reading the data from Cassandra.
>
> The problem is that it looks like it’s just calling toString() which is
> useless.
>
> The example is below.
>
> I assume this is just a (bad) bug.
>
> org.apache.spark.sql.api.java.Row@37f108
>
> org.apache.spark.sql.api.java.Row@d0426773
>
> org.apache.spark.sql.api.java.Row@38c9d3
>
>
> --
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
> 
>
>


Re: Spark on teradata?

2015-01-08 Thread Reynold Xin
It depends on your use case. If the use case is to extract a small amount of
data out of Teradata, then you can use the JdbcRDD, and soon a JDBC input
source based on the new Spark SQL external data source API.
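
For reference, a hedged sketch of JdbcRDD (the Teradata JDBC URL, credentials,
table, and bounds are all made up, and the Teradata JDBC driver needs to be on
the classpath; any JDBC-compliant driver is used the same way):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val sales = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:teradata://td-host/DATABASE=sales", "user", "password"),
  "SELECT id, amount FROM transactions WHERE id >= ? AND id <= ?",
  1L,        // lower bound for the two placeholders
  1000000L,  // upper bound
  10,        // number of partitions
  rs => (rs.getLong(1), rs.getDouble(2)))  // map each ResultSet row to a tuple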



On Wed, Jan 7, 2015 at 7:14 AM, gen tang  wrote:

> Hi,
>
> I have a stupid question:
> Is it possible to use spark on Teradata data warehouse, please? I read
> some news on internet which say yes. However, I didn't find any example
> about this issue
>
> Thanks in advance.
>
> Cheers
> Gen
>
>


Re: Confused why I'm losing workers/executors when writing a large file to S3

2014-11-13 Thread Reynold Xin
Darin,

You might want to increase these config options also:

spark.akka.timeout 300
spark.storage.blockManagerSlaveTimeoutMs 30
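
For reference, a hedged sketch of setting these alongside the existing ack
timeout (the values are illustrative, not tuned recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.timeout", "300")                           // seconds
  .set("spark.storage.blockManagerSlaveTimeoutMs", "300000")  // milliseconds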

On Thu, Nov 13, 2014 at 11:31 AM, Darin McBeath  wrote:

> For one of my Spark jobs, my workers/executors are dying and leaving the
> cluster.
>
> On the master, I see something like the following in the log file.  I'm
> surprised to see the '60' seconds in the master log below because I
> explicitly set it to '600' (or so I thought) in my spark job (see below).
> This is happening at the end of my job when I'm trying to persist a large
> RDD (probably around 300+GB) back to S3 (in 256 partitions).  My cluster
> consists of 6 r3.8xlarge machines.  The job successfully works when I'm
> outputting 100GB or 200GB.
>
> If  you have any thoughts/insights, it would be appreciated.
>
> Thanks.
>
> Darin.
>
> Here is where I'm setting the 'timeout' in my spark job.
>
> SparkConf conf = new SparkConf()
> .setAppName("SparkSync Application")
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.rdd.compress","true")
> .set("spark.core.connection.ack.wait.timeout","600");
>
> On the master, I see the following in the log file.
>
> 4/11/13 17:20:39 WARN master.Master: Removing
> worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no
> heartbeat in 60 seconds
> 14/11/13 17:20:39 INFO master.Master: Removing worker
> worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on
> ip-10-35-184-232.ec2.internal:51877
> 14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2
>
> On a worker, I see something like the following in the log file.
>
> 14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
> 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception
> (java.net.SocketException) caught when processing request: Broken pipe
> 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception
> (java.net.SocketException) caught when processing request: Broken pipe
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
> (java.io.IOException) caught when processing request: Resetting to invalid
> mark
> 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
> 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which
> exceeds the maximum retry count of 5
> 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which
> exceeds the maximum retry count of 5
> 14/11/13 17:22:57 WARN util.AkkaUtils: Error sending message in 1 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>
>


Re: Breaking the previous large-scale sort record with Spark

2014-11-05 Thread Reynold Xin
Hi all,

We are excited to announce that the benchmark entry has been reviewed by
the Sort Benchmark committee and Spark has officially won the Daytona
GraySort contest in sorting 100TB of data.

Our entry tied with a UCSD research team building high performance systems
and we jointly set a new world record. This is an important milestone for
the project, as it validates the amount of engineering work put into Spark
by the community.

As Matei said, "For an engine to scale from these multi-hour petabyte batch
jobs down to 100-millisecond streaming and interactive queries is quite
uncommon, and it's thanks to all of you folks that we are able to make this
happen."

Updated blog post:
http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html




On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia 
wrote:

> Hi folks,
>
> I interrupt your regularly scheduled user / dev list to bring you some
> pretty cool news for the project, which is that we've been able to use
> Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
> faster on 10x fewer nodes. There's a detailed writeup at
> http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
> Summary: while Hadoop MapReduce held last year's 100 TB world record by
> sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
> 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.
>
> I want to thank Reynold Xin for leading this effort over the past few
> weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
> Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
> providing the machines to make this possible. Finally, this result would of
> course not be possible without the many many other contributions, testing
> and feature requests from throughout the community.
>
> For an engine to scale from these multi-hour petabyte batch jobs down to
> 100-millisecond streaming and interactive queries is quite uncommon, and
> it's thanks to all of you folks that we are able to make this happen.
>
> Matei
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Reynold Xin
None of your tuning will help here because the problem is actually the way
you are saving the output. If you look at the stack trace, it is trying to
build a single string that is too large for the VM to allocate. The VM is
not actually running out of memory; rather, the JVM cannot support a single
String that large.

I suspect this is because the value in each key-value pair after the groupBy
is too long (it may be concatenating every single record). Do you really
want to save the key-value output this way as a text file? Maybe you can
write the values out as multiple lines rather than one giant string.
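
Concretely, a hedged sketch (the grouped RDD name and the tab formatting are
assumptions): emit one output line per record instead of relying on the
default toString of the (key, Iterable) pairs.

val lines = grouped.flatMap { case (key, values) =>
  values.iterator.map(v => s"$key\t$v")  // one line per value, not one giant string per key
}
lines.saveAsTextFile("hdfs:///output/path")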




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com <
arthur.hk.c...@gmail.com> wrote:

>
> Hi,
>
> FYI as follows.  Could you post your heap size settings as well your Spark
> app code?
>
> Regards
> Arthur
>
> 3.1.3 Detail Message: Requested array size exceeds VM limit
> The detail message Requested array size exceeds VM limit indicates that the
> application (or APIs used by that application) attempted to allocate an
> array that is larger than the heap size. For example, if an application
> attempts to allocate an array of 512MB but the maximum heap size is 256MB
> then OutOfMemoryError will be thrown with the reason Requested array size
> exceeds VM limit. In most cases the problem is either a configuration
> issue (heap size too small), or a bug that results in an application
> attempting to create a huge array, for example, when the number of elements
> in the array are computed using an algorithm that computes an incorrect
> size.”
>
>
>
>
> On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar 
> wrote:
>
> Resurfacing the thread. Oom shouldn't be the norm for a common groupby /
> sort use case in a framework that is leading in sorting bench marks? Or is
> there something fundamentally wrong in the usage?
> On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar"  wrote:
>
>> Hi,
>>
>> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
>> of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
>> of 1061 keys with values being Iterable> String>>. The job runs on 3 hosts in a standalone setup with each host's
>> executor having 100G RAM and 24 cores dedicated to it. While the groupBy
>> stage completes successfully with ~24GB of shuffle write, the
>> saveAsTextFile fails after repeated retries with each attempt failing due
>> to an out of memory error *[1]*. I understand that a few partitions may
>> be overloaded as a result of the groupBy and I've tried the following
>> config combinations unsuccessfully:
>>
>> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
>> 1061 paritions and have max cores = 3 so that each key is a "logical"
>> partition (though many partitions will end up on very few hosts), and each
>> host likely runs saveAsTextFile on a single key at a time due to max cores
>> = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>>
>> 2) Leave max cores unspecified, set the level of parallelism to 72, and
>> leave number of partitions unspecified (in which case the # input
>> partitions was used, which is 44)
>> Since I do not intend to cache RDD's, I have set
>> spark.storage.memoryFraction=0.2 in both cases.
>>
>> My understanding is that if each host is processing a single logical
>> partition to saveAsTextFile and is reading from other hosts to write out
>> the RDD, it is unlikely that it would run out of memory. My interpretation
>> of the spark tuning guide is that the degree of parallelism has little
>> impact in case (1) above since max cores = number of hosts. Can someone
>> explain why there are still OOM's with 100G being available? On a related
>> note, intuitively (though I haven't read the source), it appears that an
>> entire key-value pair needn't fit into memory of a single host for
>> saveAsTextFile since a single shuffle read from a remote can be written to
>> HDFS before the next remote read is carried out. This way, not all data
>> needs to be collected at the same time.
>>
>> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
>> tuning guide and even as per Datastax's spark introduction), there may need
>> to be more documentation around the internals of spark to help users take
>> better informed tuning decisions with parallelism, max cores, number
>> partitions and other tunables. Is there any ongoing effort on that front?
>>
>> Thanks,
>> Bharath
>>
>>
>> *[1]* OOM stack trace and logs
>> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
>> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
>> size exceeds VM limit
>> java.util.Arrays.copyOf(Arrays.java:3326)
>>
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>
>> java.lang.AbstractStringBuilder.append(Abstract

Re: something about rdd.collect

2014-10-14 Thread Reynold Xin
Hi Randy,

collect essentially transfers all the data to the driver node. You definitely 
wouldn’t want to collect 200 million words. It is a pretty large number and you 
can run out of memory on your driver with that much data.
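
A hedged sketch of the usual alternatives (paths are illustrative, and
documents is assumed to be an RDD of word collections as in the snippet
below): keep the result distributed, or only pull a sample back to the driver.

val counts = documents.flatMap(words => words.map(w => (w, 1))).reduceByKey(_ + _)

// write the full result out instead of collecting it
counts.saveAsTextFile("hdfs:///tmp/word-counts")

// or inspect just a small sample on the driver
counts.take(100).foreach(println)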

-- 
Reynold Xin


On October 14, 2014 at 9:26:13 PM, randylu (randyl...@gmail.com) wrote:

My code is as follows:  
*documents.flatMap(case words => words.map(w => (w, 1))).reduceByKey(_ +  
_).collect()*  
In driver's log, reduceByKey() is finished, but collect() seems always in  
run, just can't be finished.  
In additional, there are about 200,000,000 words needs to be collected. Is  
it too large for collect()? But when words decreases to 1,000,000, it's  
okay!  
Anyone can explain it? Thanks a lot.  



--  
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/something-about-rdd-collect-tp16451.html
  
Sent from the Apache Spark User List mailing list archive at Nabble.com.  

-  
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org  
For additional commands, e-mail: user-h...@spark.apache.org  



Re: SQL queries fail in 1.2.0-SNAPSHOT

2014-09-29 Thread Reynold Xin
Hi Daoyuan,

Do you mind applying this patch and look at the exception again?

https://github.com/apache/spark/pull/2580


It has also been merged in master so if you pull from master, you should
have that.


On Mon, Sep 29, 2014 at 1:17 AM, Wang, Daoyuan 
wrote:

>  Hi all,
>
>
>
> I had some of my queries run on 1.1.0-SANPSHOT at commit b1b20301(Aug 24),
> but in current master branch, my queries would not work. I looked into the
> stderr file in executor, and find the following lines:
>
>
>
> 14/09/26 16:52:46 ERROR nio.NioBlockTransferService: Exception handling
> buffer message
>
> java.io.IOException: Channel not open for writing - cannot extend file to
> required size
>
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)
>
> at
> org.apache.spark.network.FileSegmentManagedBuffer.nioByteBuffer(ManagedBuffer.scala:73)
>
> at
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:203)
>
> at org.apache.spark.network.nio.NioBlockTransferService.org
> $apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:179)
>
> at
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:149)
>
> at
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:149)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> at
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>
> at org.apache.spark.network.nio.NioBlockTransferService.org
> $apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:149)
>
> at
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:68)
>
> at
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:68)
>
> at org.apache.spark.network.nio.ConnectionManager.org
> $apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:677)
>
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:515)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Shuffle compress was turned off, because I encountered parsing_error when
> with shuffle compress. Even after I set the native library path, I got
> errors when uncompress in snappy. With shuffle compress turned off, I still
> get message above in some of my nodes, and the others would have a message
> that saying ack is not received after 60s. Any one get some ideas? Thanks
> for your help!
>
>
>
> Thanks,
>
> Daoyuan Wang
>


Re: driver memory management

2014-09-28 Thread Reynold Xin
The storage fraction only limits the amount of memory used for storage. It
doesn't actually limit anything else, i.e., you can use all of the memory
in collect if you want.

On Sunday, September 28, 2014, Brad Miller 
wrote:

> Hi All,
>
> I am interested to collect() a large RDD so that I can run a learning
> algorithm on it.  I've noticed that when I don't increase
> SPARK_DRIVER_MEMORY I can run out of memory. I've also noticed that it
> looks like the same fraction of memory is reserved for storage on the
> driver as on the worker nodes, and that the web UI doesn't show any storage
> usage on the driver.  Since that memory is reserved for storage, it seems
> possible that it is not being used towards the collection of my RDD.
>
> Is there a way to configure the memory management (
> spark.storage.memoryFraction, spark.shuffle.memoryFraction) for the
> driver separately from the workers?
>
> Is there any reason to leave space for shuffle or storage on the driver?
> It seems like I never see either of these used on the web UI, although I
> may not be interpreting the UI correctly or my jobs may not trigger the use
> case.
>
> For context, I am using PySpark (so much of my processing happens outside
> of the allocated memory in Java) and running the Spark 1.1.0 release
> binaries.
>
> best,
> -Brad
>


Spark meetup on Oct 15 in NYC

2014-09-28 Thread Reynold Xin
Hi Spark users and developers,

Some of the most active Spark developers (including Matei Zaharia, Michael
Armbrust, Joseph Bradley, TD, Paco Nathan, and me) will be in NYC for
Strata NYC. We are working with the Spark NYC meetup group and Bloomberg to
host a meetup event. This might be the event with the highest committer to
user ratio in the history of user meetups. Look forward to meeting more
users in NYC.

You can sign up for that here:
http://www.meetup.com/Spark-NYC/events/209271842/

Cheers.


Re: collect on hadoopFile RDD returns wrong results

2014-09-18 Thread Reynold Xin
This is because HadoopRDD (and the underlying Hadoop InputFormat) reuses
objects to avoid allocation. It is sort of tricky to fix. However, in most
cases you can clone the records to make sure you are not collecting the
same object over and over again.

https://issues.apache.org/jira/browse/SPARK-1018

http://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3ccaf_kkpzrq4otyqvwcoc6plaz9x9_sfo33u4ysatki5ptqoy...@mail.gmail.com%3E
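
For example, a hedged sketch (the input path is illustrative; it assumes an
old-API text input of (LongWritable, Text) pairs): copy the values out of the
reused Writables before collecting.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")

// materialize plain Scala values so every record is its own object
val safe = raw.map { case (offset, line) => (offset.get, line.toString) }
safe.collect()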


On Thu, Sep 18, 2014 at 12:43 AM, vasiliy  wrote:

> i posted an example in previous post. Tested on spark 1.0.2, 1.2.0-SNAPSHOT
> and 1.1.0 for hadoop 2.4.0 on Windows and Linux servers with hortonworks
> hadoop 2.4 in local[4] mode. Any ideas about this spark behavior ?
>
>
> Akhil Das-2 wrote
> > Can you dump out a small piece of data? while doing rdd.collect and
> > rdd.foreach(println)
> >
> > Thanks
> > Best Regards
> >
> > On Wed, Sep 17, 2014 at 12:26 PM, vasiliy <
>
> > zadonskiyd@
>
> > > wrote:
> >
> >> it also appears in streaming hdfs fileStream
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-hadoopFile-RDD-returns-wrong-results-tp14368p14425.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail:
>
> > user-unsubscribe@.apache
>
> >> For additional commands, e-mail:
>
> > user-help@.apache
>
> >>
> >>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-hadoopFile-RDD-returns-wrong-results-tp14368p14527.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Powered By Spark: Can you please add our org?

2014-07-08 Thread Reynold Xin
I added you to the list. Cheers.



On Mon, Jul 7, 2014 at 6:19 PM, Alex Gaudio  wrote:

> Hi,
>
> Sailthru is also using Spark.  Could you please add us to the Powered By
> Spark  
> page
> when you have a chance?
>
> Organization Name: Sailthru
> URL: www.sailthru.com
> Short Description: Our data science platform uses Spark to build
> predictive models and recommendation systems for marketing automation and
> personalization
>
>
> Thank you!
> Alex
>


Re: Comparative study

2014-07-08 Thread Reynold Xin
Not sure exactly what is happening, but perhaps there are ways to
restructure your program so that it works better. Spark is definitely able
to handle much, much larger workloads.

I've personally run a workload that shuffled 300 TB of data. I've also run
something that shuffled 5 TB/node and stuffed my disks so full that the
file system was close to breaking.

We can definitely do a better job in Spark of producing more meaningful
diagnostics and being more robust with partitions of data that don't fit in
memory. A lot of the work in the next few releases will be on that.



On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> I'll respond for Dan.
>
> Our test dataset was a total of 10 GB of input data (full production
> dataset for this particular dataflow would be 60 GB roughly).
>
> I'm not sure what the size of the final output data was but I think it was
> on the order of 20 GBs for the given 10 GB of input data. Also, I can say
> that when we were experimenting with persist(DISK_ONLY), the size of all
> RDDs on disk was around 200 GB, which gives a sense of overall transient
> memory usage with no persistence.
>
> In terms of our test cluster, we had 15 nodes. Each node had 24 cores and
> 2 workers each. Each executor got 14 GB of memory.
>
> -Suren
>
>
>
> On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey 
> wrote:
>
>>  When you say "large data sets", how large?
>> Thanks
>>
>>
>> On 07/07/2014 01:39 PM, Daniel Siegmann wrote:
>>
>>  From a development perspective, I vastly prefer Spark to MapReduce. The
>> MapReduce API is very constrained; Spark's API feels much more natural to
>> me. Testing and local development is also very easy - creating a local
>> Spark context is trivial and it reads local files. For your unit tests you
>> can just have them create a local context and execute your flow with some
>> test data. Even better, you can do ad-hoc work in the Spark shell and if
>> you want that in your production code it will look exactly the same.
>>
>>  Unfortunately, the picture isn't so rosy when it gets to production. In
>> my experience, Spark simply doesn't scale to the volumes that MapReduce
>> will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
>> would be better, but I haven't had the opportunity to try them. I find jobs
>> tend to just hang forever for no apparent reason on large data sets (but
>> smaller than what I push through MapReduce).
>>
>>  I am hopeful the situation will improve - Spark is developing quickly -
>> but if you have large amounts of data you should proceed with caution.
>>
>>  Keep in mind there are some frameworks for Hadoop which can hide the
>> ugly MapReduce with something very similar in form to Spark's API; e.g.
>> Apache Crunch. So you might consider those as well.
>>
>>  (Note: the above is with Spark 1.0.0.)
>>
>>
>>
>> On Mon, Jul 7, 2014 at 11:07 AM, 
>> wrote:
>>
>>>  Hello Experts,
>>>
>>>
>>>
>>> I am doing some comparative study on the below:
>>>
>>>
>>>
>>> Spark vs Impala
>>>
>>> Spark vs MapREduce . Is it worth migrating from existing MR
>>> implementation to Spark?
>>>
>>>
>>>
>>>
>>>
>>> Please share your thoughts and expertise.
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Santosh
>>>
>>> --
>>>
>>> This message is for the designated recipient only and may contain
>>> privileged, proprietary, or otherwise confidential information. If you have
>>> received it in error, please notify the sender immediately and delete the
>>> original. Any other use of the e-mail by you is prohibited. Where allowed
>>> by local law, electronic communications with Accenture and its affiliates,
>>> including e-mail and instant messaging (including content), may be scanned
>>> by our systems for the purposes of information security and assessment of
>>> internal compliance with Accenture policy.
>>>
>>> __
>>>
>>> www.accenture.com
>>>
>>
>>
>>
>> --
>>  Daniel Siegmann, Software Developer
>> Velos
>>  Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


openstack swift integration with Spark

2014-06-13 Thread Reynold Xin
If you are interested in openstack/swift integration with Spark, please
drop me a line. We are looking into improving the integration.

Thanks.


Re: Largest input data set observed for Spark.

2014-03-20 Thread Reynold Xin
I'm not really at liberty to discuss details of the job. It involves some
expensive aggregated statistics, and took 10 hours to complete (mostly
bottlenecked by network and I/O).





On Thu, Mar 20, 2014 at 11:12 AM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> Reynold,
>
> How complex was that job (I guess in terms of number of transforms and
> actions) and how long did that take to process?
>
> -Suren
>
>
>
> On Thu, Mar 20, 2014 at 2:08 PM, Reynold Xin  wrote:
>
> > Actually we just ran a job with 70TB+ compressed data on 28 worker nodes
> -
> > I didn't count the size of the uncompressed data, but I am guessing it is
> > somewhere between 200TB to 700TB.
> >
> >
> >
> > On Thu, Mar 20, 2014 at 12:23 AM, Usman Ghani 
> wrote:
> >
> > > All,
> > > What is the largest input data set y'all have come across that has been
> > > successfully processed in production using spark. Ball park?
> > >
> >
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>

