Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-29 Thread Erik Erlandson


- Original Message -
 Sure, drop() would be useful, but breaking the "transformations are lazy;
 only actions launch jobs" model is abhorrent -- which is not to say that we
 haven't already broken that model for useful operations (cf.
 RangePartitioner, which is used for sorted RDDs), but rather that each such
 exception to the model is a significant source of pain that can be hard to
 work with or work around.
 
 I really wouldn't like to see another such model-breaking transformation
 added to the API.  On the other hand, being able to write transformations
 with dependencies on these kind of internal jobs is sometimes very
 useful, so a significant reworking of Spark's Dependency model that would
 allow for lazily running such internal jobs and making the results
 available to subsequent stages may be something worth pursuing.


It turns out that drop can be implemented as a proper lazy transform.  I 
discuss how that works here:
http://erikerlandson.github.io/blog/2014/07/29/deferring-spark-actions-to-lazy-transforms-with-the-promise-rdd/

I updated the PR with this lazy implementation.




 
 
 


Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-22 Thread Sandy Ryza
Yeah, the input format doesn't support this behavior.  But it does tell you
the byte position of each record in the file.
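
As an illustrative sketch of that idea (assuming a SparkContext named sc and a
placeholder path; this is not part of any proposed API): with the old-API
TextInputFormat, each record's key is its byte offset within its file, so
offset 0 identifies the first line of every file.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Keys are byte offsets within each file; offset 0 is a file's first line.
    val kv = sc.hadoopFile[LongWritable, Text, TextInputFormat]("data/*.csv")

    // Drop the first line of every file, then keep only the text.
    val noHeaders = kv.filter { case (offset, _) => offset.get() != 0L }
                      .map { case (_, line) => line.toString }

This covers the "skip the first row of every file" case; skipping a header only
in the first file would still need extra bookkeeping.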


On Mon, Jul 21, 2014 at 10:55 PM, Reynold Xin r...@databricks.com wrote:

 Yes, that could work. But it is not as simple as just a binary flag.

 We might want to skip the first row for every file, or the header only for
 the first file. The former is not really supported out of the box by the
 input format I think?



Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-22 Thread Erik Erlandson


- Original Message -
 It could make sense to add a skipHeader argument to SparkContext.textFile?

I also looked into this.   I don't think it's feasible given the limits of the 
InputFormat and RecordReader interfaces.  RecordReader can't (I think) *ever* 
know which split it's attached to, and the getSplits() method has no concept of 
RecordReader, so it can't know how many records reside in its splits.   At 
least at the RDD level it's possible to do, if not attractive.
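
A rough sketch of that RDD-level route (illustrative only; textFileSkipHeader
is a hypothetical helper, not an existing or proposed method): build one RDD
per file, drop the first record of each, and union them.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: skip the first line of each file at the RDD level.
    def textFileSkipHeader(sc: SparkContext, paths: Seq[String]): RDD[String] = {
      val perFile = paths.map { p =>
        sc.textFile(p).mapPartitionsWithIndex { (idx, it) =>
          // A file's header lives in that file's first partition.
          if (idx == 0) it.drop(1) else it
        }
      }
      sc.union(perFile)
    }

One RDD per file is clearly unappealing for large file counts, which is the
"if not attractive" part.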



 
 

Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Andrew Ash
Personally I'd find the method useful -- I've often had a .csv file with a
header row that I want to drop, so I filter it out, which touches all
partitions anyway.  I don't have any comments on the implementation quite
yet though.
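
For reference, a minimal sketch of the workaround described above (assuming a
SparkContext named sc and a placeholder file name); both variants use only
existing transforms.

    val lines = sc.textFile("data.csv")

    // Variant 1: filter against the header value. Touches all partitions, and
    // first() computes part of the first partition up front; it also removes
    // any later line identical to the header.
    val header = lines.first()
    val rows = lines.filter(_ != header)

    // Variant 2: drop the first record of the first partition only, which
    // remains a lazy transform.
    val rows2 = lines.mapPartitionsWithIndex { (idx, it) =>
      if (idx == 0) it.drop(1) else it
    }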


On Mon, Jul 21, 2014 at 8:24 AM, Erik Erlandson e...@redhat.com wrote:

 A few weeks ago I submitted a PR for supporting rdd.drop(n), under
 SPARK-2315:
 https://issues.apache.org/jira/browse/SPARK-2315

 Supporting the drop method would make some operations convenient, however
 it forces computation of >= 1 partition of the parent RDD, and so it would
 behave like a partial action that returns an RDD as the result.

 I wrote up a discussion of these trade-offs here:

 http://erikerlandson.github.io/blog/2014/07/20/some-implications-of-supporting-the-scala-drop-method-for-spark-rdds/



Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Mark Hamstra
Sure, drop() would be useful, but breaking the "transformations are lazy;
only actions launch jobs" model is abhorrent -- which is not to say that we
haven't already broken that model for useful operations (cf.
RangePartitioner, which is used for sorted RDDs), but rather that each such
exception to the model is a significant source of pain that can be hard to
work with or work around.

I really wouldn't like to see another such model-breaking transformation
added to the API.  On the other hand, being able to write transformations
with dependencies on these kind of internal jobs is sometimes very
useful, so a significant reworking of Spark's Dependency model that would
allow for lazily running such internal jobs and making the results
available to subsequent stages may be something worth pursuing.


On Mon, Jul 21, 2014 at 8:27 AM, Andrew Ash and...@andrewash.com wrote:

 Personally I'd find the method useful -- I've often had a .csv file with a
  header row that I want to drop, so I filter it out, which touches all
 partitions anyway.  I don't have any comments on the implementation quite
 yet though.


 



Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Erik Erlandson


- Original Message -
 I too would like this feature. Erik's post makes sense. However, shouldn't
 the RDD also repartition itself after drop to effectively make use of
 cluster resources?


My thinking is that in most use cases(*), one is dropping a small number of 
rows, and they are in only the 1st partition, and so repartitioning would not 
be worth the cost.  The first partition would be passed mostly intact, and the 
remainder would be completely unchanged.

(*) or at least most use cases that I've considered.




Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Erik Erlandson


- Original Message -
 Sure, drop() would be useful, but breaking the "transformations are lazy;
 only actions launch jobs" model is abhorrent -- which is not to say that we
 haven't already broken that model for useful operations (cf.
 RangePartitioner, which is used for sorted RDDs), but rather that each such
 exception to the model is a significant source of pain that can be hard to
 work with or work around.

A thought that comes to my mind here is that there are in fact already two 
categories of transform: ones that are truly lazy, and ones that are not.  A 
possible option is to embrace that, and commit to documenting the two 
categories as such, with an obvious bias towards favoring lazy transforms (to 
paraphrase Churchill, we're down to haggling over the price).
 

 
 I really wouldn't like to see another such model-breaking transformation
 added to the API.  On the other hand, being able to write transformations
 with dependencies on these kind of internal jobs is sometimes very
 useful, so a significant reworking of Spark's Dependency model that would
 allow for lazily running such internal jobs and making the results
 available to subsequent stages may be something worth pursuing.


This seems like a very interesting angle.   I don't have much feel for what a 
solution would look like, but it sounds as if it would involve caching all 
operations embodied by RDD transform method code for provisional execution.  I 
believe that these levels of invocation are currently executed in the master, 
not executor nodes.


 
 
 


Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Mark Hamstra
Rather than embrace non-lazy transformations and add more of them, I'd
rather we 1) try to fully characterize the needs that are driving their
creation/usage; and 2) design and implement new Spark abstractions that
will allow us to meet those needs and eliminate existing non-lazy
transformation.  They really mess up things like creation of asynchronous
FutureActions, job cancellation and accounting of job resource usage, etc.,
so I'd rather we seek a way out of the existing hole rather than make it
deeper.


On Mon, Jul 21, 2014 at 10:24 AM, Erik Erlandson e...@redhat.com wrote:



 - Original Message -
   Sure, drop() would be useful, but breaking the "transformations are lazy;
   only actions launch jobs" model is abhorrent -- which is not to say that we
   haven't already broken that model for useful operations (cf.
   RangePartitioner, which is used for sorted RDDs), but rather that each such
   exception to the model is a significant source of pain that can be hard to
   work with or work around.

 A thought that comes to my mind here is that there are in fact already two
 categories of transform: ones that are truly lazy, and ones that are not.
  A possible option is to embrace that, and commit to documenting the two
 categories as such, with an obvious bias towards favoring lazy transforms
 (to paraphrase Churchill, we're down to haggling over the price).


 
  I really wouldn't like to see another such model-breaking transformation
  added to the API.  On the other hand, being able to write transformations
  with dependencies on these kind of internal jobs is sometimes very
  useful, so a significant reworking of Spark's Dependency model that would
  allow for lazily running such internal jobs and making the results
  available to subsequent stages may be something worth pursuing.


 This seems like a very interesting angle.   I don't have much feel for
 what a solution would look like, but it sounds as if it would involve
 caching all operations embodied by RDD transform method code for
 provisional execution.  I believe that these levels of invocation are
 currently executed in the master, not executor nodes.


 
 
 



Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Erik Erlandson


- Original Message -
 Rather than embrace non-lazy transformations and add more of them, I'd
 rather we 1) try to fully characterize the needs that are driving their
 creation/usage; and 2) design and implement new Spark abstractions that
 will allow us to meet those needs and eliminate existing non-lazy
 transformation.  


In the case of drop, obtaining the index of the boundary partition can be 
viewed as the action forcing compute -- one that happens to be invoked inside 
of a transform.  The concept of a lazy action, that is only triggered if the 
result rdd has compute invoked on it, might be sufficient to restore laziness 
to the drop transform.   For that matter, I might find some way to make use of 
Scala lazy values directly and achieve the same goal for drop.
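
To make the trade-off concrete, here is a deliberately naive sketch of a
drop(n) built outside the PR; it is not the PR's implementation, and it counts
every partition rather than only the leading ones. The counting job is the
"action forcing compute" referred to above.

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Naive, eager drop(n): the counting job below runs as soon as the
    // transform is defined, which is exactly the non-lazy step at issue.
    def eagerDrop[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] = {
      // Per-partition element counts; this launches a job immediately.
      val counts: Array[Int] =
        rdd.mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
           .collect().sortBy(_._1).map(_._2)

      // Walk the counts to find the boundary partition and the offset in it.
      var remaining = n
      var boundary = 0
      while (boundary < counts.length && remaining >= counts(boundary)) {
        remaining -= counts(boundary)
        boundary += 1
      }
      val (b, skip) = (boundary, remaining)

      rdd.mapPartitionsWithIndex { (i, it) =>
        if (i < b) Iterator.empty        // entirely dropped partitions
        else if (i == b) it.drop(skip)   // boundary partition: drop a prefix
        else it                          // untouched partitions
      }
    }

A lazy variant would defer that counting job until the result RDD is first
computed, which is the direction the lazy-action idea above points.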



 They really mess up things like creation of asynchronous
 FutureActions, job cancellation and accounting of job resource usage, etc.,
 so I'd rather we seek a way out of the existing hole rather than make it
 deeper.
 
 


Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Reynold Xin
If the purpose is for dropping csv headers, perhaps we don't really need a
common drop and only one that drops the first line in a file? I'd really
try hard to avoid a common drop/dropWhile because they can be expensive to
do.

Note that I think we will be adding this functionality (ignoring headers)
to the CsvRDD functionality in Spark SQL.
 https://github.com/apache/spark/pull/1351


On Mon, Jul 21, 2014 at 1:45 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 You can find some of the prior, related discussion here:
 https://issues.apache.org/jira/browse/SPARK-1021




Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Sandy Ryza
It could make sense to add a skipHeader argument to SparkContext.textFile?


On Mon, Jul 21, 2014 at 10:37 PM, Reynold Xin r...@databricks.com wrote:

 If the purpose is for dropping csv headers, perhaps we don't really need a
 common drop and only one that drops the first line in a file? I'd really
 try hard to avoid a common drop/dropWhile because they can be expensive to
 do.

 Note that I think we will be adding this functionality (ignoring headers)
 to the CsvRDD functionality in Spark SQL.
  https://github.com/apache/spark/pull/1351




Re: RFC: Supporting the Scala drop Method for Spark RDDs

2014-07-21 Thread Reynold Xin
Yes, that could work. But it is not as simple as just a binary flag.

We might want to skip the first row for every file, or the header only for
the first file. The former is not really supported out of the box by the
input format I think?


On Mon, Jul 21, 2014 at 10:50 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 It could make sense to add a skipHeader argument to SparkContext.textFile?

