Re:Re:Driver memory leak?

2015-04-29 Thread wyphao.2007
No, I am not collecting the result to the driver; I simply send the result to Kafka.


BTW, the image addresses are:
https://cloud.githubusercontent.com/assets/5170878/7389463/ac03bf34-eea0-11e4-9e6b-1d2fba170c1c.png
and 
https://cloud.githubusercontent.com/assets/5170878/7389480/c629d236-eea0-11e4-983a-dc5aa97c2554.png



At 2015-04-29 18:48:33,zhangxiongfei zhangxiongfei0...@163.com wrote:



The amount of memory that the driver consumes depends on your program logic. Did
you try to collect the result of the Spark job?




At 2015-04-29 18:42:04, wyphao.2007 wyphao.2...@163.com wrote:

Hi, dear developers, I am using Spark Streaming to read data from Kafka. The
program had already run for about 120 hours, but today it failed because of a
driver OOM, as follows:


Container [pid=49133,containerID=container_1429773909253_0050_02_01] is 
running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical 
memory used; 3.2 GB of 50 GB virtual memory used. Killing container.


I set --driver-memory to 2g. In my mind, the driver is responsible for job
scheduling and job monitoring (please correct me if I'm wrong), so why is it
using so much memory?


So I used jmap to monitor another program (which had already run for about 48 hours):
sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The result is as follows:
the java.util.HashMap$Entry and java.lang.Long objects are using about 600 MB of memory!
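
For reference, this kind of check can also be scripted from inside the driver
process itself; a minimal sketch (assuming the jmap binary is on the driver
host's PATH -- this is an illustration, not something from the original report):

// log the top of a live-object histogram from within the driver, so the
// growth of HashMap$Entry / Long counts can be tracked over time
import scala.sys.process._

val pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val histo = Seq("jmap", "-histo:live", pid).!!
println(histo.split("\n").take(20).mkString("\n"))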


I also used jmap to monitor another program (which had run for about 1 hour),
with the following result:
the java.util.HashMap$Entry and java.lang.Long objects were not using that much
memory yet. But I found that, as time goes by, the java.util.HashMap$Entry and
java.lang.Long objects occupy more and more memory.
Is this a driver memory leak, or is there another reason?
Thanks
Best Regards

RDD split into multiple RDDs

2015-04-29 Thread Sébastien Soubré-Lanabère
Hello,

I'm facing a problem with custom RDD transformations.

I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a map
of RDD by key.

This would be great, for example, in order to process mllib clustering on V
values grouped by K.

I know I could do it by calling filter() on my RDD as many times as I have
keys, but I'm afraid this would not be efficient (the entire RDD would be read
each time, right?). Then, I could mapPartitions my RDD before filtering, but
the resulting code is huge...

So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
V]): Map[K, RDD[V]] method, which would iterate over the RDD only once, but I
cannot complete the implementation.

Please, could you tell me first whether this is really feasible, and then,
could you give me some pointers?

Thank you,
Regards,
Sebastien


Re: RDD split into multiple RDDs

2015-04-29 Thread Juan Rodríguez Hortalá
Hi Daniel,

I understood Sébastien was talking about having a high number of keys; I
guess I was prejudiced by my own problem! :) Anyway, I don't think you need
to use disk or a database to generate an RDD per key; you can use filter,
which I guess would be more efficient because IO is avoided, especially if
the RDD was cached. For example:

// in the spark shell
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// generate a map from key to rdd of values
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)]) (implicit kt: ClassTag[K],
    vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  (for (k <- keys) yield
    k -> (pairRDD filter(_._1 == k) values)
  ) toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs map(x => (x % 10, x))
val gs = groupByKeyToRDDs(ixs)
gs(1).collect

Just an idea.

Greetings,

Juan Rodriguez



2015-04-29 14:20 GMT+02:00 Daniel Darabos daniel.dara...@lynxanalytics.com
:

 Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method
 for saving the RDD into separate files by key in a single pass. Then you
 can read the files into separate RDDs.

 On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá 
 juan.rodriguez.hort...@gmail.com wrote:

 Hi Sébastien,

 I came across a similar problem some time ago, you can see the discussion in
 the Spark users mailing list at

 http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
 . My experience was that when you create too many RDDs the Spark scheduler
 gets stuck, so if you have many keys in the map you are creating you'll
 probably have problems. On the other hand, the latest example I proposed
 in
 that mailing thread was a batch job in which we start from a single RDD of
 time tagged data, transform the RDD in a list of RDD corresponding to
 generating windows according to the time tag of the records, and then
 apply
 a transformation of RDD to each window RDD, like for example KMeans.run of
 MLlib. This is very similar to what you propose.
 So in my humble opinion the approach of generating thousands of RDDs by
 filtering doesn't work, and a new RDD class should be implemented for
 this.
 I have never implemented a custom RDD, but if you want some help I would
 be
 happy to join you in this task


 Sebastien said nothing about thousands of keys. This is a valid problem
 even if you only have two different keys.

 Greetings,

 Juan



 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com
 :

  Hello,
 
  I'm facing a problem with custom RDD transformations.
 
  I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a
 map
  of RDD by key.
 
  This would be great, for example, in order to process mllib clustering
 on V
  values grouped by K.
 
  I know I could do it using filter() on my RDD as many times I have keys,
  but I'm afraid this would not be efficient (the entire RDD would be read
  each time, right ?). Then, I could mapByPartition my RDD before
 filtering,
  but the code is finally huge...
 
  So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
  V]): Map[K, RDD[V]] method, which would iterate on the RDD once time
 only,
  but I cannot achieve my development.
 
  Please, could you tell me first if this is really feasible, and then,
 could
  you give me some pointers ?
 
  Thank you,
  Regards,
  Sebastien
 





Re: RDD split into multiple RDDs

2015-04-29 Thread Daniel Darabos
Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method
for saving the RDD into separate files by key in a single pass. Then you
can read the files into separate RDDs.
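
For anyone curious, a rough sketch of that single-pass approach, along the
lines of the linked answer (the class and path names here are illustrative,
not from this thread):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Route each record to a file named after its key, and drop the key from the
// written record itself.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

// single pass over the data, one directory per key
val pairs = sc.parallelize(1 to 1000).map(x => ((x % 10).toString, x.toString))
pairs.saveAsHadoopFile("byKeyOutput", classOf[String], classOf[String],
  classOf[RDDMultipleTextOutputFormat])

// afterwards, each key's files can be read back as their own RDD
val rddForKey1 = sc.textFile("byKeyOutput/1")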

On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi Sébastien,

 I came across a similar problem some time ago, you can see the discussion in
 the Spark users mailing list at

 http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
 . My experience was that when you create too many RDDs the Spark scheduler
 gets stuck, so if you have many keys in the map you are creating you'll
 probably have problems. On the other hand, the latest example I proposed in
 that mailing thread was a batch job in which we start from a single RDD of
 time tagged data, transform the RDD in a list of RDD corresponding to
 generating windows according to the time tag of the records, and then apply
 a transformation of RDD to each window RDD, like for example KMeans.run of
 MLlib. This is very similar to what you propose.
 So in my humble opinion the approach of generating thousands of RDDs by
 filtering doesn't work, and a new RDD class should be implemented for this.
 I have never implemented a custom RDD, but if you want some help I would be
 happy to join you in this task


Sebastien said nothing about thousands of keys. This is a valid problem
even if you only have two different keys.

Greetings,

 Juan



 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com:

  Hello,
 
  I'm facing a problem with custom RDD transformations.
 
  I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a
 map
  of RDD by key.
 
  This would be great, for example, in order to process mllib clustering
 on V
  values grouped by K.
 
  I know I could do it using filter() on my RDD as many times I have keys,
  but I'm afraid this would not be efficient (the entire RDD would be read
  each time, right ?). Then, I could mapByPartition my RDD before
 filtering,
  but the code is finally huge...
 
  So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
  V]): Map[K, RDD[V]] method, which would iterate on the RDD once time
 only,
  but I cannot achieve my development.
 
   Please, could you tell me first if this is really feasible, and then,
 could
  you give me some pointers ?
 
  Thank you,
  Regards,
  Sebastien
 



Spark SQL cannot tolerate regexp with BIGINT

2015-04-29 Thread lonely Feb
Hi all, we are migrating our Hive jobs to Spark SQL, but we found a small
difference between Hive and Spark SQL: our SQL has a statement like:

select A from B where id regexp '^12345$'

in HIVE it works fine but in Spark SQL we got a:

java.lang.ClassCastException: java.lang.Long cannot be cast to
java.lang.String

Can this statement be handled with Spark SQL?


Re: Pandas' Shift in Dataframe

2015-04-29 Thread Nicholas Chammas
You can check JIRA for any existing plans. If there isn't any, then feel
free to create a JIRA and make the case there for why this would be a good
feature to add.

Nick

On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 Hi,
 Is there any plan to add the shift method from Pandas to Spark Dataframe,
 not that I think it's an easy task...

 c.f.

 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html

 Regards,

 Olivier.



Re: Pandas' Shift in Dataframe

2015-04-29 Thread Nicholas Chammas
I can't comment on the direction of the DataFrame API (that's more for
Reynold or Michael I guess), but I just wanted to point out that the JIRA
would be the recommended way to create a central place for discussing a
feature add like that.

Nick

On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 Hi Nicholas,
 yes I've already checked, and I've just created the
 https://issues.apache.org/jira/browse/SPARK-7247
 I'm not even sure why this would be a good feature to add except the fact
 that some of the data scientists I'm working with are using it, and it
 would be therefore useful for me to translate Pandas code to Spark...

 Isn't the goal of Spark Dataframe to allow all the features of Pandas/R
 Dataframe using Spark ?

 Regards,

 Olivier.

 Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com
 a écrit :

 You can check JIRA for any existing plans. If there isn't any, then feel
 free to create a JIRA and make the case there for why this would be a good
 feature to add.

 Nick

 On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi,
 Is there any plan to add the shift method from Pandas to Spark
 Dataframe,
 not that I think it's an easy task...

 c.f.

 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html

 Regards,

 Olivier.




Re: Pandas' Shift in Dataframe

2015-04-29 Thread Olivier Girardot
To give you a broader idea of the current use case, I have a few
transformations (sorts and column creations) oriented towards a simple goal.
My data is timestamped, and if two lines are otherwise identical, their time
difference has to be more than X days for a line to be kept, so there are a
few shifts done, but only very local ones: -1 or +1.

FYI regarding JIRA, I created one -
https://issues.apache.org/jira/browse/SPARK-7247 - associated with this
discussion.
@rxin: considering that, in my use case, the data is sorted beforehand, there
might be a better way - but I guess some shuffle would be needed anyway...


Le mer. 29 avr. 2015 à 22:34, Evan R. Sparks evan.spa...@gmail.com a
écrit :

 In general there's a tension between ordered data and set-oriented data
 model underlying DataFrames. You can force a total ordering on the data,
 but it may come at a high cost with respect to performance.

 It would be good to get a sense of the use case you're trying to support,
  but, as one suggestion, I can imagine achieving a similar
 result by applying a datetime.timedelta (in Python terms) to a time
 attribute (your axis) and then performing join between the base table and
 this derived table to merge the data back together. This type of join could
 then be optimized if the use case is frequent enough to warrant it.

 - Evan

 On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote:

 In this case it's fine to discuss whether this would fit in Spark
 DataFrames' high level direction before putting it in JIRA. Otherwise we
 might end up creating a lot of tickets just for querying whether something
 might be a good idea.

 About this specific feature -- I'm not sure what it means in general given
 we don't have axis in Spark DataFrames. But I think it'd probably be good
 to be able to shift a column by one so we can support the end time / begin
 time case, although it'd require two passes over the data.



 On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

  I can't comment on the direction of the DataFrame API (that's more for
  Reynold or Michael I guess), but I just wanted to point out that the
 JIRA
  would be the recommended way to create a central place for discussing a
  feature add like that.
 
  Nick
 
  On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot 
  o.girar...@lateral-thoughts.com wrote:
 
   Hi Nicholas,
   yes I've already checked, and I've just created the
   https://issues.apache.org/jira/browse/SPARK-7247
   I'm not even sure why this would be a good feature to add except the
 fact
   that some of the data scientists I'm working with are using it, and it
   would be therefore useful for me to translate Pandas code to Spark...
  
   Isn't the goal of Spark Dataframe to allow all the features of
 Pandas/R
   Dataframe using Spark ?
  
   Regards,
  
   Olivier.
  
   Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas 
  nicholas.cham...@gmail.com
   a écrit :
  
   You can check JIRA for any existing plans. If there isn't any, then
 feel
   free to create a JIRA and make the case there for why this would be a
  good
   feature to add.
  
   Nick
  
   On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot 
   o.girar...@lateral-thoughts.com wrote:
  
   Hi,
   Is there any plan to add the shift method from Pandas to Spark
   Dataframe,
   not that I think it's an easy task...
  
   c.f.
  
  
 
 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html
  
   Regards,
  
   Olivier.
  
  
 





Re: Spark SQL cannot tolerate regexp with BIGINT

2015-04-29 Thread Olivier Girardot
I guess you can use cast(id as String) instead of just id in your where
clause ?
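
For reference, a concrete form of that workaround (the table and column names
are the ones from the original question; a sketch, not tested against this
exact schema):

// via the SQLContext / HiveContext in the Scala shell
val result = sqlContext.sql(
  "select A from B where cast(id as string) regexp '^12345$'")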

Le mer. 29 avr. 2015 à 12:13, lonely Feb lonely8...@gmail.com a écrit :

 Hi all, we are transfer our HIVE job into SparkSQL, but we found a litter
 difference between HIVE and Spark SQL that our sql has a statement like:

 select A from B where id regexp '^12345$'

 in HIVE it works fine but in Spark SQL we got a:

 java.lang.ClassCastException: java.lang.Long cannot be cast to
 java.lang.String

 Can this statement be handled with Spark SQL?



Re: Using memory mapped file for shuffle

2015-04-29 Thread Sandy Ryza
Spark currently doesn't allocate any memory off of the heap for shuffle
objects.  When the in-memory data gets too large, it will write it out to a
file, and then merge the spilled files later.

What exactly do you mean by store shuffle data in HDFS?

-Sandy

On Tue, Apr 14, 2015 at 10:15 AM, Kannan Rajah kra...@maprtech.com wrote:

 Sandy,
 Can you clarify how it won't cause OOM? Is it in any way related to memory
 being allocated outside the heap - native space? The reason I ask is that I
 have a use case to store shuffle data in HDFS. Since there is no notion of
 memory mapped files, I need to store it as a byte buffer. I want to make
 sure this will not cause OOM when the file size is large.


 --
 Kannan

 On Tue, Apr 14, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Kannan,

 Both in MapReduce and Spark, the amount of shuffle data a task produces
 can exceed the tasks memory without risk of OOM.

 -Sandy

 On Tue, Apr 14, 2015 at 6:47 AM, Imran Rashid iras...@cloudera.com
 wrote:

 That limit doesn't have anything to do with the amount of available
 memory.  Its just a tuning parameter, as one version is more efficient
 for
 smaller files, the other is better for bigger files.  I suppose the
 comment
 is a little better in FileSegmentManagedBuffer:


 https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62

 On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah kra...@maprtech.com
 wrote:

  DiskStore.getBytes uses memory mapped files if the length is more than
 a
  configured limit. This code path is used during map side shuffle in
  ExternalSorter. I want to know if its possible for the length to
 exceed the
  limit in the case of shuffle. The reason I ask is in the case of
 Hadoop,
  each map task is supposed to produce only data that can fit within the
  task's configured max memory. Otherwise it will result in OOM. Is the
  behavior same in Spark or the size of data generated by a map task can
  exceed what can be fitted in memory.
 
    if (length < minMemoryMapBytes) {
  val buf = ByteBuffer.allocate(length.toInt)
  
} else {
  Some(channel.map(MapMode.READ_ONLY, offset, length))
}
 
  --
  Kannan
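
For context on the limit discussed in the quoted code above: it is controlled
by a Spark setting (spark.storage.memoryMapThreshold in this era, with the
value given as a byte count -- treat the exact name and default as something to
verify against your version), so the point at which DiskStore switches from a
heap ByteBuffer to a memory-mapped file can be tuned:

import org.apache.spark.SparkConf

// illustrative: raise the memory-map threshold to 8 MB (value in bytes)
val conf = new SparkConf().set("spark.storage.memoryMapThreshold", (8 * 1024 * 1024).toString)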
 






Re: Pandas' Shift in Dataframe

2015-04-29 Thread Reynold Xin
In this case it's fine to discuss whether this would fit in Spark
DataFrames' high level direction before putting it in JIRA. Otherwise we
might end up creating a lot of tickets just for querying whether something
might be a good idea.

About this specific feature -- I'm not sure what it means in general given
we don't have axis in Spark DataFrames. But I think it'd probably be good
to be able to shift a column by one so we can support the end time / begin
time case, although it'd require two passes over the data.



On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 I can't comment on the direction of the DataFrame API (that's more for
 Reynold or Michael I guess), but I just wanted to point out that the JIRA
 would be the recommended way to create a central place for discussing a
 feature add like that.

 Nick

 On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

  Hi Nicholas,
  yes I've already checked, and I've just created the
  https://issues.apache.org/jira/browse/SPARK-7247
  I'm not even sure why this would be a good feature to add except the fact
  that some of the data scientists I'm working with are using it, and it
  would be therefore useful for me to translate Pandas code to Spark...
 
  Isn't the goal of Spark Dataframe to allow all the features of Pandas/R
  Dataframe using Spark ?
 
  Regards,
 
  Olivier.
 
  Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas 
 nicholas.cham...@gmail.com
  a écrit :
 
  You can check JIRA for any existing plans. If there isn't any, then feel
  free to create a JIRA and make the case there for why this would be a
 good
  feature to add.
 
  Nick
 
  On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot 
  o.girar...@lateral-thoughts.com wrote:
 
  Hi,
  Is there any plan to add the shift method from Pandas to Spark
  Dataframe,
  not that I think it's an easy task...
 
  c.f.
 
 
 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html
 
  Regards,
 
  Olivier.
 
 



Re: Pandas' Shift in Dataframe

2015-04-29 Thread Evan R. Sparks
In general there's a tension between ordered data and set-oriented data
model underlying DataFrames. You can force a total ordering on the data,
but it may come at a high cost with respect to performance.

It would be good to get a sense of the use case you're trying to support,
but, as one suggestion, I can imagine achieving a similar
result by applying a datetime.timedelta (in Python terms) to a time
attribute (your axis) and then performing join between the base table and
this derived table to merge the data back together. This type of join could
then be optimized if the use case is frequent enough to warrant it.
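
A rough sketch of that join-based idea in DataFrame terms (the column and
table names below are illustrative, not from this thread): emulate Pandas'
shift(1) by joining the base table against a copy of itself offset by one
step on the time axis.

import org.apache.spark.sql.functions._

case class Event(t: Long, value: Double)   // t = position on the time axis
val df = sqlContext.createDataFrame(Seq(Event(0, 1.0), Event(1, 2.0), Event(2, 4.0)))

// derived table: every row shifted forward by one step on the axis
val shifted = df.select((col("t") + 1).as("t_next"), col("value").as("prev_value"))

// each row paired with the previous row's value, i.e. value.shift(1)
val withPrev = df.join(shifted, col("t") === col("t_next"), "left_outer")
  .select(col("t"), col("value"), col("prev_value"))
withPrev.show()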

- Evan

On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote:

 In this case it's fine to discuss whether this would fit in Spark
 DataFrames' high level direction before putting it in JIRA. Otherwise we
 might end up creating a lot of tickets just for querying whether something
 might be a good idea.

 About this specific feature -- I'm not sure what it means in general given
 we don't have axis in Spark DataFrames. But I think it'd probably be good
 to be able to shift a column by one so we can support the end time / begin
 time case, although it'd require two passes over the data.



 On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

  I can't comment on the direction of the DataFrame API (that's more for
  Reynold or Michael I guess), but I just wanted to point out that the JIRA
  would be the recommended way to create a central place for discussing a
  feature add like that.
 
  Nick
 
  On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot 
  o.girar...@lateral-thoughts.com wrote:
 
   Hi Nicholas,
   yes I've already checked, and I've just created the
   https://issues.apache.org/jira/browse/SPARK-7247
   I'm not even sure why this would be a good feature to add except the
 fact
   that some of the data scientists I'm working with are using it, and it
   would be therefore useful for me to translate Pandas code to Spark...
  
   Isn't the goal of Spark Dataframe to allow all the features of Pandas/R
   Dataframe using Spark ?
  
   Regards,
  
   Olivier.
  
   Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas 
  nicholas.cham...@gmail.com
   a écrit :
  
   You can check JIRA for any existing plans. If there isn't any, then
 feel
   free to create a JIRA and make the case there for why this would be a
  good
   feature to add.
  
   Nick
  
   On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot 
   o.girar...@lateral-thoughts.com wrote:
  
   Hi,
   Is there any plan to add the shift method from Pandas to Spark
   Dataframe,
   not that I think it's an easy task...
  
   c.f.
  
  
 
 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html
  
   Regards,
  
   Olivier.
  
  
 



Re: [discuss] DataFrame function namespacing

2015-04-29 Thread Reynold Xin
Scaladoc isn't much of a problem because scaladocs are grouped. Java/Python
is the main problem ...

See
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

On Wed, Apr 29, 2015 at 3:38 PM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 My feeling is that we should have a handful of namespaces (say 4 or 5). It
 becomes too cumbersome to import / remember more package names and having
 everything in one package makes it hard to read scaladoc etc.

 Thanks
 Shivaram

 On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com wrote:

 To add a little bit more context, some pros/cons I can think of are:

 Option 1: Very easy for users to find the function, since they are all in
 org.apache.spark.sql.functions. However, there will be quite a large
 number
 of them.

 Option 2: I can't tell why we would want this one over Option 3, since it
 has all the problems of Option 3, and not as nice of a hierarchy.

 Option 3: Opposite of Option 1. Each package or static class has a small
 number of functions that are relevant to each other, but for some
 functions
 it is unclear where they should go (e.g. should min go into basic or
 math?)




 On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote:

  Before we make DataFrame non-alpha, it would be great to decide how we
  want to namespace all the functions. There are 3 alternatives:
 
  1. Put all in org.apache.spark.sql.functions. This is how SQL does it,
  since SQL doesn't have namespaces. I estimate eventually we will have ~
 200
  functions.
 
  2. Have explicit namespaces, which is what master branch currently looks
  like:
 
  - org.apache.spark.sql.functions
  - org.apache.spark.sql.mathfunctions
  - ...
 
  3. Have explicit namespaces, but restructure them slightly so everything
  is under functions.
 
  package object functions {
 
// all the old functions here -- but deprecated so we keep source
  compatibility
def ...
  }
 
  package org.apache.spark.sql.functions
 
  object mathFunc {
...
  }
 
  object basicFuncs {
...
  }
 
 
 





Re: Tungsten + Flink

2015-04-29 Thread Sree V
I agree, Ewan.
We should also look into combining Flink and Spark into one. That would ease
industry adoption.

Thanking you.

With Regards
Sree 


 On Wednesday, April 29, 2015 3:21 AM, Ewan Higgs ewan.hi...@ugent.be 
wrote:

 Hi all,
A quick question about Tungsten. The announcement of the Tungsten 
project is on the back of Hadoop Summit in Brussels where some of the 
Flink devs were giving talks [1] on how Flink manages memory using byte 
arrays and the like to avoid the overhead of all the Java types[2]. Is 
there an opportunity for code reuse here? Spark and Flink may have 
different needs in some respects, but they work fundamentally towards 
the same goal, so I imagine there could be some worthwhile collaboration.

-Ewan

[1] http://2015.hadoopsummit.org/brussels/speaker/?speaker=MrtonBalassi
http://2015.hadoopsummit.org/brussels/speaker/?speaker=AljoschaKrettek

[2] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org




Re: [discuss] DataFrame function namespacing

2015-04-29 Thread Reynold Xin
To add a little bit more context, some pros/cons I can think of are:

Option 1: Very easy for users to find the function, since they are all in
org.apache.spark.sql.functions. However, there will be quite a large number
of them.

Option 2: I can't tell why we would want this one over Option 3, since it
has all the problems of Option 3, and not as nice of a hierarchy.

Option 3: Opposite of Option 1. Each package or static class has a small
number of functions that are relevant to each other, but for some functions
it is unclear where they should go (e.g. should min go into basic or
math?)




On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote:

 Before we make DataFrame non-alpha, it would be great to decide how we
 want to namespace all the functions. There are 3 alternatives:

 1. Put all in org.apache.spark.sql.functions. This is how SQL does it,
 since SQL doesn't have namespaces. I estimate eventually we will have ~ 200
 functions.

 2. Have explicit namespaces, which is what master branch currently looks
 like:

 - org.apache.spark.sql.functions
 - org.apache.spark.sql.mathfunctions
 - ...

 3. Have explicit namespaces, but restructure them slightly so everything
 is under functions.

 package object functions {

   // all the old functions here -- but deprecated so we keep source
 compatibility
   def ...
 }

 package org.apache.spark.sql.functions

 object mathFunc {
   ...
 }

 object basicFuncs {
   ...
 }





Re: Spark SQL cannot tolerate regexp with BIGINT

2015-04-29 Thread Reynold Xin
Actually I'm doing some cleanups related to type coercion, and I will take
care of this.


On Wed, Apr 29, 2015 at 5:10 PM, lonely Feb lonely8...@gmail.com wrote:

 OK, I'll try.
 On Apr 30, 2015 06:54, Reynold Xin r...@databricks.com wrote:

 We added ExpectedInputConversion rule recently in analysis:
 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L647

 With this rule, the analyzer automatically adds cast for expressions that
 inherit ExpectsInputTypes. We can make all string functions inherit
 ExpectsInputTypes and specify input types, so the casts are added
 automatically. Would you like to submit a PR?



 On Wed, Apr 29, 2015 at 2:06 PM, Olivier Girardot ssab...@gmail.com
 wrote:

 I guess you can use cast(id as String) instead of just id in your where
 clause ?

 Le mer. 29 avr. 2015 à 12:13, lonely Feb lonely8...@gmail.com a écrit
 :

  Hi all, we are transfer our HIVE job into SparkSQL, but we found a
 litter
  difference between HIVE and Spark SQL that our sql has a statement
 like:
 
  select A from B where id regexp '^12345$'
 
  in HIVE it works fine but in Spark SQL we got a:
 
  java.lang.ClassCastException: java.lang.Long cannot be cast to
  java.lang.String
 
  Can this statement be handled with Spark SQL?
 





[discuss] DataFrame function namespacing

2015-04-29 Thread Reynold Xin
Before we make DataFrame non-alpha, it would be great to decide how we want
to namespace all the functions. There are 3 alternatives:

1. Put all in org.apache.spark.sql.functions. This is how SQL does it,
since SQL doesn't have namespaces. I estimate eventually we will have ~ 200
functions.

2. Have explicit namespaces, which is what master branch currently looks
like:

- org.apache.spark.sql.functions
- org.apache.spark.sql.mathfunctions
- ...

3. Have explicit namespaces, but restructure them slightly so everything is
under functions.

package object functions {

  // all the old functions here -- but deprecated so we keep source
compatibility
  def ...
}

package org.apache.spark.sql.functions

object mathFunc {
  ...
}

object basicFuncs {
  ...
}
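
To make the trade-off concrete, call sites under Option 3 might look roughly
like this (the function and object names are illustrative, not a committed API):

import org.apache.spark.sql.functions._   // package object: old flat functions, kept for source compatibility

val a = lower(col("name"))                       // flat style, as in Option 1
val b = mathFunc.exp(col("score"))               // namespaced math function
val c = basicFuncs.coalesce(col("x"), col("y"))  // namespaced basic function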


Re: [discuss] DataFrame function namespacing

2015-04-29 Thread Shivaram Venkataraman
My feeling is that we should have a handful of namespaces (say 4 or 5). It
becomes too cumbersome to import / remember more package names and having
everything in one package makes it hard to read scaladoc etc.

Thanks
Shivaram

On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com wrote:

 To add a little bit more context, some pros/cons I can think of are:

 Option 1: Very easy for users to find the function, since they are all in
 org.apache.spark.sql.functions. However, there will be quite a large number
 of them.

 Option 2: I can't tell why we would want this one over Option 3, since it
 has all the problems of Option 3, and not as nice of a hierarchy.

 Option 3: Opposite of Option 1. Each package or static class has a small
 number of functions that are relevant to each other, but for some functions
 it is unclear where they should go (e.g. should min go into basic or
 math?)




 On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote:

  Before we make DataFrame non-alpha, it would be great to decide how we
  want to namespace all the functions. There are 3 alternatives:
 
  1. Put all in org.apache.spark.sql.functions. This is how SQL does it,
  since SQL doesn't have namespaces. I estimate eventually we will have ~
 200
  functions.
 
  2. Have explicit namespaces, which is what master branch currently looks
  like:
 
  - org.apache.spark.sql.functions
  - org.apache.spark.sql.mathfunctions
  - ...
 
  3. Have explicit namespaces, but restructure them slightly so everything
  is under functions.
 
  package object functions {
 
// all the old functions here -- but deprecated so we keep source
  compatibility
def ...
  }
 
  package org.apache.spark.sql.functions
 
  object mathFunc {
...
  }
 
  object basicFuncs {
...
  }
 
 
 



Re: RDD split into multiple RDDs

2015-04-29 Thread Sébastien Soubré-Lanabère
Hi Juan, Daniel,

thank you for your explanations. Indeed, I don't have a big number of keys,
at least not enough to get the scheduler stuck.

I was using a method quite similar to what you posted, Juan, and yes it
works, but I think it would be more efficient not to call filter on each
key. So, I was thinking of something like:
- get the iterator of the KV RDD
- distribute each value into a subset by key and then recreate an RDD from
each subset

Because the SparkContext parallelize method cannot be used inside a
transformation, I wonder if I could do it by creating a custom RDD and then
trying to implement something like the PairRDDFunctions.lookup method, but
replacing Seq[V], of course, with an RDD:

def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
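
For what it's worth, one way to avoid re-scanning every partition for every
key, assuming the pair RDD already has a partitioner, is to prune down to the
single partition that the partitioner assigns to a key and filter only there.
A rough sketch using the developer API PartitionPruningRDD (an illustration,
not a finished implementation):

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import scala.reflect.ClassTag

// Assumes pairRDD.partitioner is defined, e.g. after
// pairRDD.partitionBy(new HashPartitioner(n))
def splitByKey[K: ClassTag, V: ClassTag](pairRDD: RDD[(K, V)]): Map[K, RDD[V]] = {
  val partitioner = pairRDD.partitioner.getOrElse(
    sys.error("pairRDD must have a partitioner, e.g. via partitionBy"))
  val keys = pairRDD.keys.distinct.collect()
  keys.map { k =>
    val target = partitioner.getPartition(k)
    // read only the one partition that can contain this key
    val pruned = PartitionPruningRDD.create(pairRDD, _ == target)
    k -> pruned.filter(_._1 == k).values
  }.toMap
}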


2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com:

 Hi Daniel,

 I understood Sébastien was talking having having a high number of keys, I
 guess I was prejudiced by my own problem! :) Anyway I don't think you need
 to use disk or a database to generate a RDD per key, you can use filter
 which I guess would be more efficient because IO is avoided, especially if
 the RDD was cached. For example:

 // in the spark shell
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDD._
 import scala.reflect.ClassTag

 // generate a map from key to rdd of values
 def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)]) (implicit kt:
 ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
 val keys = pairRDD.keys.distinct.collect
 (for (k <- keys) yield
 k -> (pairRDD filter(_._1 == k) values)
 ) toMap
 }

 // simple demo
 val xs = sc.parallelize(1 to 1000)
 val ixs = xs map(x => (x % 10, x))
 val gs = groupByKeyToRDDs(ixs)
 gs(1).collect

 Just an idea.

 Greetings,

 Juan Rodriguez



 2015-04-29 14:20 GMT+02:00 Daniel Darabos 
 daniel.dara...@lynxanalytics.com:

 Check out http://stackoverflow.com/a/26051042/3318517. It's a nice
 method for saving the RDD into separate files by key in a single pass. Then
 you can read the files into separate RDDs.

 On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá 
 juan.rodriguez.hort...@gmail.com wrote:

 Hi Sébastien,

 I came across a similar problem some time ago, you can see the discussion
 in
 the Spark users mailing list at

 http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
 . My experience was that when you create too many RDDs the Spark
 scheduler
 gets stuck, so if you have many keys in the map you are creating you'll
 probably have problems. On the other hand, the latest example I proposed
 in
 that mailing thread was a batch job in which we start from a single RDD
 of
 time tagged data, transform the RDD in a list of RDD corresponding to
 generating windows according to the time tag of the records, and then
 apply
 a transformation of RDD to each window RDD, like for example KMeans.run
 of
 MLlib. This is very similar to what you propose.
 So in my humble opinion the approach of generating thousands of RDDs by
 filtering doesn't work, and a new RDD class should be implemented for
 this.
 I have never implemented a custom RDD, but if you want some help I would
 be
 happy to join you in this task


 Sebastien said nothing about thousands of keys. This is a valid problem
 even if you only have two different keys.

 Greetings,

 Juan



 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com
 :

  Hello,
 
  I'm facing a problem with custom RDD transformations.
 
  I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a
 map
  of RDD by key.
 
  This would be great, for example, in order to process mllib clustering
 on V
  values grouped by K.
 
  I know I could do it using filter() on my RDD as many times I have
 keys,
  but I'm afraid this would not be efficient (the entire RDD would be
 read
  each time, right ?). Then, I could mapByPartition my RDD before
 filtering,
  but the code is finally huge...
 
  So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
  V]): Map[K, RDD[V]] method, which would iterate on the RDD once time
 only,
  but I cannot achieve my development.
 
   Please, could you tell me first if this is really feasible, and then,
 could
  you give me some pointers ?
 
  Thank you,
  Regards,
  Sebastien
 






Re: Plans for upgrading Hive dependency?

2015-04-29 Thread Michael Armbrust
I am working on it.  Here is the (very rough) version:
https://github.com/apache/spark/compare/apache:master...marmbrus:multiHiveVersions

On Mon, Apr 27, 2015 at 1:03 PM, Punyashloka Biswal punya.bis...@gmail.com
wrote:

 Thanks Marcelo and Patrick - I don't know how I missed that ticket in my
 Jira search earlier. Is anybody working on the sub-issues yet, or is there
 a design doc I should look at before taking a stab?

 Regards,
 Punya

 On Mon, Apr 27, 2015 at 3:56 PM Patrick Wendell pwend...@gmail.com
 wrote:

  Hey Punya,
 
  There is some ongoing work to help make Hive upgrades more manageable
  and allow us to support multiple versions of Hive. Once we do that, it
  will be much easier for us to upgrade.
 
  https://issues.apache.org/jira/browse/SPARK-6906
 
  - Patrick
 
  On Mon, Apr 27, 2015 at 12:47 PM, Marcelo Vanzin van...@cloudera.com
  wrote:
   That's a lot more complicated than you might think.
  
   We've done some basic work to get HiveContext to compile against Hive
   1.1.0. Here's the code:
  
 
 https://github.com/cloudera/spark/commit/00e2c7e35d4ac236bcfbcd3d2805b483060255ec
  
   We didn't sent that upstream because that only solves half of the
   problem; the hive-thriftserver is disabled in our CDH build because it
   uses a lot of Hive APIs that have been removed in 1.1.0, so even
   getting it to compile is really complicated.
  
   If there's interest in getting the HiveContext part fixed up I can
   send a PR for that code. But at this time I don't really have plans to
   look at the thrift server.
  
  
   On Mon, Apr 27, 2015 at 11:58 AM, Punyashloka Biswal
   punya.bis...@gmail.com wrote:
   Dear Spark devs,
  
   Is there a plan for staying up-to-date with current (and future)
  versions
   of Hive? Spark currently supports version 0.13 (June 2014), but the
  latest
   version of Hive is 1.1.0 (March 2015). I don't see any Jira tickets
  about
   updating beyond 0.13, so I was wondering if this was intentional or it
  was
   just that nobody had started work on this yet.
  
   I'd be happy to work on a PR for the upgrade if one of the core
  developers
   can tell me what pitfalls to watch out for.
  
   Punya
  
  
  
   --
   Marcelo
  
   -
   To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
   For additional commands, e-mail: dev-h...@spark.apache.org
  
 



Re: [discuss] DataFrame function namespacing

2015-04-29 Thread Reynold Xin
We definitely still have the name collision problem in SQL.

On Wed, Apr 29, 2015 at 10:01 PM, Punyashloka Biswal punya.bis...@gmail.com
 wrote:

 Do we still have to keep the names of the functions distinct to avoid
 collisions in SQL? Or is there a plan to allow importing a namespace into
 SQL somehow?

 I ask because if we have to keep worrying about name collisions then I'm
 not sure what the added complexity of #2 and #3 buys us.

 Punya

 On Wed, Apr 29, 2015 at 3:52 PM Reynold Xin r...@databricks.com wrote:

 Scaladoc isn't much of a problem because scaladocs are grouped.
 Java/Python
 is the main problem ...

 See

 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

 On Wed, Apr 29, 2015 at 3:38 PM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

  My feeling is that we should have a handful of namespaces (say 4 or 5).
 It
  becomes too cumbersome to import / remember more package names and
 having
  everything in one package makes it hard to read scaladoc etc.
 
  Thanks
  Shivaram
 
  On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com
 wrote:
 
  To add a little bit more context, some pros/cons I can think of are:
 
  Option 1: Very easy for users to find the function, since they are all
 in
  org.apache.spark.sql.functions. However, there will be quite a large
  number
  of them.
 
  Option 2: I can't tell why we would want this one over Option 3, since
 it
  has all the problems of Option 3, and not as nice of a hierarchy.
 
  Option 3: Opposite of Option 1. Each package or static class has a
 small
  number of functions that are relevant to each other, but for some
  functions
  it is unclear where they should go (e.g. should min go into basic or
  math?)
 
 
 
 
  On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com
 wrote:
 
   Before we make DataFrame non-alpha, it would be great to decide how
 we
   want to namespace all the functions. There are 3 alternatives:
  
   1. Put all in org.apache.spark.sql.functions. This is how SQL does
 it,
   since SQL doesn't have namespaces. I estimate eventually we will
 have ~
  200
   functions.
  
   2. Have explicit namespaces, which is what master branch currently
 looks
   like:
  
   - org.apache.spark.sql.functions
   - org.apache.spark.sql.mathfunctions
   - ...
  
   3. Have explicit namespaces, but restructure them slightly so
 everything
   is under functions.
  
   package object functions {
  
 // all the old functions here -- but deprecated so we keep source
   compatibility
 def ...
   }
  
   package org.apache.spark.sql.functions
  
   object mathFunc {
 ...
   }
  
   object basicFuncs {
 ...
   }
  
  
  
 
 
 




Event generator for SPARK-Streaming from csv

2015-04-29 Thread anshu shukla
I have the real DEBS taxi data in a CSV file. In order to operate over it,
how can I simulate a Spout-like event generator using the timestamps in the
CSV file?

-- 
SERC-IISC
Thanks  Regards,
Anshu Shukla
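
There is no reply in this digest, but one way to approach it is a custom Spark
Streaming Receiver that reads the CSV and replays rows, sleeping according to
the gaps between consecutive timestamps. A minimal sketch under assumptions:
the timestamp is in column 0, the path and timestamp format are placeholders,
and the file must be readable from the executor that runs the receiver.

import java.text.SimpleDateFormat
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Replays CSV rows as a stream, paced by the timestamp gaps in the data.
class CsvReplayReceiver(path: String, tsFormat: String = "yyyy-MM-dd HH:mm:ss")
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  override def onStart(): Unit = {
    new Thread("csv-replay") { override def run(): Unit = replay() }.start()
  }

  override def onStop(): Unit = {}   // replay() checks isStopped()

  private def replay(): Unit = {
    val fmt = new SimpleDateFormat(tsFormat)
    var lastTs = -1L
    for (line <- scala.io.Source.fromFile(path).getLines() if !isStopped()) {
      val ts = fmt.parse(line.split(",")(0)).getTime
      if (lastTs >= 0 && ts > lastTs) Thread.sleep(ts - lastTs)  // real-time pacing
      lastTs = ts
      store(line)   // hand the event to Spark Streaming
    }
  }
}

// usage: val events = ssc.receiverStream(new CsvReplayReceiver("/path/to/debs-taxi.csv"))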