[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142982#comment-14142982 ]

Patrick Wendell edited comment on SPARK-3622 at 9/22/14 7:35 AM:

In Spark most RDD operations are lazy, so in the example you gave, there is no guarantee that rddA and rddB will be computed at the same time. In fact, in this case they definitely won't be (oddly, sortByKey is not lazy in Spark, unlike almost all transformations).

I'm trying to think of a case where you can know ahead of time that the derivative RDDs (B and C) are definitely going to be computed at the same time. It would be interesting to see if two RDDs could be jointly computed in that case. Right now the physical execution is pretty closely tied to the idea of computing a single RDD at a time, so I'm not sure it's really possible; this is probably different from Hive, where logical plans are more decoupled from physical execution. A case where we could exploit this is something like:

{code}
val a = sc.parallelize(1 to 1000, 100)
val b = a.filter(...)
val c = a.filter(...)
val d = a.filter(...)
b.union(c).count()
{code}

In the above example, Spark currently submits two stages at the same time that will both compute a. But maybe we could do something smarter. If we can detect this, we might be able to expose something where users can, e.g., run multiple operations on the same underlying RDD and it will work efficiently, provided those operations get computed at the same time. I'm not really sure that this is possible, but I also can't prove it's impossible, so that's good!

Provide a custom transformation that can output multiple RDDs

Key: SPARK-3622
URL: https://issues.apache.org/jira/browse/SPARK-3622
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

All existing transformations return at most one RDD, even those that take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs, for instance a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one pass, especially when the user's existing function already generates the different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages.
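The laziness pitfall described in the comment above can be sketched outside Spark. This is a plain-Python stand-in (the names compute_parent, b, and c are hypothetical, not Spark API): two datasets derived lazily from the same parent each trigger a full recomputation of the parent when they are evaluated independently, which is exactly why rddA and rddB are not computed "at the same time".

```python
# Plain-Python sketch (not Spark): two lazily derived datasets each
# re-run their parent's computation when evaluated independently.
compute_count = 0

def compute_parent():
    """Stand-in for computing the parent RDD; counts how often it runs."""
    global compute_count
    compute_count += 1
    return list(range(1, 1001))

# Lazy derivations: nothing runs until the thunk is called
# (analogous to a Spark action triggering a job).
b = lambda: [x for x in compute_parent() if x % 2 == 0]
c = lambda: [x for x in compute_parent() if x % 2 == 1]

b()  # first "action": computes the parent
c()  # second "action": computes the parent again
print(compute_count)  # → 2: the parent ran twice
```

Joint computation, as speculated above, would amount to scheduling both thunks over a single run of compute_parent.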
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142865#comment-14142865 ]

Patrick Wendell edited comment on SPARK-3622 at 9/22/14 3:24 AM:

Do you mind clarifying a bit how Hive would use this (maybe with a code example)? Let's say you had a transformation that went from a single RDD A to two RDDs B and C. The normal way to do this, if you want to avoid recomputing A, is to persist it, then use it to derive both B and C (this does multiple passes over A, but it won't fully recompute A twice).

I think that doing this in the general case is impossible by definition: the user might use B and C at different times, so there is no way to guarantee that A is computed only once unless you persist A.
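The persist() workaround described in the comment above can be sketched in plain Python (not Spark; compute_a and persisted_a are hypothetical stand-ins): deriving B and C makes two passes over the cached data, but A itself is computed only once.

```python
# Plain-Python sketch (not Spark) of the persist() workaround:
# cache A's result so each derivation re-reads, not re-computes, it.
compute_count = 0
_cache = None

def compute_a():
    """Stand-in for computing RDD A; counts how often it runs."""
    global compute_count
    compute_count += 1
    return list(range(100))

def persisted_a():
    """Stand-in for A.persist(): compute once, reuse thereafter."""
    global _cache
    if _cache is None:
        _cache = compute_a()
    return _cache

b = [x for x in persisted_a() if x < 50]   # first pass over cached A
c = [x for x in persisted_a() if x >= 50]  # second pass over cached A
print(compute_count)  # → 1: A was computed once
```

The trade-off matches the comment: two passes over A's materialized data, in exchange for never running A's computation twice.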
[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142881#comment-14142881 ]

Xuefu Zhang edited comment on SPARK-3622 at 9/22/14 4:39 AM:

Thanks for your comments, [~pwendell]. I understand that caching A would be helpful if I need to transform it to get B and C separately. My proposal is to get B and C in just one pass over A, so A doesn't even need to be cached. Here is an example of how it might be used in Hive.

{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<name, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get(A);
JavaPairRDD rddB = mappedRDDs.get(B);
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing of sortedRddA and groupedRddB.
...
{code}

In this case, mapFunction can return named iterators for A and B. B is automatically computed whenever A is computed, and vice versa. Since both are computed if either is computed, a subsequent reference to either one should not recompute either of them. The benefits: 1) no need to cache A; 2) only one pass over the input. I'm not sure whether this is feasible in Spark, but Hive's map function does exactly this: its operator tree can branch off anywhere, resulting in multiple output data sets from a single input data set. Please let me know if there are more questions.
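The single-pass, multi-output behavior proposed above can be sketched in plain Python (not Spark or Hive; map_partition and the output names are hypothetical): one user function consumes each input record exactly once and routes it to one of several named outputs, so neither output requires recomputing or caching the input.

```python
# Plain-Python sketch (not Spark/Hive) of a multi-output map function:
# one pass over the input yields several named datasets at once.
def map_partition(records):
    """Hypothetical multi-output partition function: route each record
    to output "A" or "B" in a single pass."""
    outputs = {"A": [], "B": []}
    for r in records:
        key = "A" if r % 2 == 0 else "B"
        outputs[key].append(r)
    return outputs

mapped = map_partition(range(10))
rdd_a, rdd_b = mapped["A"], mapped["B"]
print(rdd_a)  # → [0, 2, 4, 6, 8]
print(rdd_b)  # → [1, 3, 5, 7, 9]
```

Because both outputs are populated by the same pass, referencing either one after the pass never re-reads the input, which is the property the comment claims for Hive's branching operator tree.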