I am still working on the SPIP and should get it up in the next few days. I
have the basic text more or less ready, but I want to get a high-level API
concept ready too, just to have something more concrete. I have not done
much work contributing new features to Spark, so I am not sure where a
design document fits in here; neither
http://spark.apache.org/improvement-proposals.html nor
http://spark.apache.org/contributing.html mentions a design document
anywhere. I am happy to put one up, but I was hoping the API concept would
cover most of that.
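
To give a flavor of the kind of API concept I mean, here is a rough sketch
(every name here is a placeholder, not a settled proposal):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// A physical operator could advertise columnar support through a trait and
// expose a columnar execution path alongside the existing row-based one.
trait ColumnarExecSupport {
  // True if this operator can produce ColumnarBatches directly.
  def supportsColumnar: Boolean

  // A columnar analogue of SparkPlan.doExecute(), called only when the
  // planner decides this section of the plan should run columnar.
  def doExecuteColumnar(): RDD[ColumnarBatch]
}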
Thanks,
Bobby

On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu <liurenjie2...@gmail.com> wrote:

> Hi, Bobby:
> Do you have a design doc? I'm also interested in this topic and want to
> help contribute.
>
> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <bo...@apache.org> wrote:
>
>> Thanks to everyone for the feedback.
>>
>> Overall the feedback has been really positive on exposing columnar
>> processing as an option to users. I'll write up a SPIP on the proposed
>> changes to support columnar processing (not necessarily to implement
>> it) and then ping the list again for more feedback and discussion.
>>
>> Thanks again,
>>
>> Bobby
>>
>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <r...@databricks.com> wrote:
>>
>>> I just realized I didn't make my stance here very clear... here's
>>> another try:
>>>
>>> I think it's a no-brainer to have a good columnar UDF interface. This
>>> would facilitate a lot of high-performance applications, e.g.
>>> GPU-based acceleration for machine learning algorithms.
>>>
>>> On rewriting the entire internals of Spark SQL to leverage columnar
>>> processing, I don't see enough evidence to suggest that's a good idea
>>> yet.
>>>
>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:
>>>
>>>> Kazuaki Ishizaki,
>>>>
>>>> Yes, ColumnarBatchScan does provide a framework for doing code
>>>> generation for the processing of columnar data. I have to admit that
>>>> I don't have a deep understanding of the code generation piece, so if
>>>> I get something wrong please correct me. From what I have seen, only
>>>> input formats currently inherit from ColumnarBatchScan, and from the
>>>> comments in the trait:
>>>>
>>>> /**
>>>>  * Generate [[ColumnVector]] expressions for our parent to consume as
>>>>  * rows. This is called once per [[ColumnarBatch]].
>>>>  */
>>>>
>>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>>
>>>> it appears that ColumnarBatchScan is really only intended to pull the
>>>> data out of the batch, not to process that data in a columnar fashion
>>>> -- the "Loading" stage that you mentioned.
>>>>
>>>> > The SIMDization or GPUization capability depends on a compiler that
>>>> > translates the code generated by the whole-stage codegen into
>>>> > native code.
>>>>
>>>> To be able to support vectorized processing, Hive stayed with pure
>>>> Java and let the JVM detect and do the SIMDization of the code. To
>>>> make that happen they created loops that go through each element in a
>>>> column and removed all conditionals from the bodies of the loops. To
>>>> the best of my knowledge, supporting that in Spark would still
>>>> require a separate code path, like the one I am proposing, to make
>>>> the different processing phases generate code that the JVM can
>>>> compile down to SIMD instructions. The currently generated code is
>>>> full of per-element null checks, which would prevent the
>>>> optimizations we want. Also, the intermediate results are often
>>>> stored in UnsafeRow instances. That is really fast for row-based
>>>> processing, but I believe the complexity of how they work would
>>>> prevent the JVM from being able to vectorize the processing. If you
>>>> have a better way to take Java code and vectorize it, we should put
>>>> it into OpenJDK instead of Spark so everyone can benefit from it.
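>>>>
>>>> Concretely, the difference looks something like the following
>>>> hand-written illustration (this is not actual generated code, and the
>>>> names are made up):
>>>>
>>>> // Per-element null checks inside the loop body keep the JIT from
>>>> // auto-vectorizing:
>>>> def addWithNullChecks(a: Array[Int], aNull: Array[Boolean],
>>>>     b: Array[Int], bNull: Array[Boolean],
>>>>     out: Array[Int], outNull: Array[Boolean]): Unit = {
>>>>   var i = 0
>>>>   while (i < a.length) {
>>>>     if (aNull(i) || bNull(i)) outNull(i) = true
>>>>     else out(i) = a(i) + b(i)
>>>>     i += 1
>>>>   }
>>>> }
>>>>
>>>> // The Hive-style form: no conditionals in the hot loop, so the JIT
>>>> // can compile the straight-line body down to SIMD instructions;
>>>> // validity is tracked in a separate, equally branch-free pass.
>>>> def addBranchFree(a: Array[Int], aNull: Array[Boolean],
>>>>     b: Array[Int], bNull: Array[Boolean],
>>>>     out: Array[Int], outNull: Array[Boolean]): Unit = {
>>>>   var i = 0
>>>>   while (i < a.length) { out(i) = a(i) + b(i); i += 1 }
>>>>   i = 0
>>>>   while (i < a.length) { outNull(i) = aNull(i) || bNull(i); i += 1 }
>>>> }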
>>>>
>>>> Trying to compile directly from generated Java code to something a
>>>> GPU can process is something we are tackling, but we decided to go a
>>>> different route from what you proposed. From talking with several
>>>> compiler experts here at NVIDIA, my understanding is that IBM, in
>>>> partnership with NVIDIA, attempted in the past to extend the JVM to
>>>> run at least partially on GPUs, but it was really difficult to get
>>>> right, especially with how Java does memory management and memory
>>>> layout.
>>>>
>>>> To avoid that complexity we decided to split the JITing into two
>>>> separate pieces. I didn't mention any of this before because this
>>>> discussion was intended to be just about the memory layout support,
>>>> not GPU processing. The first piece is to take the Catalyst AST and
>>>> produce CUDA code directly from it. If done properly, we should be
>>>> able to do the selection and projection phases within a single
>>>> kernel. The biggest issue comes with UDFs, as they cannot easily be
>>>> vectorized for the CPU or the GPU. To deal with that, the compiler
>>>> team here has a prototype tackling SPARK-14083, which can translate
>>>> basic UDFs into Catalyst expressions. If a UDF is too complicated, or
>>>> covers operations not yet supported, it falls back to the original
>>>> UDF processing. I don't know how close the team is to submitting a
>>>> SPIP or a patch for it, but I do know that they have some very basic
>>>> operations working. The big caveat is that it requires Java 11+ so it
>>>> can use standard APIs to get the byte code of Scala UDFs.
>>>>
>>>> We split it this way because we thought it would be the simplest to
>>>> implement, and because it would provide a benefit to more than just
>>>> GPU-accelerated queries.
>>>>
>>>> Thanks,
>>>>
>>>> Bobby
>>>>
>>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki
>>>> <ishiz...@jp.ibm.com> wrote:
>>>>
>>>> Looks like an interesting discussion.
>>>> Let me describe the current structure and the remaining issues. This
>>>> is orthogonal to the cost-benefit trade-off discussion.
>>>>
>>>> The code generation basically consists of three parts:
>>>> 1. Loading
>>>> 2. Selection (map, filter, ...)
>>>> 3. Projection
>>>>
>>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and the table cache)
>>>> is well abstracted by the ColumnVector class (
>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java).
>>>> Combined with ColumnarBatchScan, the whole-stage code generation
>>>> generates code to get values directly from the columnar storage if
>>>> there is no row-based operation.
>>>> Note: the current master does not support Arrow as a data source.
>>>> However, I think it would not be technically hard to support Arrow.
>>>>
>>>> 2. The current whole-stage codegen generates code for element-wise
>>>> selection (excluding sort and join). The SIMDization or GPUization
>>>> capability depends on a compiler that translates the code generated
>>>> by the whole-stage codegen into native code.
>>>>
>>>> 3. The current projection assumes row-oriented data storage; I think
>>>> that is the part Wenchen pointed out.
>>>>
>>>> My slides
>>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>>> may help clarify the above issues and a possible implementation.
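>>>>
>>>> As a rough, hand-written approximation of what the generated code
>>>> does for part 1 when no row-based operation intervenes (this is not
>>>> the actual generated code):
>>>>
>>>> import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
>>>>
>>>> // Values are read straight from the ColumnVector with no per-row
>>>> // UnsafeRow materialization; this example assumes column 0 is an
>>>> // INT column.
>>>> def sumFirstIntColumn(batch: ColumnarBatch): Long = {
>>>>   val col: ColumnVector = batch.column(0)
>>>>   var sum = 0L
>>>>   var rowId = 0
>>>>   while (rowId < batch.numRows()) {
>>>>     if (!col.isNullAt(rowId)) sum += col.getInt(rowId)
>>>>     rowId += 1
>>>>   }
>>>>   sum
>>>> }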
>>>>
>>>> FYI: NVIDIA will present an approach to exploiting GPUs with Arrow
>>>> through Python at SAIS 2019:
>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
>>>> I think it uses Spark's Python UDF support with Arrow.
>>>>
>>>> P.S. I will give a presentation about in-memory data storage for
>>>> Spark at SAIS 2019:
>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>>> :)
>>>>
>>>> Kazuaki Ishizaki
>>>>
>>>> From: Wenchen Fan <cloud0...@gmail.com>
>>>> To: Bobby Evans <bo...@apache.org>
>>>> Cc: Spark dev list <dev@spark.apache.org>
>>>> Date: 2019/03/26 13:53
>>>> Subject: Re: [DISCUSS] Spark Columnar Processing
>>>> ------------------------------
>>>>
>>>> Do you have some initial perf numbers? It seems fine to me to remain
>>>> row-based inside Spark with whole-stage codegen, and to convert rows
>>>> to columnar batches when communicating with external systems.
>>>>
>>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:
>>>>
>>>> This thread is to discuss adding support for data frame processing
>>>> using an in-memory columnar format compatible with Apache Arrow. My
>>>> main goal is to lay the groundwork so we can add support for
>>>> GPU-accelerated processing of data frames, but this feature has a
>>>> number of other benefits. Spark currently supports Apache
>>>> Arrow-formatted data as an option for exchanging data with Python for
>>>> pandas UDF processing. There has also been discussion around
>>>> extending this to allow for exchanging data with other tools like
>>>> pytorch, tensorflow, xgboost, ... If Spark supported processing on
>>>> Arrow-compatible data, it could eliminate the
>>>> serialization/deserialization overhead when going between these
>>>> systems. It would also allow for optimizations on the CPU with SIMD
>>>> instructions, similar to what Hive currently supports. Accelerated
>>>> processing using a GPU is something we will start a separate
>>>> discussion thread on, but I wanted to set the context a bit.
>>>>
>>>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>>>> months to try to understand how to make this work. What we are
>>>> proposing is based on lessons learned while building that prototype,
>>>> but we really wanted to get feedback early on from the community. We
>>>> will file a SPIP once we can get agreement that this is a good
>>>> direction to go in.
>>>>
>>>> The current support for columnar processing lets a Parquet or Orc
>>>> file format return a ColumnarBatch inside an RDD[InternalRow] using
>>>> Scala's type erasure. The code generation is aware that the RDD
>>>> actually holds ColumnarBatches and generates code to loop through the
>>>> data in each batch as InternalRows.
>>>>
>>>> Instead, we propose a new set of APIs to work on an
>>>> RDD[InternalColumnarBatch] instead of abusing type erasure. With
>>>> this, we propose adding a Rule, similar to how WholeStageCodeGen
>>>> currently works. Each part of the physical SparkPlan would expose
>>>> columnar support through a combination of traits and method calls.
>>>> The rule would then decide where columnar processing starts and where
>>>> it ends. Switching between columnar and row-based processing is not
>>>> free, so the rule would make that decision based on an estimate of
>>>> the cost of the transformation and the estimated speedup in
>>>> processing time.
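>>>>
>>>> To show the shape of that rule without claiming any of the
>>>> prototype's actual code, here is a self-contained toy in plain Scala
>>>> (every name below is a placeholder, and the real heuristic would be
>>>> richer than a single threshold):
>>>>
>>>> // Hypothetical plan nodes: a row-only operator, an operator that can
>>>> // also run columnar, and explicit transition nodes at the
>>>> // boundaries.
>>>> sealed trait Plan
>>>> case class RowOp(name: String, child: Option[Plan]) extends Plan
>>>> case class ColumnarCapableOp(name: String, child: Option[Plan],
>>>>     estSpeedup: Double) extends Plan
>>>> case class RowToColumnar(child: Plan) extends Plan
>>>> case class ColumnarToRow(child: Plan) extends Plan
>>>>
>>>> // Wrap a columnar-capable operator in transitions only when its
>>>> // estimated speedup outweighs the cost of converting the data at
>>>> // both boundaries; otherwise leave it on the row-based path.
>>>> def insertTransitions(plan: Plan, transitionCost: Double): Plan =
>>>>   plan match {
>>>>     case ColumnarCapableOp(name, child, speedup)
>>>>         if speedup > 2 * transitionCost =>
>>>>       ColumnarToRow(ColumnarCapableOp(name,
>>>>         child.map(c => RowToColumnar(insertTransitions(c, transitionCost))),
>>>>         speedup))
>>>>     case c: ColumnarCapableOp =>
>>>>       c.copy(child = c.child.map(insertTransitions(_, transitionCost)))
>>>>     case r: RowOp =>
>>>>       r.copy(child = r.child.map(insertTransitions(_, transitionCost)))
>>>>     case other => other
>>>>   }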
>>>>
>>>> This should allow us to disable columnar support by simply disabling
>>>> the rule that modifies the physical SparkPlan. It should pose minimal
>>>> risk to the existing row-based code path, as that code should not be
>>>> touched, and in many cases could be reused to implement the columnar
>>>> version. This also allows for small, easily manageable patches rather
>>>> than huge ones that no one wants to review.
>>>>
>>>> As far as the memory layout is concerned, OnHeapColumnVector and
>>>> OffHeapColumnVector are already really close to being Apache Arrow
>>>> compatible, so shifting them over would be a relatively simple
>>>> change. Alternatively, we could add a new implementation that is
>>>> Arrow compatible if there are reasons to keep the old ones.
>>>>
>>>> Again, this is just to get the discussion started; any feedback is
>>>> welcome, and we will file a SPIP once we feel the major changes we
>>>> are proposing are acceptable.
>>>>
>>>> Thanks,
>>>>
>>>> Bobby Evans
>
> --
> Renjie Liu
> Software Engineer, MVAD