[ https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940974#comment-16940974 ]
Thomas Graves commented on SPARK-27396:
---------------------------------------

The main objectives have actually already been implemented; see the linked jiras. I will close this.


SPIP: Public APIs for extended Columnar Processing Support
----------------------------------------------------------

                 Key: SPARK-27396
                 URL: https://issues.apache.org/jira/browse/SPARK-27396
             Project: Spark
          Issue Type: Epic
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Robert Joseph Evans
            Priority: Major

*SPIP: Columnar Processing Without Arrow Formatting Guarantees.*

*Q1.* What are you trying to do? Articulate your objectives using absolutely no jargon.

The Dataset/DataFrame API in Spark currently exposes data to users only one row at a time when processing it. The goals of this proposal are to:
 # Add to the current SQL extensions mechanism so that advanced users can get access to the physical SparkPlan and manipulate it to provide columnar processing for existing operators, including shuffle. This will allow them to implement their own cost-based optimizers to decide when processing should be columnar and when it should not (a sketch of how extensions are wired today follows this question).
 # Make any transitions between the columnar memory layout and a row-based layout transparent to users, so that operations that are not columnar see the data as rows and operations that are columnar see the data as columns.

Not requirements, but things that would be nice to have:
 # Transition the existing in-memory columnar layouts to be compatible with Apache Arrow. This would make the transformation to the Apache Arrow format a no-op; the existing formats are already very close to those layouts in many cases. This would not use the Apache Arrow Java library, but instead be compatible with the memory [layout|https://arrow.apache.org/docs/format/Layout.html], and possibly only a subset of that layout.
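To make the first goal concrete, here is a minimal, hypothetical sketch of how the existing SparkSessionExtensions mechanism is wired into a session today, using a logical-plan optimizer rule as a stand-in. The {{PassThroughRule}} and {{ExtensionsToday}} names are illustrative only; the proposal adds an analogous hook at the physical, columnar level (see Appendix A).

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative no-op rule, standing in for whatever an accelerator vendor would inject.
case class PassThroughRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object ExtensionsToday {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("extensions-example")
      // Today's hooks operate on the logical plan and planner strategies;
      // this SPIP adds a comparable hook for the physical, columnar side.
      .withExtensions(_.injectOptimizerRule(session => PassThroughRule(session)))
      .getOrCreate()
    spark.range(10).count()
    spark.stop()
  }
}
{code}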
*Q2.* What problem is this proposal NOT designed to solve?

The goal of this is not ML/AI, but to provide APIs for accelerated computing in Spark, primarily targeting SQL/ETL-like workloads. ML/AI already have several mechanisms to get data into and out of them. These can be improved, but that will be covered in a separate SPIP.

This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation.

This does not cover exposing the underlying format of the data. The only way to get at the data in a ColumnVector is through the public APIs. Exposing the underlying format to improve efficiency will be covered in a separate SPIP.

This is not trying to implement new ways of transferring data to external ML/AI applications. That is covered by separate SPIPs already.

This is not trying to add generic code generation for columnar processing. Currently code generation for columnar processing is only supported when translating columns to rows. We will continue to support this, but will not extend it as a general solution. That will be covered in a separate SPIP if we find it is helpful. For now, columnar processing will be interpreted.

This is not trying to expose a way to get columnar data into Spark through DataSource V2 or any other similar API. That would be covered by a separate SPIP if we find it is needed.

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to three areas.
 # Internal implementations of FileFormats can optionally return a ColumnarBatch instead of rows. The code generation phase knows how to take that columnar data and iterate through it as rows for stages that want rows, which currently is almost everything. The limitations here are mostly implementation specific. The current standard is to abuse Scala's type erasure to return ColumnarBatches as the elements of an RDD[InternalRow] (illustrated by the sketch after this list). The code generation can handle this because it is generating Java code, so it bypasses Scala's type checking and just casts the InternalRow to the desired ColumnarBatch. This makes it difficult for others to implement the same functionality for different processing, because they can only do it through code generation. There really is no clean, separate path in the code generation for columnar vs. row-based processing. Additionally, because it is only supported through code generation, if code generation fails for any reason there is no backup. This is typically fine for input formats but can be problematic when we get into more extensive processing.
 # When caching data, it can optionally be cached in a columnar format if the input is also columnar. This is similar to the first area and has the same limitations, because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
 # Pandas vectorized processing. To support Pandas UDFs, Spark builds up a batch of data and sends it to Python for processing, and then gets a batch of data back as a result. The format of the data being sent to Python can be either pickle, which is the default, or optionally Arrow. The result is returned in the same format. The limitations here are really around performance: transforming the data back and forth can be very expensive.
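To illustrate the type-erasure pattern described in the first item above, here is a minimal, hypothetical sketch (not actual Spark source). The {{ErasureHack}}, {{smuggle}}, and {{recover}} names are made up for illustration; the casts only succeed at runtime because the elements really are ColumnarBatches.

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

object ErasureHack {
  // Producer side: the element type is erased at runtime, so an RDD of
  // ColumnarBatch can be handed out where an RDD[InternalRow] is expected.
  def smuggle(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
    batches.asInstanceOf[RDD[InternalRow]]

  // Consumer side: generated Java code effectively performs this unchecked
  // per-element cast, bypassing Scala's type checking.
  def recover(rows: RDD[InternalRow]): RDD[ColumnarBatch] =
    rows.map(_.asInstanceOf[ColumnarBatch])
}
{code}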
*Q4.* What is new in your approach and why do you think it will be successful?

What we are primarily doing is cleaning up a lot of existing functionality, refactoring it, and making it more generic. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be done efficiently in Spark.

*Q5.* Who cares? If you are successful, what difference will it make?

Anyone who wants to accelerate Spark. At Spark+AI Summit this year, 2019, I spoke with multiple companies (7 by my count, including Facebook) trying to do this, using either FPGAs, GPUs, or CPU SIMD instructions to get faster, more efficient processing. This will help all of them to provide a clean implementation of accelerated ETL processing, without hacks like overriding internal Spark classes by putting jars first on the classpath, which many of these companies are currently doing.

*Q6.* What are the risks?

Technologically I don't see many risks. We have done a proof of concept implementation that shows it can be done; it is just a matter of putting those changes in place.

*Q7.* How long will it take?

I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs would, I suspect, put it at a month and a half to two months. So one quarter would probably give us enough time to get through everything.

*Q8.* What are the mid-term and final “exams” to check for success?

The first check for success would be to successfully transition the existing columnar code over to using this behind the scenes. That would mean separating out the code that goes from rows to columns when sending data to Python for processing by Pandas UDFs, as well as the code that goes from columns to rows for the file formats, caching, and the output of Pandas UDFs.

The final success is really about adoption, and seeing whether the follow-on work that we, and hopefully others, do shows that it cleanly enables something that was not possible before.

*Appendix A:*

For the most part the APIs will not need to change in any backwards-incompatible way. Also note that these are not necessarily final changes to the APIs, but mostly reflect the general direction that we want to go in, so there is no need to include nits in the discussion on the APIs. Those can be covered by code reviews.

{{ColumnarBatch}} and {{ColumnVector}} will need to move from one jar to another to allow the catalyst Expression class to have access to them.

The following will be added to Expression:

{code:java}
/**
 * Returns true if this expression supports columnar processing through [[columnarEval]].
 */
def supportsColumnar: Boolean = false

/**
 * Returns the result of evaluating this expression on the entire ColumnarBatch. The result of
 * calling this may be a single ColumnVector, or a scalar value. Scalar values can happen if they
 * are a part of the expression, or in some cases may be an optimization, like using the batch's
 * null count to know that isNull is false for the entire batch without doing any calculations.
 */
def columnarEval(batch: ColumnarBatch): Any = {
  throw new IllegalStateException(s"Internal Error ${this.getClass} has column support mismatch")
}
{code}

SparkPlan will be updated to include:

{code:java}
/**
 * Return true if this stage of the plan supports columnar execution.
 */
def supportsColumnar: Boolean = false

/**
 * The exact types of the columns that are output in columnar processing mode (used for faster
 * codegen of transitions from columns to rows).
 */
def vectorTypes: Option[Seq[String]] = None

/**
 * Returns the result of this query as an RDD[ColumnarBatch] by delegating to `doExecuteColumnar`
 * after preparations.
 *
 * Concrete implementations of SparkPlan should override `doExecuteColumnar` if `supportsColumnar`
 * returns true.
 */
final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecuteColumnar()
}

/**
 * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
 * true.
 */
protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
  // If the user overrides supportsColumnar, but not this, blow up.
  throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
    s" mismatch:\n${this}")
}
{code}
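As a hypothetical illustration of how these SparkPlan hooks would fit together (assuming the additions above are applied), a columnar-aware consumer such as a column-to-row transition would dispatch roughly like this. The {{ColumnarDispatch}}, {{consumeBatches}}, and {{consumeRows}} names are made up for the sketch.

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

object ColumnarDispatch {
  // Hypothetical consumers, standing in for real transition operators.
  def consumeBatches(batches: RDD[ColumnarBatch]): Unit = ()
  def consumeRows(rows: RDD[InternalRow]): Unit = ()

  def run(plan: SparkPlan): Unit = {
    if (plan.supportsColumnar) {
      // Proposed path: the plan produces batches directly.
      consumeBatches(plan.executeColumnar())
    } else {
      // Existing path: fall back to row-based execution.
      consumeRows(plan.execute())
    }
  }
}
{code}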
In BufferedRowIterator, init will change to reflect that the values in it could also be ColumnarBatches, which is already the case today.

{code:java}
- public abstract void init(int index, Iterator<InternalRow>[] iters);
+ public abstract void init(int index, Iterator<Object>[] iters);
{code}

SparkSessionExtensions will have new APIs for columnar processing.

{code:java}
type ColumnarRuleBuilder = SparkSession => ColumnarRule

def injectColumnar(builder: ColumnarRuleBuilder): Unit

/**
 * :: Experimental ::
 * Holds a rule that runs prior to inserting the column-to-row and row-to-column transitions, to
 * allow for injecting a columnar implementation into various operators, and a rule that runs
 * after, to allow overriding the implementation of those transitions and potentially cleaning up
 * the plan (like inserting batching of columnar data for more efficient processing).
 */
@DeveloperApi
@Experimental
@Unstable
class ColumnarRule {
  def pre(plan: SparkPlan): SparkPlan = plan
  def post(plan: SparkPlan): SparkPlan = plan
}
{code}
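To show how an advanced user might plug into the proposed hook (assuming the SparkSessionExtensions and ColumnarRule additions above), here is a minimal, hypothetical sketch. {{NoopColumnarRule}}, {{MyColumnarExtensions}}, and the {{com.example}} class name are illustrative only, and the package that the proposed ColumnarRule would live in is an assumption.

{code:scala}
import org.apache.spark.sql.SparkSessionExtensions
// Assumed location for the proposed ColumnarRule; the SPIP does not pin one down.
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Illustrative rule: a real accelerator would replace supported operators with
// columnar versions in pre() and tidy up the resulting transitions in post().
class NoopColumnarRule extends ColumnarRule {
  override def pre(plan: SparkPlan): SparkPlan = plan
  override def post(plan: SparkPlan): SparkPlan = plan
}

// Registered through the existing extensions entry point, e.g.
// --conf spark.sql.extensions=com.example.MyColumnarExtensions
class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar(session => new NoopColumnarRule)
  }
}
{code}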