[ https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Joseph Evans updated SPARK-27396:
----------------------------------------
    Description:
*SPIP: Columnar Processing Without Arrow Formatting Guarantees.*

*Q1.* What are you trying to do? Articulate your objectives using absolutely no jargon.
The Dataset/DataFrame API in Spark currently only exposes to users one row at a time when processing data. The goals of this are to
# Add to the current sql extensions mechanism so advanced users can have access to the physical SparkPlan and manipulate it to provide columnar processing for existing operators, including shuffle. This will allow them to implement their own cost based optimizers to decide when processing should be columnar and when it should not.
# Make any transitions between the columnar memory layout and a row based layout transparent to the users so operations that are not columnar see the data as rows, and operations that are columnar see the data as columns.

Not Requirements, but things that would be nice to have.
# Transition the existing in memory columnar layouts to be compatible with Apache Arrow. This would make the transformations to Apache Arrow format a no-op. The existing formats are already very close to those layouts in many cases. This would not be using the Apache Arrow java library, but instead being compatible with the memory [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a subset of that layout.

*Q2.* What problem is this proposal NOT designed to solve?
The goal of this is not for ML/AI but to provide APIs for accelerated computing in Spark primarily targeting SQL/ETL like workloads. ML/AI already have several mechanisms to get data into/out of them. These can be improved but will be covered in a separate SPIP.
This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation.
This does not cover exposing the underlying format of the data.
The only way to get at the data in a ColumnVector is through the public APIs. Exposing the underlying format to improve efficiency will be covered in a separate SPIP.
This is not trying to implement new ways of transferring data to external ML/AI applications. That is covered by separate SPIPs already.
This is not trying to add in generic code generation for columnar processing. Currently code generation for columnar processing is only supported when translating columns to rows. We will continue to support this, but will not extend it as a general solution. That will be covered in a separate SPIP if we find it is helpful. For now columnar processing will be interpreted.
This is not trying to expose a way to get columnar data into Spark through DataSource V2 or any other similar API. That would be covered by a separate SPIP if we find it is needed.

*Q3.* How is it done today, and what are the limits of current practice?
The current columnar support is limited to 3 areas.
# Internal implementations of FileFormats can optionally return a ColumnarBatch instead of rows. The code generation phase knows how to take that columnar data and iterate through it as rows for stages that want rows, which currently is almost everything. The limitations here are mostly implementation specific. The current standard is to abuse Scala’s type erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The code generation can handle this because it is generating Java code, so it bypasses Scala’s type checking and just casts the InternalRow to the desired ColumnarBatch. This makes it difficult for others to implement the same functionality for different processing because they can only do it through code generation. There really is no clean separate path in the code generation for columnar vs row based. Additionally, because it is only supported through code generation, if code generation fails for any reason there is no backup.
This is typically fine for input formats but can be problematic when we get into more extensive processing.
# When caching data it can optionally be cached in a columnar format if the input is also columnar. This is similar to the first area and has the same limitations because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
# Pandas vectorized processing. To be able to support Pandas UDFs Spark will build up a batch of data and send it to Python for processing, and then get a batch of data back as a result. The format of the data being sent to Python can either be pickle, which is the default, or optionally Arrow. The result returned is the same format. The limitations here really are around performance. Transforming the data back and forth can be very expensive.

*Q4.* What is new in your approach and why do you think it will be successful?
What we are primarily doing is cleaning up a lot of existing functionality, refactoring it, and making it more generic. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be efficiently done in Spark.

*Q5.* Who cares? If you are successful, what difference will it make?
Anyone who wants to accelerate Spark. At Spark+AI Summit this year, 2019, I spoke with multiple companies (7 by my count including Facebook) trying to do this, either using FPGAs, GPUs, or CPU SIMD instructions to get faster, more efficient processing. This will help all of them to provide a clean implementation of accelerated ETL processing, without hacks like overriding internal Spark classes by putting jars first on the classpath, which many of these companies are currently doing.

*Q6.* What are the risks?
Technologically I don’t see many risks. We have done a proof of concept implementation that shows it can be done; it is just a matter of putting those changes in place.

*Q7.* How long will it take?
I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs would, I suspect, put it at a month and a half to two months. So one quarter would probably give us enough time to get through everything.

*Q8.* What are the mid-term and final “exams” to check for success?
The first check for success would be to successfully transition the existing columnar code over to using this behind the scenes. That would be separating out the code that goes from rows to columns when sending the data to Python for processing by Pandas UDFs, and also the code that goes from columns to rows for the file formats, caching, and the output of Pandas UDFs.

The final success is really about adoption and seeing if the follow on work that we, and hopefully others, have shows that it cleanly enables something that was not possible before.

*Appendix A:*
For the most part the APIs will not need to change in any backwards incompatible way. Also note that these are not necessarily final changes to the APIs, but mostly reflect the general direction that we want to go in, so there is no need to include nits in the discussion on the APIs. Those can be covered by code reviews.

{{ColumnarBatch}} and {{ColumnVector}} will need to move from one jar to another to allow the catalyst Expression class to have access to them.

Expression will get the following added to it.

{code:java}
  /**
   * Returns true if this expression supports columnar processing through [[columnarEval]].
   */
  def supportsColumnar: Boolean = false

  /**
   * Returns the result of evaluating this expression on the entire ColumnarBatch. The result of
   * calling this may be a single ColumnVector, or a scalar value. Scalar values can happen if they
   * are a part of the expression or in some cases may be an optimization, like using the batch's
   * null count to know if isNull is false for the entire batch without doing any calculations.
   */
  def columnarEval(batch: ColumnarBatch): Any = {
    throw new IllegalStateException(s"Internal Error ${this.getClass} has column support mismatch")
  }
{code}

SparkPlan will be updated to include
{code:java}
  /**
   * Return true if this stage of the plan supports columnar execution.
   */
  def supportsColumnar: Boolean = false

  /**
   * The exact types of the columns that are output in columnar processing mode
   * (used for faster codegen of transitions from columns to rows).
   */
  def vectorTypes: Option[Seq[String]] = None

  /**
   * Returns the result of this query as an RDD[ColumnarBatch] by delegating to `doExecuteColumnar`
   * after preparations.
   *
   * Concrete implementations of SparkPlan should override `doExecuteColumnar` if `supportsColumnar`
   * returns true.
   */
  final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
    }
    doExecuteColumnar()
  }

  /**
   * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
   * true.
   */
  protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    // If the user overrides supportsColumnar but not this method, blow up.
    throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
      s" mismatch:\n${this}")
  }
{code}
In BufferedRowIterator, init will change to reflect that the values in it could be ColumnarBatches too, which is the case today.
{code:java}
-  public abstract void init(int index, Iterator<InternalRow>[] iters);
+  public abstract void init(int index, Iterator<Object>[] iters);
{code}

SparkSessionExtensions will have new APIs for columnar processing.
{code:java}
  type ColumnarRuleBuilder = SparkSession => ColumnarRule

  def injectColumnar(builder: ColumnarRuleBuilder): Unit

  /**
   * :: Experimental ::
   * Holds a rule that runs prior to inserting column to row and row to column transitions to
   * allow for injecting a columnar implementation into various operators, and a rule that
   * runs after to allow overriding the implementation of those transitions, and potentially
   * cleaning up the plan (like inserting batching of columnar data for more efficient processing).
   */
  @DeveloperApi
  @Experimental
  @Unstable
  class ColumnarRule {
    def pre(plan: SparkPlan): SparkPlan = plan
    def post(plan: SparkPlan): SparkPlan = plan
  }
{code}

  was:
*Q1.* What are you trying to do? Articulate your objectives using absolutely no jargon.
The Dataset/DataFrame API in Spark currently only exposes to users one row at a time when processing data. The goals of this are to
# Expose to end users a new option of processing the data in a columnar format, multiple rows at a time, with the data organized into contiguous arrays in memory.
# Make any transitions between the columnar memory layout and a row based layout transparent to the end user.
# Allow for simple data exchange with other systems, DL/ML libraries, pandas, etc. by having clean APIs to transform the columnar data into an Apache Arrow compatible layout.
# Provide a plugin mechanism for columnar processing support so an advanced user could avoid data transition between columnar and row based processing even through shuffles. This means we should at least support pluggable APIs so an advanced end user can implement the columnar partitioning themselves, and provide the glue necessary to shuffle the data still in a columnar format.
# Expose new APIs that allow advanced users or frameworks to implement columnar processing either as UDFs, or by adjusting the physical plan to do columnar processing.
If the latter is too controversial we can move it to another SPIP, but we plan to implement some accelerated computing in parallel with this feature to be sure the APIs work, and that is impossible without this feature.

Not Requirements, but things that would be nice to have.
# Provide default implementations for partitioning columnar data, so users don’t have to.
# Transition the existing in memory columnar layouts to be compatible with Apache Arrow. This would make the transformations to Apache Arrow format a no-op. The existing formats are already very close to those layouts in many cases. This would not be using the Apache Arrow java library, but instead being compatible with the memory [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a subset of that layout.
# Provide a clean transition from the existing code to the new one. The existing APIs which are public but evolving are not that far off from what is being proposed. We should be able to create a new parallel API that can wrap the existing one. This means any file format that is trying to support columnar can still do so until we make a conscious decision to deprecate and then turn off the old APIs.

*Q2.* What problem is this proposal NOT designed to solve?
This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation, and possibly default implementations for partitioning of columnar shuffle.

*Q3.* How is it done today, and what are the limits of current practice?
The current columnar support is limited to 3 areas.
# Input formats can optionally return a ColumnarBatch instead of rows. The code generation phase knows how to take that columnar data and iterate through it as rows for stages that want rows, which currently is almost everything. The limitations here are mostly implementation specific.
The current standard is to abuse Scala’s type erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The code generation can handle this because it is generating Java code, so it bypasses Scala’s type checking and just casts the InternalRow to the desired ColumnarBatch. This makes it difficult for others to implement the same functionality for different processing because they can only do it through code generation. There really is no clean separate path in the code generation for columnar vs row based. Additionally, because it is only supported through code generation, if code generation fails for any reason there is no backup. This is typically fine for input formats but can be problematic when we get into more extensive processing.
# When caching data it can optionally be cached in a columnar format if the input is also columnar. This is similar to the first area and has the same limitations because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
# Pandas vectorized processing. To be able to support Pandas UDFs Spark will build up a batch of data and send it to Python for processing, and then get a batch of data back as a result. The format of the data being sent to Python can either be pickle, which is the default, or optionally Arrow. The result returned is the same format. The limitations here really are around performance. Transforming the data back and forth can be very expensive.

*Q4.* What is new in your approach and why do you think it will be successful?
Conceptually what we are primarily doing is cleaning up a lot of existing functionality and getting it ready to be exposed as public facing APIs. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be efficiently done in Spark.

*Q5.* Who cares? If you are successful, what difference will it make?
Anyone who wants to integrate Spark with other data processing systems will care, as it provides a cleaner, more standards-based way of transferring the data. Anyone who wants to experiment with accelerated computing, either on a CPU or GPU, will benefit as it provides a supported way to make this work instead of trying to hack something in that was not available before.

*Q6.* What are the risks?
Technologically I don’t see many risks. We have done a proof of concept implementation that shows it can be done. The biggest risk is if Apache Arrow loses favor as an interchange format between different systems. In the worst case that means we may have to support transforming the data into other types, but because Arrow as the internal format is not a requirement and there really are not that many ways to lay out columnar data, it should require minor changes if anything.

The second risk is if we didn’t set up the API in a powerful/flexible enough way for users to really take full advantage of it. This is really going to come with time, and if we start off fairly conservatively in our API we can hopefully expand it in the future.

The final risk is around dictionaries. The current columnar layout exposes the dictionary as just a Java class. When we go to an Arrow compatible layout the dictionary has a specific format. The Parquet APIs don’t give raw access to the layout of the dictionary. This is something we can overcome; it just means we may have less than optimal translations, in terms of performance, when going from Parquet formatted data to Arrow until we can get some changes into the Parquet API. This may be true of Orc as well. It is no worse off than it is today already.

*Q7.* How long will it take?
I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs I would suspect would put it at a month and a half to two months.
So one quarter would give us enough time to get through everything, including reviews if things go well.

*Q8.* What are the mid-term and final “exams” to check for success?
The first check for success would be to successfully transition the existing columnar code over to using this. That would be transforming and sending the data to python for processing by Pandas UDFs, and the file formats and caching code to use a cleaner more explicit columnar API.

The final success is really about adoption and seeing if the follow on work that we, and hopefully others, have shows that it cleanly enables something that was not possible before.


> SPIP: Public APIs for extended Columnar Processing Support
> ----------------------------------------------------------
>
>                 Key: SPARK-27396
>                 URL: https://issues.apache.org/jira/browse/SPARK-27396
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Robert Joseph Evans
>            Priority: Major
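
As a reading aid for Appendix A, here is a minimal, self-contained sketch of how the {{pre}} and {{post}} hooks of a {{ColumnarRule}} might interact with the insertion of row to column and column to row transitions. It is a toy model under stated assumptions and not Spark code: {{Plan}}, {{Scan}}, {{Filter}}, {{Project}}, {{RowToColumnar}}, {{ColumnarToRow}}, {{AccelerateFilters}} and {{ToyPlanner}} are hypothetical stand-ins invented for illustration. Only the {{supportsColumnar}} flag and the {{pre}}/{{post}} shape of {{ColumnarRule}} are taken from the proposal, and the sketch assumes the final output of a query must be rows.

```scala
// Toy model only: simplified stand-ins, NOT Spark's SparkPlan or ColumnarRule.
sealed trait Plan {
  def children: Seq[Plan]
  // Mirrors the proposed flag: true if this node consumes and produces columnar data.
  def supportsColumnar: Boolean = false
  def withChildren(newChildren: Seq[Plan]): Plan
}

case class Scan(table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

// A filter that may run in either a row based or a columnar implementation.
case class Filter(child: Plan, columnar: Boolean = false) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def supportsColumnar: Boolean = columnar
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Hypothetical transition nodes that the planner inserts automatically.
case class RowToColumnar(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def supportsColumnar: Boolean = true
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

case class ColumnarToRow(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Same shape as the proposed ColumnarRule, but over the toy Plan type.
class ColumnarRule {
  def pre(plan: Plan): Plan = plan
  def post(plan: Plan): Plan = plan
}

// Example user rule: swap every Filter for its columnar implementation.
object AccelerateFilters extends ColumnarRule {
  override def pre(plan: Plan): Plan =
    plan.withChildren(plan.children.map(c => pre(c))) match {
      case f: Filter => f.copy(columnar = true)
      case other => other
    }
}

object ToyPlanner {
  // Insert a transition wherever a parent and child disagree on layout.
  // Assumes the input plan does not already contain transition nodes.
  private def insertTransitions(plan: Plan): Plan = {
    val fixed = plan.children.map { c =>
      val child = insertTransitions(c)
      (plan.supportsColumnar, child.supportsColumnar) match {
        case (true, false) => RowToColumnar(child)
        case (false, true) => ColumnarToRow(child)
        case _ => child
      }
    }
    plan.withChildren(fixed)
  }

  // pre runs before transitions are inserted, post runs after.
  def prepare(plan: Plan, rule: ColumnarRule): Plan = {
    val withTransitions = insertTransitions(rule.pre(plan))
    val rowOutput =
      if (withTransitions.supportsColumnar) ColumnarToRow(withTransitions)
      else withTransitions
    rule.post(rowOutput)
  }
}
```

In this toy model, {{ToyPlanner.prepare(Project(Filter(Scan("t"))), AccelerateFilters)}} yields {{Project(ColumnarToRow(Filter(RowToColumnar(Scan("t")), columnar = true)))}}, so the row based {{Scan}} and {{Project}} never see columnar data, while the default {{ColumnarRule}} leaves the plan unchanged.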