[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940974#comment-16940974
 ] 

Thomas Graves commented on SPARK-27396:
---------------------------------------

The main objectives have actually already been implemented, see the linked 
jiras. I will close this.

> SPIP: Public APIs for extended Columnar Processing Support
> ----------------------------------------------------------
>
>                 Key: SPARK-27396
>                 URL: https://issues.apache.org/jira/browse/SPARK-27396
>             Project: Spark
>          Issue Type: Epic
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Robert Joseph Evans
>            Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data.  The only way 
> to get at the data in a ColumnVector is through the public APIs.  Exposing 
> the underlying format to improve efficiency will be covered in a separate 
> SPIP.
> This is not trying to implement new ways of transferring data to external 
> ML/AI applications.  That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing. 
>  Currently code generation for columnar processing is only supported when 
> translating columns to rows.  We will continue to support this, but will not 
> extend it as a general solution. That will be covered in a separate SPIP if 
> we find it is helpful.  For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through 
> DataSource V2 or any other similar API.  That would be covered by a separate 
> SPIP if we find it is needed.
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Internal implementations of FileFormats, optionally can return a 
> ColumnarBatch instead of rows.  The code generation phase knows how to take 
> that columnar data and iterate through it as rows for stages that wants rows, 
> which currently is almost everything.  The limitations here are mostly 
> implementation specific. The current standard is to abuse Scala’s type 
> erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The 
> code generation can handle this because it is generating java code, so it 
> bypasses scala’s type checking and just casts the InternalRow to the desired 
> ColumnarBatch.  This makes it difficult for others to implement the same 
> functionality for different processing because they can only do it through 
> code generation. There really is no clean separate path in the code 
> generation for columnar vs row based. Additionally, because it is only 
> supported through code generation if for any reason code generation would 
> fail there is no backup.  This is typically fine for input formats but can be 
> problematic when we get into more extensive processing.
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the cache acts as an input, but it is the only piece of 
> code that also consumes columnar data as an input.
>  # Pandas vectorized processing.  To be able to support Pandas UDFs Spark 
> will build up a batch of data and send it to python for processing, and then 
> get a batch of data back as a result.  The format of the data being sent to 
> python can either be pickle, which is the default, or optionally Arrow. The 
> result returned is the same format. The limitations here really are around 
> performance.  Transforming the data back and forth can be very expensive.
>  
> *Q4.* What is new in your approach and why do you think it will be successful?
> What we are primarily doing is cleaning up a lot of existing functionality, 
> refactoring it, and making it more generic. We think we can be successful 
> because we have already completed a proof of concept that shows columnar 
> processing can be efficiently done in Spark.
>  
> *Q5.* Who cares? If you are successful, what difference will it make?
> Anyone who wants to accelerate spark.  At Spark+AI summit this year, 2019, I 
> spoke with multiple companies (7 by my count including Facebook) trying to do 
> this, either using FPGAs, GPUs, or CPU SIMD instructions to get faster more 
> efficient processing.  This will help all of them to provide a clean 
> implementation of accelerated ETL processing, without hacks like overriding 
> internal spark classes by putting jars first on the classpath, which many of 
> these companies are currently doing.
>  
> *Q6.* What are the risks?
> Technologically I don’t see many risks.  We have done a proof of concept 
> implementation that shows it can be done, it is just a matter of putting 
> those changes in place.
>  
> *Q7.* How long will it take?
> I suspect that we can put together a patch with tests in a month. Adding 
> documentation and iterating on the APIs I would suspect would put it at a 
> month and a half to two months. So one quarter would give us enough time to 
> probably get through everything.
>  
> *Q8.* What are the mid-term and final “exams” to check for success?
> The first check for success would be to successfully transition the existing 
> columnar code over to using this behind the scenes. That would be separating 
> out the code that goes from rows to columns when sending the data to python 
> for processing by Pandas UDFs, and also the code that goes from columns to 
> rows for the file formats, caching, and the output of Pandas UDFs.
>  
> The final success is really about adoption and seeing if the follow on work 
> that we, and hopefully others, have shows that it cleanly enables something 
> that was not possible before.
>  
> *Appendix A:*
> For the most part the APIs will not need to change in any backwards 
> incompatible way.  Also note that these are not necessarily final changes to 
> the APIs, but mostly reflect the general direction that we want to go in, so 
> there is no need to include nits in the discussion on the APIs.  Those can be 
> covered by code reviews.
>  
> {{ColumnarBatch}} and {{ColumnVector}} will need to move from one jar to 
> another to allow the catalyst Expression class to have access to them.
>  
> Expression will get added to it.
>  
> {code:java}
>  /**
>   * Returns true if this expression supports columnar processing through 
> [[columnarEval]].
>   */
>  def supportsColumnar: Boolean = false
>  /**
>   * Returns the result of evaluating this expression on the entire 
> ColumnarBatch. The result of
>   * calling this may be a single ColumnVector, or a scalar value. Scalar 
> values can happen if they
>   * are a part of the expression or in some cases may be an optimization, 
> like using the batch's
>   * null count to know is isNull is false for the entire batch without doing 
> any calculations.
>   */
>  def columnarEval(batch: ColumnarBatch): Any = {
>    throw new IllegalStateException(s"Internal Error ${this.getClass} has 
> column support mismatch")
>  }
> {code}
>  
> SparkPlan will be updated to include
> {code:java}
>  /**
>   * Return true if this stage of the plan supports columnar execution.
>   */
>  def supportsColumnar: Boolean = false
>  /**
>   * The exact types of the columns that are output in columnar processing 
> mode (used for faster codegen of transitions from columns to rows).
>   */
>  def vectorTypes: Option[Seq[String]] = None
>  /**
>   * Returns the result of this query as an RDD[ColumnarBatch] by delegating 
> to `doColumnarExecute`
>   * after preparations.
>   *
>   * Concrete implementations of SparkPlan should override `doColumnarExecute` 
> if `supportsColumnar`
>   * returns true.
>   */
>  final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
>     if (isCanonicalizedPlan) {
>         throw new IllegalStateException("A canonicalized plan is not supposed 
> to be executed.")
>     }
>     doExecuteColumnar()
>  }
>  /**
>   * Produces the result of the query as an `RDD[ColumnarBatch]` if 
> [[supportsColumnar]] returns
>   * true.
>   */
>  protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
>     // If the user updates supportsColumnar, but not this blow up.
>     throw new IllegalStateException(s"Internal Error ${this.getClass} has 
> column support" +
>     s" mismatch:\n${this}")
>  }
> {code}
> In BufferedRowIterator init will change to reflect that the values in it 
> could be ColumnarBatches too, which is the case today.
> {code:java}
> -  public abstract void init(int index, Iterator<InternalRow>[] iters);
> +  public abstract void init(int index, Iterator<Object>[] iters);
> {code}
>  
>  SparkSessionExtensions will have new APIs for columnar processing.
> {code:java}
> type ColumnarRuleBuilder = SparkSession => ColumnarRule
> def injectColumnar(builder: ColumnarRuleBuilder): Unit
> /**
>  * :: Experimental ::
>  * Holds a rule that run prior to inserting column to row and row to column 
> transitions to
>  * allow for injecting a columnar implementation into various operators, and 
> a rule that
>  * runs after to allow overriding the implementation of those transitions, 
> and potentially
>  * cleaning up the plan (like inserting batching of columnar data for more 
> efficient processing).
>  */
> @DeveloperApi
> @Experimental
> @Unstable
> class ColumnarRule {
>   def pre(plan: SparkPlan): SparkPlan = plan
>   def post(plan: SparkPlan): SparkPlan = plan
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to