Re: RFC: Supporting the Scala drop Method for Spark RDDs
- Original Message -
> Sure, drop() would be useful, but breaking the "transformations are lazy; only actions launch jobs" model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around.
>
> I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kind of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing.

It turns out that drop can be implemented as a proper lazy transform. I discuss how that works here: http://erikerlandson.github.io/blog/2014/07/29/deferring-spark-actions-to-lazy-transforms-with-the-promise-rdd/

I updated the PR with this lazy implementation.
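To make the "lazy transform" idea concrete, here is a minimal sketch in plain Scala. It is not the code from the PR or the linked post, and it uses no Spark classes. The names LazyDrop, Dropped, boundaryScans and compute are hypothetical, and each parent partition is modeled as a thunk. The point it illustrates is that locating the boundary partition can sit behind a lazy val, so nothing in the parent is evaluated until a partition of the result is actually computed.

```scala
import scala.annotation.tailrec

object LazyDrop {
  // Hypothetical model: each parent partition is a thunk, evaluated only when called.
  final class Dropped[T](parent: Vector[() => Vector[T]], n: Int) {
    // Number of parent partitions evaluated while locating the boundary.
    var boundaryScans: Int = 0

    // (index of boundary partition, rows to drop inside it); computed on first use only.
    private lazy val boundary: (Int, Int) = {
      @tailrec
      def locate(idx: Int, remaining: Int): (Int, Int) =
        if (idx >= parent.length || remaining <= 0) (idx, 0)
        else {
          boundaryScans += 1
          val size = parent(idx)().length
          if (remaining < size) (idx, remaining)
          else locate(idx + 1, remaining - size)
        }
      locate(0, n)
    }

    // Partitions before the boundary are empty, the boundary one is trimmed,
    // and later partitions pass through untouched.
    def compute(i: Int): Vector[T] = {
      val (b, k) = boundary
      if (i < b) Vector.empty
      else if (i == b) parent(i)().drop(k)
      else parent(i)()
    }
  }
}
```

Constructing a Dropped value does no work at all. The first call to compute pays for the boundary scan, and later calls reuse the cached result. In a real cluster the scan would be a job of its own, which is the part this toy model does not attempt to capture.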
Re: RFC: Supporting the Scala drop Method for Spark RDDs
Yeah, the input format doesn't support this behavior. But it does tell you the byte position of each record in the file.

On Mon, Jul 21, 2014 at 10:55 PM, Reynold Xin r...@databricks.com wrote:
> Yes, that could work. But it is not as simple as just a binary flag. We might want to skip the first row for every file, or the header only for the first file. The former is not really supported out of the box by the input format I think?
Re: RFC: Supporting the Scala drop Method for Spark RDDs
- Original Message -
> It could make sense to add a skipHeader argument to SparkContext.textFile?

I also looked into this. I don't think it's feasible given the limits of the InputFormat and RecordReader interfaces. RecordReader can't (I think) *ever* know which split it's attached to, and the getSplits() method has no concept of RecordReader, so it can't know how many records reside in its splits. At least in RDD it's possible to do, if not attractive.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
Personally I'd find the method useful -- I've often had a .csv file with a header row that I want to drop so filter it out, which touches all partitions anyway. I don't have any comments on the implementation quite yet though.

On Mon, Jul 21, 2014 at 8:24 AM, Erik Erlandson e...@redhat.com wrote:
> A few weeks ago I submitted a PR for supporting rdd.drop(n), under SPARK-2315: https://issues.apache.org/jira/browse/SPARK-2315
>
> Supporting the drop method would make some operations convenient, however it forces computation of >= 1 partition of the parent RDD, and so it would behave like a partial action that returns an RDD as the result.
>
> I wrote up a discussion of these trade-offs here: http://erikerlandson.github.io/blog/2014/07/20/some-implications-of-supporting-the-scala-drop-method-for-spark-rdds/
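For the header-row case described in this message, the difference between filtering and dropping can be sketched in plain Scala. This is an illustrative model only: HeaderDrop, Partitions and both function names are made up for the example, and an RDD is stood in for by a vector of partitions. It assumes the header is the first row of partition 0.

```scala
object HeaderDrop {
  // Hypothetical stand-in for an RDD: an ordered sequence of partitions.
  type Partitions[T] = Vector[Vector[T]]

  // Filter approach: tests every row in every partition against the header text.
  def dropHeaderByFilter(parts: Partitions[String], header: String): Partitions[String] =
    parts.map(_.filter(_ != header))

  // Index approach: only partition 0 loses its first row; the rest pass through as-is.
  def dropHeaderByIndex(parts: Partitions[String]): Partitions[String] =
    parts.zipWithIndex.map { case (rows, idx) => if (idx == 0) rows.drop(1) else rows }
}
```

In Spark the second shape is usually written with RDD.mapPartitionsWithIndex, along the lines of rdd.mapPartitionsWithIndex { (idx, it) => if (idx == 0) it.drop(1) else it }. That stays lazy and launches no job of its own, but it only covers drops that fit inside the first partition. Note also that the filter version removes any data row that happens to equal the header text.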
Re: RFC: Supporting the Scala drop Method for Spark RDDs
Sure, drop() would be useful, but breaking the "transformations are lazy; only actions launch jobs" model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around.

I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kind of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing.

On Mon, Jul 21, 2014 at 8:27 AM, Andrew Ash and...@andrewash.com wrote:
> Personally I'd find the method useful -- I've often had a .csv file with a header row that I want to drop so filter it out, which touches all partitions anyway. I don't have any comments on the implementation quite yet though.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
- Original Message -
> I too would like this feature. Erik's post makes sense. However, shouldn't the RDD also repartition itself after drop to effectively make use of cluster resources?

My thinking is that in most use cases(*), one is dropping a small number of rows, and they are in only the 1st partition, and so repartitioning would not be worth the cost. The first partition would be passed mostly intact, and the remainder would be completely unchanged.

(*) or at least most use cases that I've considered.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
- Original Message -
> Sure, drop() would be useful, but breaking the "transformations are lazy; only actions launch jobs" model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around.

A thought that comes to my mind here is that there are in fact already two categories of transform: ones that are truly lazy, and ones that are not. A possible option is to embrace that, and commit to documenting the two categories as such, with an obvious bias towards favoring lazy transforms (to paraphrase Churchill, we're down to haggling over the price).

> I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kind of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing.

This seems like a very interesting angle. I don't have much feel for what a solution would look like, but it sounds as if it would involve caching all operations embodied by RDD transform method code for provisional execution. I believe that these levels of invocation are currently executed in the master, not executor nodes.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
Rather than embrace non-lazy transformations and add more of them, I'd rather we 1) try to fully characterize the needs that are driving their creation/usage; and 2) design and implement new Spark abstractions that will allow us to meet those needs and eliminate existing non-lazy transformation. They really mess up things like creation of asynchronous FutureActions, job cancellation and accounting of job resource usage, etc., so I'd rather we seek a way out of the existing hole rather than make it deeper.

On Mon, Jul 21, 2014 at 10:24 AM, Erik Erlandson e...@redhat.com wrote:
> - Original Message -
>> Sure, drop() would be useful, but breaking the "transformations are lazy; only actions launch jobs" model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around.
>
> A thought that comes to my mind here is that there are in fact already two categories of transform: ones that are truly lazy, and ones that are not. A possible option is to embrace that, and commit to documenting the two categories as such, with an obvious bias towards favoring lazy transforms (to paraphrase Churchill, we're down to haggling over the price).
>
>> I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kind of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing.
>
> This seems like a very interesting angle. I don't have much feel for what a solution would look like, but it sounds as if it would involve caching all operations embodied by RDD transform method code for provisional execution. I believe that these levels of invocation are currently executed in the master, not executor nodes.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
- Original Message -
> Rather than embrace non-lazy transformations and add more of them, I'd rather we 1) try to fully characterize the needs that are driving their creation/usage; and 2) design and implement new Spark abstractions that will allow us to meet those needs and eliminate existing non-lazy transformation.

In the case of drop, obtaining the index of the boundary partition can be viewed as the action forcing compute -- one that happens to be invoked inside of a transform. The concept of a lazy action, that is only triggered if the result rdd has compute invoked on it, might be sufficient to restore laziness to the drop transform. For that matter, I might find some way to make use of Scala lazy values directly and achieve the same goal for drop.

> They really mess up things like creation of asynchronous FutureActions, job cancellation and accounting of job resource usage, etc., so I'd rather we seek a way out of the existing hole rather than make it deeper.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
If the purpose is for dropping csv headers, perhaps we don't really need a common drop and only one that drops the first line in a file? I'd really try hard to avoid a common drop/dropWhile because they can be expensive to do.

Note that I think we will be adding this functionality (ignoring headers) to the CsvRDD functionality in Spark SQL. https://github.com/apache/spark/pull/1351

On Mon, Jul 21, 2014 at 1:45 PM, Mark Hamstra m...@clearstorydata.com wrote:
> You can find some of the prior, related discussion here: https://issues.apache.org/jira/browse/SPARK-1021
>
> On Mon, Jul 21, 2014 at 1:25 PM, Erik Erlandson e...@redhat.com wrote:
>> - Original Message -
>>> Rather than embrace non-lazy transformations and add more of them, I'd rather we 1) try to fully characterize the needs that are driving their creation/usage; and 2) design and implement new Spark abstractions that will allow us to meet those needs and eliminate existing non-lazy transformation.
>>
>> In the case of drop, obtaining the index of the boundary partition can be viewed as the action forcing compute -- one that happens to be invoked inside of a transform. The concept of a lazy action, that is only triggered if the result rdd has compute invoked on it, might be sufficient to restore laziness to the drop transform. For that matter, I might find some way to make use of Scala lazy values directly and achieve the same goal for drop.
>>
>>> They really mess up things like creation of asynchronous FutureActions, job cancellation and accounting of job resource usage, etc., so I'd rather we seek a way out of the existing hole rather than make it deeper.
Re: RFC: Supporting the Scala drop Method for Spark RDDs
It could make sense to add a skipHeader argument to SparkContext.textFile? On Mon, Jul 21, 2014 at 10:37 PM, Reynold Xin r...@databricks.com wrote: If the purpose is for dropping csv headers, perhaps we don't really need a common drop and only one that drops the first line in a file? I'd really try hard to avoid a common drop/dropWhile because they can be expensive to do. Note that I think we will be adding this functionality (ignoring headers) to the CsvRDD functionality in Spark SQL. https://github.com/apache/spark/pull/1351 On Mon, Jul 21, 2014 at 1:45 PM, Mark Hamstra m...@clearstorydata.com wrote: You can find some of the prior, related discussion here: https://issues.apache.org/jira/browse/SPARK-1021 On Mon, Jul 21, 2014 at 1:25 PM, Erik Erlandson e...@redhat.com wrote: - Original Message - Rather than embrace non-lazy transformations and add more of them, I'd rather we 1) try to fully characterize the needs that are driving their creation/usage; and 2) design and implement new Spark abstractions that will allow us to meet those needs and eliminate existing non-lazy transformation. In the case of drop, obtaining the index of the boundary partition can be viewed as the action forcing compute -- one that happens to be invoked inside of a transform. The concept of a lazy action, that is only triggered if the result rdd has compute invoked on it, might be sufficient to restore laziness to the drop transform. For that matter, I might find some way to make use of Scala lazy values directly and achieve the same goal for drop. They really mess up things like creation of asynchronous FutureActions, job cancellation and accounting of job resource usage, etc., so I'd rather we seek a way out of the existing hole rather than make it deeper. 
On Mon, Jul 21, 2014 at 10:24 AM, Erik Erlandson e...@redhat.com wrote: - Original Message - Sure, drop() would be useful, but breaking the transformations are lazy; only actions launch jobs model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around. A thought that comes to my mind here is that there are in fact already two categories of transform: ones that are truly lazy, and ones that are not. A possible option is to embrace that, and commit to documenting the two categories as such, with an obvious bias towards favoring lazy transforms (to paraphrase Churchill, we're down to haggling over the price). I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kind of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing. This seems like a very interesting angle. I don't have much feel for what a solution would look like, but it sounds as if it would involve caching all operations embodied by RDD transform method code for provisional execution. I believe that these levels of invocation are currently executed in the master, not executor nodes. On Mon, Jul 21, 2014 at 8:27 AM, Andrew Ash and...@andrewash.com wrote: Personally I'd find the method useful -- I've often had a .csv file with a header row that I want to drop so filter it out, which touches all partitions anyway. I don't have any comments on the implementation quite yet though. 
On Mon, Jul 21, 2014 at 8:24 AM, Erik Erlandson e...@redhat.com wrote:

A few weeks ago I submitted a PR for supporting rdd.drop(n), under SPARK-2315: https://issues.apache.org/jira/browse/SPARK-2315

Supporting the drop method would make some operations convenient, however it forces computation of >= 1 partition of the parent RDD, and so it would behave like a partial action that returns an RDD as the result.

I wrote up a discussion of these trade-offs here: http://erikerlandson.github.io/blog/2014/07/20/some-implications-of-supporting-the-scala-drop-method-for-spark-rdds/
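To make the trade-off described above concrete, here is a minimal sketch in plain Scala (no Spark dependency) of why drop(n) has to compute at least one partition of its parent: the partition holding the n-th row can only be found by counting rows partition by partition. This is an illustration under stated assumptions, not the code from the PR. The names `Partition`, `locateBoundary` and `dropModel` are hypothetical, and a partition is modeled as a function that produces its rows when called.

```scala
object DropBoundarySketch {
  // A partition is modeled as a thunk; calling it stands in for Spark
  // computing that partition. (Hypothetical model, not Spark's API.)
  type Partition[A] = () => Iterator[A]

  // Walk the partitions in order until n rows are accounted for.
  // Returns (boundary partition index, rows to drop inside it,
  //          number of partitions that had to be computed).
  def locateBoundary[A](parts: Vector[Partition[A]], n: Int): (Int, Int, Int) = {
    @annotation.tailrec
    def loop(idx: Int, remaining: Int): (Int, Int, Int) =
      if (idx >= parts.length) (parts.length, 0, idx) // n >= total row count
      else {
        val size = parts(idx)().size // forces computation of this partition
        if (remaining < size) (idx, remaining, idx + 1)
        else loop(idx + 1, remaining - size)
      }
    loop(0, n)
  }

  // Build the partitions of the "dropped" data set: empty before the
  // boundary, trimmed at the boundary, unchanged after it.
  def dropModel[A](parts: Vector[Partition[A]], n: Int): Vector[Partition[A]] = {
    val (boundary, skip, _) = locateBoundary(parts, n)
    parts.zipWithIndex.map { case (p, i) =>
      val out: Partition[A] =
        if (i < boundary) () => Iterator.empty
        else if (i == boundary) () => p().drop(skip)
        else p
      out
    }
  }
}
```

In this model, `dropModel` runs `locateBoundary` eagerly, which corresponds to the "partial action" behaviour discussed in the thread: dropping a single header row computes only the first partition, while a larger n may force several.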
Re: RFC: Supporting the Scala drop Method for Spark RDDs
Yes, that could work. But it is not as simple as just a binary flag. We might want to skip the first row for every file, or the header only for the first file. The former is not really supported out of the box by the input format I think?

On Mon, Jul 21, 2014 at 10:50 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

It could make sense to add a skipHeader argument to SparkContext.textFile?

On Mon, Jul 21, 2014 at 10:37 PM, Reynold Xin r...@databricks.com wrote:

If the purpose is for dropping csv headers, perhaps we don't really need a common drop and only one that drops the first line in a file? I'd really try hard to avoid a common drop/dropWhile because they can be expensive to do. Note that I think we will be adding this functionality (ignoring headers) to the CsvRDD functionality in Spark SQL. https://github.com/apache/spark/pull/1351

On Mon, Jul 21, 2014 at 1:45 PM, Mark Hamstra m...@clearstorydata.com wrote:

You can find some of the prior, related discussion here: https://issues.apache.org/jira/browse/SPARK-1021

On Mon, Jul 21, 2014 at 1:25 PM, Erik Erlandson e...@redhat.com wrote:

- Original Message -

Rather than embrace non-lazy transformations and add more of them, I'd rather we 1) try to fully characterize the needs that are driving their creation/usage; and 2) design and implement new Spark abstractions that will allow us to meet those needs and eliminate existing non-lazy transformations.

In the case of drop, obtaining the index of the boundary partition can be viewed as the action forcing compute -- one that happens to be invoked inside of a transform. The concept of a lazy action, that is only triggered if the result rdd has compute invoked on it, might be sufficient to restore laziness to the drop transform. For that matter, I might find some way to make use of Scala lazy values directly and achieve the same goal for drop.
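The "Scala lazy values" idea mentioned above can be illustrated with a small sketch in plain Scala (no Spark dependency). It is only an assumption about what such an approach might look like, not the mechanism used in the PR: `LazyDrop`, `Partition`, `compute` and the `scans` counter are hypothetical names, and a partition is modeled as a function that produces its rows when called. The boundary lookup sits behind a `lazy val`, so constructing the wrapper does no work; the scan runs once, the first time a partition of the result is computed.

```scala
object LazyDropSketch {
  // Calling a partition stands in for Spark computing it.
  // (Hypothetical model, not Spark's API.)
  type Partition[A] = () => Iterator[A]

  final class LazyDrop[A](parent: Vector[Partition[A]], n: Int) {
    // Exposed only so the example can observe when the scan happens.
    var scans: Int = 0

    // (boundary partition index, rows to drop inside it).
    // Evaluated at most once, and only when first read.
    private lazy val boundary: (Int, Int) = {
      scans += 1
      var remaining = n
      var idx = 0
      var found = false
      while (idx < parent.length && !found) {
        val size = parent(idx)().size // forces this parent partition
        if (remaining < size) found = true
        else { remaining -= size; idx += 1 }
      }
      (idx, if (found) remaining else 0)
    }

    // Rows of result partition i; the first call triggers the scan.
    def compute(i: Int): Iterator[A] = {
      val (b, skip) = boundary
      if (i < b) Iterator.empty
      else if (i == b) parent(i)().drop(skip)
      else parent(i)()
    }
  }
}
```

With this shape, building a `LazyDrop` behaves like a lazy transform in the model: `scans` stays at 0 until `compute` is first called, and stays at 1 afterwards no matter how many partitions are read.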
They really mess up things like creation of asynchronous FutureActions, job cancellation and accounting of job resource usage, etc., so I'd rather we seek a way out of the existing hole rather than make it deeper.

On Mon, Jul 21, 2014 at 10:24 AM, Erik Erlandson e...@redhat.com wrote:

- Original Message -

Sure, drop() would be useful, but breaking the "transformations are lazy; only actions launch jobs" model is abhorrent -- which is not to say that we haven't already broken that model for useful operations (cf. RangePartitioner, which is used for sorted RDDs), but rather that each such exception to the model is a significant source of pain that can be hard to work with or work around.

A thought that comes to my mind here is that there are in fact already two categories of transform: ones that are truly lazy, and ones that are not. A possible option is to embrace that, and commit to documenting the two categories as such, with an obvious bias towards favoring lazy transforms (to paraphrase Churchill, we're down to haggling over the price).

I really wouldn't like to see another such model-breaking transformation added to the API. On the other hand, being able to write transformations with dependencies on these kinds of internal jobs is sometimes very useful, so a significant reworking of Spark's Dependency model that would allow for lazily running such internal jobs and making the results available to subsequent stages may be something worth pursuing.

This seems like a very interesting angle. I don't have much feel for what a solution would look like, but it sounds as if it would involve caching all operations embodied by RDD transform method code for provisional execution. I believe that these levels of invocation are currently executed in the master, not executor nodes.
On Mon, Jul 21, 2014 at 8:27 AM, Andrew Ash and...@andrewash.com wrote:

Personally I'd find the method useful -- I've often had a .csv file with a header row that I want to drop so filter it out, which touches all partitions anyway. I don't have any comments on the implementation quite yet though.

On Mon, Jul 21, 2014 at 8:24 AM, Erik Erlandson e...@redhat.com wrote:

A few weeks ago I submitted a PR for supporting rdd.drop(n), under SPARK-2315: https://issues.apache.org/jira/browse/SPARK-2315

Supporting the drop method would make some operations convenient, however it forces computation