Re: Re: Driver memory leak?
No, I am not collecting the result to the driver; I simply send the result to Kafka. BTW, the image addresses are: https://cloud.githubusercontent.com/assets/5170878/7389463/ac03bf34-eea0-11e4-9e6b-1d2fba170c1c.png and https://cloud.githubusercontent.com/assets/5170878/7389480/c629d236-eea0-11e4-983a-dc5aa97c2554.png At 2015-04-29 18:48:33, zhangxiongfei zhangxiongfei0...@163.com wrote: The amount of memory that the driver consumes depends on your program logic. Did you try to collect the result of the Spark job? At 2015-04-29 18:42:04, wyphao.2007 wyphao.2...@163.com wrote: Hi, dear developers, I am using Spark Streaming to read data from Kafka. The program has already been running for about 120 hours, but today it failed because of a driver OOM, as follows: Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container. I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong), so why is it using so much memory? I used jmap to monitor another program (already running for about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The result is as follows: the java.util.HashMap$Entry and java.lang.Long objects use about 600 MB of memory! I also used jmap to monitor another program (already running for about 1 hour), and there the java.util.HashMap$Entry and java.lang.Long objects do not use nearly as much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is this a driver memory leak, or is there another reason? Thanks Best Regards
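For reference, a minimal sketch of the send-to-Kafka pattern described above, in which nothing is collected to the driver. It assumes a DStream[String] named results; the broker address and topic name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(results: DStream[String]): Unit = {
  results.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // The producer is created on the executor, once per partition, so no data
      // flows back to the driver.
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partition.foreach(record => producer.send(new ProducerRecord[String, String]("output-topic", record)))
      producer.close()
    }
  }
}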
RDD split into multiple RDDs
Hello, I'm facing a problem with custom RDD transformations. I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning a map of RDDs by key. This would be great, for example, in order to run MLlib clustering on the V values grouped by K. I know I could do it by calling filter() on my RDD as many times as I have keys, but I'm afraid this would not be efficient (the entire RDD would be read each time, right?). Then, I could mapPartitions my RDD before filtering, but the code ends up huge... So, I tried to create a custom RDD to implement a splitByKey(rdd: RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD only once, but I cannot complete my implementation. Please, could you tell me first if this is really feasible, and then, could you give me some pointers? Thank you, Regards, Sebastien
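For illustration, a minimal sketch of the per-key filter() approach mentioned above, assuming the set of keys is known (or collected) beforehand; each key still means one pass over the RDD unless it is cached:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// One filter per key: simple, but O(number of keys) passes over rdd if it is not cached.
def splitByFilter[K, V: ClassTag](rdd: RDD[(K, V)], keys: Seq[K]): Map[K, RDD[V]] =
  keys.map(k => k -> rdd.filter(_._1 == k).map(_._2)).toMap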
Re: RDD split into multiple RDDs
Hi Daniel, I understood Sébastien was talking about having a high number of keys, I guess I was prejudiced by my own problem! :) Anyway, I don't think you need to use disk or a database to generate an RDD per key; you can use filter, which I guess would be more efficient because IO is avoided, especially if the RDD was cached. For example:

// in the spark shell
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// generate a map from key to rdd of values
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  (for (k <- keys) yield k -> (pairRDD filter(_._1 == k) values)) toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs map(x => (x % 10, x))
val gs = groupByKeyToRDDs(ixs)
gs(1).collect

Just an idea. Greetings, Juan Rodriguez 2015-04-29 14:20 GMT+02:00 Daniel Darabos daniel.dara...@lynxanalytics.com : Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method for saving the RDD into separate files by key in a single pass. Then you can read the files into separate RDDs. On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Sébastien, I ran into a similar problem some time ago; you can see the discussion in the Spark users mailing list at http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results . My experience was that when you create too many RDDs the Spark scheduler gets stuck, so if you have many keys in the map you are creating you'll probably have problems. On the other hand, the latest example I proposed in that mailing thread was a batch job in which we start from a single RDD of time-tagged data, transform the RDD into a list of RDDs corresponding to windows generated according to the time tag of the records, and then apply a transformation to each window RDD, like for example KMeans.run of MLlib. This is very similar to what you propose. So in my humble opinion the approach of generating thousands of RDDs by filtering doesn't work, and a new RDD class should be implemented for this. I have never implemented a custom RDD, but if you want some help I would be happy to join you in this task. Sebastien said nothing about thousands of keys. This is a valid problem even if you only have two different keys. Greetings, Juan 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com : Hello, I'm facing a problem with custom RDD transformations. I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning a map of RDDs by key. This would be great, for example, in order to run MLlib clustering on the V values grouped by K. I know I could do it by calling filter() on my RDD as many times as I have keys, but I'm afraid this would not be efficient (the entire RDD would be read each time, right?). Then, I could mapPartitions my RDD before filtering, but the code ends up huge... So, I tried to create a custom RDD to implement a splitByKey(rdd: RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD only once, but I cannot complete my implementation. Please, could you tell me first if this is really feasible, and then, could you give me some pointers? Thank you, Regards, Sebastien
Re: RDD split into multiple RDDs
Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method for saving the RDD into separate files by key in a single pass. Then you can read the files into separate RDDs. On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Sébastien, I ran into a similar problem some time ago; you can see the discussion in the Spark users mailing list at http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results . My experience was that when you create too many RDDs the Spark scheduler gets stuck, so if you have many keys in the map you are creating you'll probably have problems. On the other hand, the latest example I proposed in that mailing thread was a batch job in which we start from a single RDD of time-tagged data, transform the RDD into a list of RDDs corresponding to windows generated according to the time tag of the records, and then apply a transformation to each window RDD, like for example KMeans.run of MLlib. This is very similar to what you propose. So in my humble opinion the approach of generating thousands of RDDs by filtering doesn't work, and a new RDD class should be implemented for this. I have never implemented a custom RDD, but if you want some help I would be happy to join you in this task. Sebastien said nothing about thousands of keys. This is a valid problem even if you only have two different keys. Greetings, Juan 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com: Hello, I'm facing a problem with custom RDD transformations. I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning a map of RDDs by key. This would be great, for example, in order to run MLlib clustering on the V values grouped by K. I know I could do it by calling filter() on my RDD as many times as I have keys, but I'm afraid this would not be efficient (the entire RDD would be read each time, right?). Then, I could mapPartitions my RDD before filtering, but the code ends up huge... So, I tried to create a custom RDD to implement a splitByKey(rdd: RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD only once, but I cannot complete my implementation. Please, could you tell me first if this is really feasible, and then, could you give me some pointers? Thank you, Regards, Sebastien
Spark SQL cannot tolerate regexp with BIGINT
Hi all, we are transferring our Hive jobs to Spark SQL, but we found a small difference between Hive and Spark SQL: our SQL has a statement like: select A from B where id regexp '^12345$' In Hive it works fine, but in Spark SQL we get: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String Can this statement be handled by Spark SQL?
Re: Pandas' Shift in Dataframe
You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
Re: Pandas' Shift in Dataframe
I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created the https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would be therefore useful for me to translate Pandas code to Spark... Isn't the goal of Spark Dataframe to allow all the features of Pandas/R Dataframe using Spark ? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
Re: Pandas' Shift in Dataframe
To give you a broader idea of the current use case, I have a few transformations (sort and column creations) oriented towards a simple goal. My data is timestamped, and if two lines are identical, their time difference has to be more than X days for them to be kept, so there are a few shifts done, but very locally: only -1 or +1. FYI regarding JIRA, I created one - https://issues.apache.org/jira/browse/SPARK-7247 - associated with this discussion. @rxin: considering that, in my use case, the data is sorted beforehand, there might be a better way - but I guess some shuffle would be needed anyway... Le mer. 29 avr. 2015 à 22:34, Evan R. Sparks evan.spa...@gmail.com a écrit : In general there's a tension between ordered data and the set-oriented data model underlying DataFrames. You can force a total ordering on the data, but it may come at a high cost with respect to performance. It would be good to get a sense of the use case you're trying to support, but one suggestion: I can imagine achieving a similar result by applying a datetime.timedelta (in Python terms) to a time attribute (your axis) and then performing a join between the base table and this derived table to merge the data back together. This type of join could then be optimized if the use case is frequent enough to warrant it. - Evan On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote: In this case it's fine to discuss whether this would fit in Spark DataFrames' high level direction before putting it in JIRA. Otherwise we might end up creating a lot of tickets just for querying whether something might be a good idea. About this specific feature -- I'm not sure what it means in general given we don't have axis in Spark DataFrames. But I think it'd probably be good to be able to shift a column by one so we can support the end time / begin time case, although it'd require two passes over the data. On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would therefore be useful for me to translate Pandas code to Spark... Isn't the goal of Spark DataFrame to allow all the features of Pandas/R DataFrame using Spark? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
Re: Spark SQL cannot tolerate regexp with BIGINT
I guess you can use cast(id as string) instead of just id in your where clause? Le mer. 29 avr. 2015 à 12:13, lonely Feb lonely8...@gmail.com a écrit : Hi all, we are transferring our Hive jobs to Spark SQL, but we found a small difference between Hive and Spark SQL: our SQL has a statement like: select A from B where id regexp '^12345$' In Hive it works fine, but in Spark SQL we get: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String Can this statement be handled by Spark SQL?
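A minimal sketch of the suggested workaround, assuming a HiveContext named hiveContext and a registered table B with a BIGINT column id:

val result = hiveContext.sql(
  "SELECT A FROM B WHERE CAST(id AS STRING) REGEXP '^12345$'")
result.show()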
Re: Using memory mapped file for shuffle
Spark currently doesn't allocate any memory off of the heap for shuffle objects. When the in-memory data gets too large, it will write it out to a file, and then merge the spilled files later. What exactly do you mean by storing shuffle data in HDFS? -Sandy On Tue, Apr 14, 2015 at 10:15 AM, Kannan Rajah kra...@maprtech.com wrote: Sandy, can you clarify how it won't cause OOM? Is it in any way related to memory being allocated outside the heap - native space? The reason I ask is that I have a use case to store shuffle data in HDFS. Since there is no notion of memory mapped files there, I need to store it as a byte buffer. I want to make sure this will not cause OOM when the file size is large. -- Kannan On Tue, Apr 14, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Kannan, Both in MapReduce and Spark, the amount of shuffle data a task produces can exceed the task's memory without risk of OOM. -Sandy On Tue, Apr 14, 2015 at 6:47 AM, Imran Rashid iras...@cloudera.com wrote: That limit doesn't have anything to do with the amount of available memory. It's just a tuning parameter, as one version is more efficient for smaller files, the other is better for bigger files. I suppose the comment is a little better in FileSegmentManagedBuffer: https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L62 On Tue, Apr 14, 2015 at 12:01 AM, Kannan Rajah kra...@maprtech.com wrote: DiskStore.getBytes uses memory mapped files if the length is more than a configured limit. This code path is used during the map-side shuffle in ExternalSorter. I want to know if it's possible for the length to exceed the limit in the case of shuffle. The reason I ask is that in the case of Hadoop, each map task is supposed to produce only data that can fit within the task's configured max memory; otherwise it will result in OOM. Is the behavior the same in Spark, or can the size of the data generated by a map task exceed what can be fitted in memory?

if (length < minMemoryMapBytes) {
  val buf = ByteBuffer.allocate(length.toInt)
} else {
  Some(channel.map(MapMode.READ_ONLY, offset, length))
}

-- Kannan
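To illustrate the two code paths being discussed, here is a standalone sketch (not Spark's actual DiskStore code) that reads a file segment either into a heap ByteBuffer or through a memory-mapped region, depending on a size threshold; the 2 MB threshold mirrors the default of spark.storage.memoryMapThreshold:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

val minMemoryMapBytes = 2 * 1024 * 1024L  // below this size, copy into the heap

def readSegment(path: String, offset: Long, length: Long): ByteBuffer = {
  val channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ)
  try {
    if (length < minMemoryMapBytes) {
      // Small segment: read it into a normal heap buffer.
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() > 0) {
        if (channel.read(buf) == -1) throw new java.io.IOException("unexpected end of file")
      }
      buf.flip()
      buf
    } else {
      // Large segment: memory-map it; the pages live outside the JVM heap and are
      // loaded lazily by the OS, so this does not need `length` bytes of heap.
      channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}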
Re: Pandas' Shift in Dataframe
In this case it's fine to discuss whether this would fit in Spark DataFrames' high level direction before putting it in JIRA. Otherwise we might end up creating a lot of tickets just for querying whether something might be a good idea. About this specific feature -- I'm not sure what it means in general given we don't have axis in Spark DataFrames. But I think it'd probably be good to be able to shift a column by one so we can support the end time / begin time case, although it'd require two passes over the data. On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created the https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would be therefore useful for me to translate Pandas code to Spark... Isn't the goal of Spark Dataframe to allow all the features of Pandas/R Dataframe using Spark ? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
Re: Pandas' Shift in Dataframe
In general there's a tension between ordered data and the set-oriented data model underlying DataFrames. You can force a total ordering on the data, but it may come at a high cost with respect to performance. It would be good to get a sense of the use case you're trying to support, but one suggestion: I can imagine achieving a similar result by applying a datetime.timedelta (in Python terms) to a time attribute (your axis) and then performing a join between the base table and this derived table to merge the data back together. This type of join could then be optimized if the use case is frequent enough to warrant it. - Evan On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote: In this case it's fine to discuss whether this would fit in Spark DataFrames' high level direction before putting it in JIRA. Otherwise we might end up creating a lot of tickets just for querying whether something might be a good idea. About this specific feature -- I'm not sure what it means in general given we don't have axis in Spark DataFrames. But I think it'd probably be good to be able to shift a column by one so we can support the end time / begin time case, although it'd require two passes over the data. On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would therefore be useful for me to translate Pandas code to Spark... Isn't the goal of Spark DataFrame to allow all the features of Pandas/R DataFrame using Spark? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
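A rough sketch of the join-based approach Evan describes above, assuming a DataFrame named events with two columns, key and ts (in seconds); matching each row with the row exactly offsetSeconds earlier emulates a one-step shift when timestamps are evenly spaced. The column names are assumptions:

import org.apache.spark.sql.DataFrame

def shiftByJoin(events: DataFrame, offsetSeconds: Long): DataFrame = {
  // Derived table with renamed columns so the self-join is unambiguous.
  val shifted = events
    .withColumnRenamed("key", "key_prev")
    .withColumnRenamed("ts", "ts_prev")
  // Pair each row with the row of the same key whose timestamp is offsetSeconds earlier.
  events.join(
    shifted,
    events("key") === shifted("key_prev") &&
      events("ts") === shifted("ts_prev") + offsetSeconds)
}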
Re: [discuss] DataFrame function namespacing
Scaladoc isn't much of a problem because scaladocs are grouped. Java/Python is the main problem ... See https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ On Wed, Apr 29, 2015 at 3:38 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: My feeling is that we should have a handful of namespaces (say 4 or 5). It becomes too cumbersome to import / remember more package names and having everything in one package makes it hard to read scaladoc etc. Thanks Shivaram On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com wrote: To add a little bit more context, some pros/cons I can think of are: Option 1: Very easy for users to find the function, since they are all in org.apache.spark.sql.functions. However, there will be quite a large number of them. Option 2: I can't tell why we would want this one over Option 3, since it has all the problems of Option 3, and not as nice of a hierarchy. Option 3: Opposite of Option 1. Each package or static class has a small number of functions that are relevant to each other, but for some functions it is unclear where they should go (e.g. should min go into basic or math?) On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote: Before we make DataFrame non-alpha, it would be great to decide how we want to namespace all the functions. There are 3 alternatives: 1. Put all in org.apache.spark.sql.functions. This is how SQL does it, since SQL doesn't have namespaces. I estimate eventually we will have ~ 200 functions. 2. Have explicit namespaces, which is what master branch currently looks like: - org.apache.spark.sql.functions - org.apache.spark.sql.mathfunctions - ... 3. Have explicit namespaces, but restructure them slightly so everything is under functions. package object functions { // all the old functions here -- but deprecated so we keep source compatibility def ... } package org.apache.spark.sql.functions object mathFunc { ... } object basicFuncs { ... }
Re: Tungsten + Flink
I agree, Ewan. We should also look into combining both Flink and Spark into one. This would ease industry adoption. Thanking you. With Regards Sree On Wednesday, April 29, 2015 3:21 AM, Ewan Higgs ewan.hi...@ugent.be wrote: Hi all, A quick question about Tungsten. The announcement of the Tungsten project comes on the back of Hadoop Summit in Brussels, where some of the Flink devs were giving talks [1] on how Flink manages memory using byte arrays and the like to avoid the overhead of all the Java types [2]. Is there an opportunity for code reuse here? Spark and Flink may have different needs in some respects, but they work fundamentally towards the same goal, so I imagine there could be some worthwhile collaboration. -Ewan [1] http://2015.hadoopsummit.org/brussels/speaker/?speaker=MrtonBalassi http://2015.hadoopsummit.org/brussels/speaker/?speaker=AljoschaKrettek [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
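As a toy illustration of the byte-array technique both projects rely on (managing records inside flat buffers instead of allocating one Java object per record), the record layout below is made up:

import java.nio.ByteBuffer

val recordSize = 8 + 8               // one long key + one double value per record
val numRecords = 1000
val buf = ByteBuffer.allocate(recordSize * numRecords)

def writeRecord(i: Int, key: Long, value: Double): Unit = {
  buf.putLong(i * recordSize, key)          // absolute puts: no per-record object allocation
  buf.putDouble(i * recordSize + 8, value)
}

def readKey(i: Int): Long = buf.getLong(i * recordSize)

writeRecord(0, 42L, 3.14)
assert(readKey(0) == 42L)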
Re: [discuss] DataFrame function namespacing
To add a little bit more context, some pros/cons I can think of are: Option 1: Very easy for users to find the function, since they are all in org.apache.spark.sql.functions. However, there will be quite a large number of them. Option 2: I can't tell why we would want this one over Option 3, since it has all the problems of Option 3, and not as nice of a hierarchy. Option 3: Opposite of Option 1. Each package or static class has a small number of functions that are relevant to each other, but for some functions it is unclear where they should go (e.g. should min go into basic or math?) On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote: Before we make DataFrame non-alpha, it would be great to decide how we want to namespace all the functions. There are 3 alternatives: 1. Put all in org.apache.spark.sql.functions. This is how SQL does it, since SQL doesn't have namespaces. I estimate eventually we will have ~ 200 functions. 2. Have explicit namespaces, which is what master branch currently looks like: - org.apache.spark.sql.functions - org.apache.spark.sql.mathfunctions - ... 3. Have explicit namespaces, but restructure them slightly so everything is under functions. package object functions { // all the old functions here -- but deprecated so we keep source compatibility def ... } package org.apache.spark.sql.functions object mathFunc { ... } object basicFuncs { ... }
Re: Spark SQL cannot tolerate regexp with BIGINT
Actually I'm doing some cleanups related to type coercion, and I will take care of this. On Wed, Apr 29, 2015 at 5:10 PM, lonely Feb lonely8...@gmail.com wrote: OK, I'll try. On Apr 30, 2015 06:54, Reynold Xin r...@databricks.com wrote: We added an ExpectedInputConversion rule recently in analysis: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L647 With this rule, the analyzer automatically adds casts for expressions that inherit ExpectsInputTypes. We can make all string functions inherit ExpectsInputTypes and specify their input types, so the casts are added automatically. Would you like to submit a PR? On Wed, Apr 29, 2015 at 2:06 PM, Olivier Girardot ssab...@gmail.com wrote: I guess you can use cast(id as string) instead of just id in your where clause? Le mer. 29 avr. 2015 à 12:13, lonely Feb lonely8...@gmail.com a écrit : Hi all, we are transferring our Hive jobs to Spark SQL, but we found a small difference between Hive and Spark SQL: our SQL has a statement like: select A from B where id regexp '^12345$' In Hive it works fine, but in Spark SQL we get: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String Can this statement be handled by Spark SQL?
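For intuition, a simplified, self-contained model (not the actual Catalyst classes) of what such an analysis rule does: an expression declares the input types it expects, and the rule wraps any mismatched child in a cast:

sealed trait DataType
case object StringType extends DataType
case object LongType extends DataType
case object BooleanType extends DataType

trait Expression { def dataType: DataType }
case class AttributeRef(name: String, dataType: DataType) extends Expression
case class Cast(child: Expression, dataType: DataType) extends Expression

// A toy regexp expression: it expects both of its children to be strings.
case class RegExp(subject: Expression, pattern: Expression) extends Expression {
  val dataType: DataType = BooleanType
  val expectedInputTypes: Seq[DataType] = Seq(StringType, StringType)
  def children: Seq[Expression] = Seq(subject, pattern)
}

// The "rule": insert a Cast around any child whose type does not match the expected one.
def insertCasts(e: RegExp): RegExp = {
  val fixed = e.children.zip(e.expectedInputTypes).map {
    case (child, expected) if child.dataType != expected => Cast(child, expected)
    case (child, _)                                      => child
  }
  RegExp(fixed(0), fixed(1))
}

// insertCasts(RegExp(AttributeRef("id", LongType), AttributeRef("p", StringType)))
//   yields RegExp(Cast(AttributeRef("id", LongType), StringType), AttributeRef("p", StringType))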
[discuss] DataFrame function namespacing
Before we make DataFrame non-alpha, it would be great to decide how we want to namespace all the functions. There are 3 alternatives: 1. Put all in org.apache.spark.sql.functions. This is how SQL does it, since SQL doesn't have namespaces. I estimate eventually we will have ~ 200 functions. 2. Have explicit namespaces, which is what master branch currently looks like: - org.apache.spark.sql.functions - org.apache.spark.sql.mathfunctions - ... 3. Have explicit namespaces, but restructure them slightly so everything is under functions. package object functions { // all the old functions here -- but deprecated so we keep source compatibility def ... } package org.apache.spark.sql.functions object mathFunc { ... } object basicFuncs { ... }
Re: [discuss] DataFrame function namespacing
My feeling is that we should have a handful of namespaces (say 4 or 5). It becomes too cumbersome to import / remember more package names and having everything in one package makes it hard to read scaladoc etc. Thanks Shivaram On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com wrote: To add a little bit more context, some pros/cons I can think of are: Option 1: Very easy for users to find the function, since they are all in org.apache.spark.sql.functions. However, there will be quite a large number of them. Option 2: I can't tell why we would want this one over Option 3, since it has all the problems of Option 3, and not as nice of a hierarchy. Option 3: Opposite of Option 1. Each package or static class has a small number of functions that are relevant to each other, but for some functions it is unclear where they should go (e.g. should min go into basic or math?) On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote: Before we make DataFrame non-alpha, it would be great to decide how we want to namespace all the functions. There are 3 alternatives: 1. Put all in org.apache.spark.sql.functions. This is how SQL does it, since SQL doesn't have namespaces. I estimate eventually we will have ~ 200 functions. 2. Have explicit namespaces, which is what master branch currently looks like: - org.apache.spark.sql.functions - org.apache.spark.sql.mathfunctions - ... 3. Have explicit namespaces, but restructure them slightly so everything is under functions. package object functions { // all the old functions here -- but deprecated so we keep source compatibility def ... } package org.apache.spark.sql.functions object mathFunc { ... } object basicFuncs { ... }
Re: RDD split into multiple RDDs
Hi Juan, Daniel, thank you for your explanations. Indeed, I don't have a big number of keys, at least not enough to get the scheduler stuck. I was using a method quite similar to what you posted, Juan, and yes it works, but I think it would be more efficient not to call filter once per key. So, I was thinking of something like: - get the iterator of the KV RDD - distribute each value into a subset by key and then recreate an RDD from each subset. Because the SparkContext parallelize method cannot be used inside a transformation, I wonder if I could do it by creating a custom RDD and then implementing something like the PairRDDFunctions.lookup method, but of course replacing Seq[V] with an RDD:

def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      }: Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}

2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com: Hi Daniel, I understood Sébastien was talking about having a high number of keys, I guess I was prejudiced by my own problem! :) Anyway, I don't think you need to use disk or a database to generate an RDD per key; you can use filter, which I guess would be more efficient because IO is avoided, especially if the RDD was cached. For example:

// in the spark shell
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// generate a map from key to rdd of values
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  (for (k <- keys) yield k -> (pairRDD filter(_._1 == k) values)) toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs map(x => (x % 10, x))
val gs = groupByKeyToRDDs(ixs)
gs(1).collect

Just an idea. Greetings, Juan Rodriguez 2015-04-29 14:20 GMT+02:00 Daniel Darabos daniel.dara...@lynxanalytics.com: Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method for saving the RDD into separate files by key in a single pass. Then you can read the files into separate RDDs. On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Sébastien, I ran into a similar problem some time ago; you can see the discussion in the Spark users mailing list at http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results . My experience was that when you create too many RDDs the Spark scheduler gets stuck, so if you have many keys in the map you are creating you'll probably have problems. On the other hand, the latest example I proposed in that mailing thread was a batch job in which we start from a single RDD of time-tagged data, transform the RDD into a list of RDDs corresponding to windows generated according to the time tag of the records, and then apply a transformation to each window RDD, like for example KMeans.run of MLlib. This is very similar to what you propose. So in my humble opinion the approach of generating thousands of RDDs by filtering doesn't work, and a new RDD class should be implemented for this. I have never implemented a custom RDD, but if you want some help I would be happy to join you in this task. Sebastien said nothing about thousands of keys. This is a valid problem even if you only have two different keys. Greetings, Juan 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère s.sou...@gmail.com : Hello, I'm facing a problem with custom RDD transformations. I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning a map of RDDs by key. This would be great, for example, in order to run MLlib clustering on the V values grouped by K. I know I could do it by calling filter() on my RDD as many times as I have keys, but I'm afraid this would not be efficient (the entire RDD would be read each time, right?). Then, I could mapPartitions my RDD before filtering, but the code ends up huge... So, I tried to create a custom RDD to implement a splitByKey(rdd: RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD only once, but I cannot complete my implementation. Please, could you tell me first if this is really feasible, and then, could you give me some pointers? Thank you, Regards, Sebastien
Re: Plans for upgrading Hive dependency?
I am working on it. Here is the (very rough) version: https://github.com/apache/spark/compare/apache:master...marmbrus:multiHiveVersions On Mon, Apr 27, 2015 at 1:03 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: Thanks Marcelo and Patrick - I don't know how I missed that ticket in my Jira search earlier. Is anybody working on the sub-issues yet, or is there a design doc I should look at before taking a stab? Regards, Punya On Mon, Apr 27, 2015 at 3:56 PM Patrick Wendell pwend...@gmail.com wrote: Hey Punya, There is some ongoing work to help make Hive upgrades more manageable and allow us to support multiple versions of Hive. Once we do that, it will be much easier for us to upgrade. https://issues.apache.org/jira/browse/SPARK-6906 - Patrick On Mon, Apr 27, 2015 at 12:47 PM, Marcelo Vanzin van...@cloudera.com wrote: That's a lot more complicated than you might think. We've done some basic work to get HiveContext to compile against Hive 1.1.0. Here's the code: https://github.com/cloudera/spark/commit/00e2c7e35d4ac236bcfbcd3d2805b483060255ec We didn't send that upstream because that only solves half of the problem; the hive-thriftserver is disabled in our CDH build because it uses a lot of Hive APIs that have been removed in 1.1.0, so even getting it to compile is really complicated. If there's interest in getting the HiveContext part fixed up I can send a PR for that code. But at this time I don't really have plans to look at the thrift server. On Mon, Apr 27, 2015 at 11:58 AM, Punyashloka Biswal punya.bis...@gmail.com wrote: Dear Spark devs, Is there a plan for staying up-to-date with current (and future) versions of Hive? Spark currently supports version 0.13 (June 2014), but the latest version of Hive is 1.1.0 (March 2015). I don't see any Jira tickets about updating beyond 0.13, so I was wondering if this was intentional or it was just that nobody had started work on this yet. I'd be happy to work on a PR for the upgrade if one of the core developers can tell me what pitfalls to watch out for. Punya -- Marcelo
Re: [discuss] DataFrame function namespacing
We definitely still have the name collision problem in SQL. On Wed, Apr 29, 2015 at 10:01 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: Do we still have to keep the names of the functions distinct to avoid collisions in SQL? Or is there a plan to allow importing a namespace into SQL somehow? I ask because if we have to keep worrying about name collisions then I'm not sure what the added complexity of #2 and #3 buys us. Punya On Wed, Apr 29, 2015 at 3:52 PM Reynold Xin r...@databricks.com wrote: Scaladoc isn't much of a problem because scaladocs are grouped. Java/Python is the main problem ... See https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ On Wed, Apr 29, 2015 at 3:38 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: My feeling is that we should have a handful of namespaces (say 4 or 5). It becomes too cumbersome to import / remember more package names and having everything in one package makes it hard to read scaladoc etc. Thanks Shivaram On Wed, Apr 29, 2015 at 3:30 PM, Reynold Xin r...@databricks.com wrote: To add a little bit more context, some pros/cons I can think of are: Option 1: Very easy for users to find the function, since they are all in org.apache.spark.sql.functions. However, there will be quite a large number of them. Option 2: I can't tell why we would want this one over Option 3, since it has all the problems of Option 3, and not as nice of a hierarchy. Option 3: Opposite of Option 1. Each package or static class has a small number of functions that are relevant to each other, but for some functions it is unclear where they should go (e.g. should min go into basic or math?) On Wed, Apr 29, 2015 at 3:21 PM, Reynold Xin r...@databricks.com wrote: Before we make DataFrame non-alpha, it would be great to decide how we want to namespace all the functions. There are 3 alternatives: 1. Put all in org.apache.spark.sql.functions. This is how SQL does it, since SQL doesn't have namespaces. I estimate eventually we will have ~ 200 functions. 2. Have explicit namespaces, which is what master branch currently looks like: - org.apache.spark.sql.functions - org.apache.spark.sql.mathfunctions - ... 3. Have explicit namespaces, but restructure them slightly so everything is under functions. package object functions { // all the old functions here -- but deprecated so we keep source compatibility def ... } package org.apache.spark.sql.functions object mathFunc { ... } object basicFuncs { ... }
Event generator for Spark Streaming from CSV
I have the real DEBS taxi data in a CSV file. In order to operate over it, how can I simulate a Spout-like event generator using the timestamps in the CSV file? -- SERC-IISC Thanks Regards, Anshu Shukla
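One way to emulate a Storm-style spout in Spark Streaming is a custom Receiver that replays the CSV rows and paces itself using the gap between consecutive timestamps. The sketch below assumes the timestamp column holds epoch milliseconds; the file path and column index are placeholders:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.io.Source

class CsvReplayReceiver(path: String, tsColumn: Int)
    extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    // Read and replay the file on a separate thread so onStart returns immediately.
    new Thread("csv-replay") {
      override def run(): Unit = replay()
    }.start()
  }

  override def onStop(): Unit = ()  // the replay loop checks isStopped()

  private def replay(): Unit = {
    var previous: Option[Long] = None
    for (line <- Source.fromFile(path).getLines() if !isStopped()) {
      val ts = line.split(",")(tsColumn).toLong                        // assumes epoch-millis timestamps
      previous.foreach(prev => Thread.sleep(math.max(0L, ts - prev)))  // replay at the original pace
      store(line)
      previous = Some(ts)
    }
  }
}

// Usage, assuming an existing StreamingContext named ssc:
// val events = ssc.receiverStream(new CsvReplayReceiver("/data/debs-taxi.csv", tsColumn = 2))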