[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2021-07-11 Thread zengrui (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zengrui updated SPARK-27396:

Description: 
*SPIP: Columnar Processing Without Arrow Formatting Guarantees.*

 

*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

The Dataset/DataFrame API in Spark currently exposes data to users only one row 
at a time when processing it.  The goals of this proposal are to
 # Extend the current SQL extensions mechanism so advanced users can access the 
physical SparkPlan and manipulate it to provide columnar processing for 
existing operators, including shuffle.  This will allow them to implement their 
own cost-based optimizers to decide when processing should be columnar and when 
it should not (a minimal sketch follows this list).
 # Make any transitions between the columnar memory layout and a row-based 
layout transparent to users, so operations that are not columnar see the data 
as rows, and operations that are columnar see the data as columns.
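A minimal sketch of the kind of plugin point goal #1 describes. It is 
illustrative only: the injectColumnarRule hook and the user helpers are 
hypothetical, since adding such a hook is exactly what this SPIP proposes; only 
SparkSessionExtensions, Rule, and SparkPlan are existing Spark classes.

{code:scala}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical user rule: walk the physical plan and swap supported operators
// for columnar implementations, based on the user's own cost model.
object UserColumnarRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p if shouldRunColumnar(p) => toColumnarVersion(p)
  }
  // Placeholders for the user's cost-based decision and operator rewrite.
  private def shouldRunColumnar(p: SparkPlan): Boolean = false
  private def toColumnarVersion(p: SparkPlan): SparkPlan = p
}

// Registered through the existing spark.sql.extensions mechanism; the
// injectColumnarRule hook below is hypothetical and only illustrates what this
// SPIP would add alongside the existing inject* methods.
class UserColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    // ext.injectColumnarRule(session => UserColumnarRule)  // proposed hook
  }
}
{code}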

 

Not Requirements, but things that would be nice to have.
 # Transition the existing in-memory columnar layouts to be compatible with 
Apache Arrow.  This would make the transformations to the Apache Arrow format a 
no-op. The existing formats are already very close to those layouts in many 
cases.  This would not use the Apache Arrow Java library, but would instead be 
compatible with the memory 
[layout|https://arrow.apache.org/docs/format/Layout.html], and possibly only a 
subset of that layout (a small sketch of the relevant public API follows).
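To make that concrete, here is a small sketch (not part of the proposal itself) 
of how reads already go through the public ColumnVector/ColumnarBatch APIs, 
which is what would let an Arrow-compatible backing layout be swapped in 
without consumers noticing. OnHeapColumnVector is Spark's existing internal 
on-heap implementation and is used here purely for illustration.

{code:scala}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Fill a single int column using Spark's existing on-heap layout.
val col = new OnHeapColumnVector(3, IntegerType)
(0 until 3).foreach(i => col.putInt(i, i * 10))

// Consumers only see the public ColumnVector / ColumnarBatch surface, so the
// backing memory layout (on-heap, off-heap, Arrow-compatible) stays an
// implementation detail.
val batch = new ColumnarBatch(Array[ColumnVector](col))
batch.setNumRows(3)

println(batch.column(0).getInt(2)) // direct columnar access: prints 20

val rows = batch.rowIterator()     // row-wise view over the same data
while (rows.hasNext) {
  println(rows.next().getInt(0))   // prints 0, 10, 20
}
{code}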

 

*Q2.* What problem is this proposal NOT designed to solve? 

The goal of this is not ML/AI support, but to provide APIs for accelerated 
computing in Spark, primarily targeting SQL/ETL-like workloads.  ML/AI 
frameworks already have several mechanisms to get data into and out of them. 
These can be improved, but that will be covered in a separate SPIP.

This is not trying to implement any of the processing itself in a columnar way, 
with the exception of examples for documentation.

This does not cover exposing the underlying format of the data.  The only way 
to get at the data in a ColumnVector is through the public APIs.  Exposing the 
underlying format to improve efficiency will be covered in a separate SPIP.

This is not trying to implement new ways of transferring data to external ML/AI 
applications.  That is covered by separate SPIPs already.

This is not trying to add generic code generation for columnar processing.  
Currently, code generation for columnar processing is only supported when 
translating columns to rows.  We will continue to support this, but will not 
extend it as a general solution; that will be covered in a separate SPIP if we 
find it is helpful.  For now, columnar processing will be interpreted.

This is not trying to expose a way to get columnar data into Spark through 
DataSource V2 or any other similar API.  That would be covered by a separate 
SPIP if we find it is needed.

 

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to three areas.
 # Internal implementations of FileFormats can optionally return a 
ColumnarBatch instead of rows.  The code generation phase knows how to take 
that columnar data and iterate through it as rows for stages that want rows, 
which currently is almost everything.  The limitations here are mostly 
implementation specific. The current standard is to abuse Scala’s type erasure 
to return ColumnarBatches as the elements of an RDD[InternalRow]. The code 
generation can handle this because it is generating Java code, so it bypasses 
Scala’s type checking and just casts the InternalRow to the desired 
ColumnarBatch (a rough sketch of this pattern follows after this list).  This 
makes it difficult for others to implement the same functionality for 
different processing, because they can only do it through code generation. 
There really is no clean separate path in the code generation for columnar vs 
row-based processing. Additionally, because this is only supported through code 
generation, there is no fallback if code generation fails for any reason.  
This is typically fine for input formats but can be problematic when we get 
into more extensive processing.
 # When caching data, it can optionally be cached in a columnar format if the 
input is also columnar.  This is similar to the first area and has the same 
limitations because the cache acts as an input, but it is the only piece of 
code that also consumes columnar data as an input.
 # Pandas vectorized processing.  To support Pandas UDFs, Spark builds up a 
batch of data and sends it to Python for processing, then gets a batch of data 
back as a result.  The format of the data sent to Python can be either pickle, 
which is the default, or optionally Arrow; the result is returned in the same 
format. The limitations here are really around performance: transforming the 
data back and forth can be very expensive.
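As a rough illustration of the type-erasure trick described in the first item 
(not code taken from Spark itself), the pattern looks roughly like this; every 
consumer simply has to know that the RDD really carries batches:

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// A scan that really produces batches advertises itself as RDD[InternalRow];
// type erasure means the element type is never checked at runtime.
def batchesAsRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
  batches.asInstanceOf[RDD[InternalRow]]

// A consumer that is in on the convention casts each element back. Anything
// that treats the elements as real rows fails at runtime instead.
def consumeAsBatches(rows: RDD[InternalRow]): Unit =
  rows.foreachPartition { elems =>
    elems.foreach { e =>
      val batch = e.asInstanceOf[ColumnarBatch]
      val it = batch.rowIterator() // iterate the batch as rows
      while (it.hasNext) {
        val row = it.next()
        // process row
      }
    }
  }
{code}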

 

*Q4.* What is new in your approach 

[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-06-04 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Epic Name: Public APIs for extended Columnar Processing Support

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Epic
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>

[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-06-04 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Issue Type: Epic  (was: Improvement)


[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-29 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Description: 

[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-11 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Shepherd: Thomas Graves

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently exposes data to users only one 
> row at a time when processing it.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row-based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transitions between columnar and row-based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and that would be impossible 
> without this feature.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in-memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to the Apache Arrow format 
> a no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not use the Apache Arrow Java library, but would instead 
> be compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html], and possibly only 
> a subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats can optionally return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that want rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating Java code, so it bypasses Scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row-based 
> processing. Additionally, because this is only supported through code 
> generation, there is no fallback if code generation fails for any reason.  
> This is typically fine for input formats but can be problematic when we get 
> into more extensive processing. 
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the cache acts as an input, but it is the only piece of 
> code that