Hi Bobby, do you have a design doc? I'm also interested in this topic and want to help contribute.
On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <[email protected]> wrote:

> Thanks to everyone for the feedback.
>
> Overall, the feedback has been really positive for exposing columnar as a
> processing option to users. I'll write up a SPIP on the proposed changes to
> support columnar processing (not necessarily implement it) and then ping
> the list again for more feedback and discussion.
>
> Thanks again,
>
> Bobby
>
> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <[email protected]> wrote:
>
>> I just realized I didn't make my stance very clear here ... here's
>> another try:
>>
>> I think it's a no-brainer to have a good columnar UDF interface. This
>> would facilitate a lot of high-performance applications, e.g. GPU-based
>> acceleration of machine learning algorithms.
>>
>> On rewriting the entire internals of Spark SQL to leverage columnar
>> processing, I don't see enough evidence to suggest that's a good idea yet.
>>
>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <[email protected]> wrote:
>>
>>> Kazuaki Ishizaki,
>>>
>>> Yes, ColumnarBatchScan does provide a framework for doing code
>>> generation for the processing of columnar data. I have to admit that I
>>> don't have a deep understanding of the code generation piece, so if I get
>>> something wrong please correct me. From what I have seen, only input
>>> formats currently inherit from ColumnarBatchScan, and from the comments
>>> in the trait
>>>
>>> /**
>>>  * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>>  * This is called once per [[ColumnarBatch]].
>>>  */
>>>
>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>
>>> it appears that ColumnarBatchScan is really only intended to pull the
>>> data out of the batch, not to process that data in a columnar fashion.
>>> That is the Loading stage that you mentioned.
>>>
>>> > The SIMDzation or GPUization capability depends on a compiler that
>>> > translates native code from the code generated by the whole-stage codegen.
>>>
>>> To be able to support vectorized processing, Hive stayed with pure Java
>>> and let the JVM detect and do the SIMDzation of the code. To make that
>>> happen they created loops that go through each element in a column and
>>> removed all conditionals from the body of the loops. To the best of my
>>> knowledge that would still require a separate code path, like the one I am
>>> proposing, so that the different processing phases generate code that the
>>> JVM can compile down to SIMD instructions. The generated code today is
>>> full of null checks for each element, which would prevent the
>>> optimizations we want. Also, the intermediate results are often stored in
>>> UnsafeRow instances. This is really fast for row-based processing, but I
>>> believe the complexity of how they work would prevent the JVM from being
>>> able to vectorize the processing. If you have a better way to take Java
>>> code and vectorize it, we should put it into OpenJDK instead of Spark so
>>> everyone can benefit from it.
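To make the loop shape being described concrete, here is a small, purely
illustrative Scala sketch (not taken from the prototype; IntColumn is a
made-up stand-in, not Spark's ColumnVector) of a columnar kernel with the
per-element null checks hoisted out of the hot loop, which is the form a
JIT's auto-vectorizer has a realistic chance of turning into SIMD
instructions:

// Hypothetical sketch of a conditional-free columnar kernel.
// The null information is handled outside the hot loop, so the loop itself
// is a straight-line array walk the JVM may be able to auto-vectorize.
object ColumnarAddExample {
  final case class IntColumn(values: Array[Int], nulls: Array[Boolean], numNulls: Int)

  def addColumns(a: IntColumn, b: IntColumn): IntColumn = {
    require(a.values.length == b.values.length)
    val n = a.values.length
    val out = new Array[Int](n)
    var i = 0
    while (i < n) {            // no per-element null checks in the hot loop
      out(i) = a.values(i) + b.values(i)
      i += 1
    }
    // Null handling happens separately, only when either input has nulls.
    val outNulls = new Array[Boolean](n)
    var nullCount = 0
    if (a.numNulls > 0 || b.numNulls > 0) {
      var j = 0
      while (j < n) {
        outNulls(j) = a.nulls(j) || b.nulls(j)
        if (outNulls(j)) nullCount += 1
        j += 1
      }
    }
    IntColumn(out, outNulls, nullCount)
  }
}

Compare this with today's row-at-a-time generated code, where each element
access goes through an UnsafeRow and a null check before the add.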
>>> Trying to compile directly from generated Java code to something a GPU
>>> can process is something we are tackling, but we decided to go a
>>> different route from what you proposed. From talking with several
>>> compiler experts here at NVIDIA, my understanding is that IBM, in
>>> partnership with NVIDIA, attempted in the past to extend the JVM to run
>>> at least partially on GPUs, but it was really difficult to get right,
>>> especially with how Java does memory management and memory layout.
>>>
>>> To avoid that complexity we decided to split the JITing up into two
>>> separate pieces. I didn't mention any of this before because this
>>> discussion was intended to just be around the memory layout support, not
>>> GPU processing. The first part would be to take the Catalyst AST and
>>> produce CUDA code directly from it. If done properly we should be able to
>>> do the selection and projection phases within a single kernel. The
>>> biggest issue comes with UDFs, as they cannot easily be vectorized for
>>> the CPU or GPU. To deal with that we have a prototype, written by the
>>> compiler team, that is trying to tackle SPARK-14083 and can translate
>>> basic UDFs into Catalyst expressions. If the UDF is too complicated or
>>> covers operations not yet supported, it falls back to the original UDF
>>> processing. I don't know how close the team is to submitting a SPIP or a
>>> patch for it, but I do know that they have some very basic operations
>>> working. The big issue is that it requires Java 11+ so it can use
>>> standard APIs to get the bytecode of Scala UDFs.
>>>
>>> We split it this way because we thought it would be simplest to
>>> implement, and because it would provide a benefit to more than just
>>> GPU-accelerated queries.
>>>
>>> Thanks,
>>>
>>> Bobby
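As a tiny, hypothetical illustration of the UDF translation idea above (not
the prototype's actual output): the goal is that a simple lambda registered
as a UDF could be recognized from its bytecode and replaced with the
equivalent Catalyst expression, which then participates in whole-stage
codegen and, eventually, columnar execution.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfTranslationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udf-translation-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("x")

    // Today: an opaque JVM UDF. The optimizer and codegen treat it as a
    // black box that must be invoked per row.
    val plusOne = udf((x: Int) => x + 1)
    df.select(plusOne(col("x"))).explain()

    // What a SPARK-14083-style translator could emit for the same lambda:
    // an equivalent Catalyst expression (x + 1) that the engine fully
    // understands and can optimize.
    df.select(col("x") + 1).explain()

    spark.stop()
  }
}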
>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <[email protected]> wrote:
>>>
>>> This looks like an interesting discussion. Let me describe the current
>>> structure and the remaining issues. This is orthogonal to the
>>> cost-benefit trade-off discussion.
>>>
>>> The code generation basically consists of three parts:
>>> 1. Loading
>>> 2. Selection (map, filter, ...)
>>> 3. Projection
>>>
>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and the table cache) is
>>> well abstracted by the ColumnVector class (
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java).
>>> Combined with ColumnarBatchScan, the whole-stage code generation
>>> generates code to get values directly from the columnar storage if there
>>> is no row-based operation.
>>> Note: the current master does not support Arrow as a data source.
>>> However, I think it is not technically hard to support Arrow.
>>>
>>> 2. The current whole-stage codegen generates code for element-wise
>>> selection (excluding sort and join). The SIMDzation or GPUization
>>> capability depends on a compiler that translates native code from the
>>> code generated by the whole-stage codegen.
>>>
>>> 3. The current Projection assumes row-oriented data for its output; I
>>> think that is the part Wenchen pointed out.
>>>
>>> My slides
>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>> may clarify the above issues and a possible implementation.
>>>
>>> FYI: NVIDIA will present an approach to exploiting GPUs with Arrow
>>> through Python at SAIS 2019:
>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
>>> I think that it uses the Python UDF support with Arrow in Spark.
>>>
>>> P.S. I will give a presentation about in-memory data storage for Spark
>>> at SAIS 2019:
>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>> :)
>>>
>>> Kazuaki Ishizaki
>>>
>>> From: Wenchen Fan <[email protected]>
>>> To: Bobby Evans <[email protected]>
>>> Cc: Spark dev list <[email protected]>
>>> Date: 2019/03/26 13:53
>>> Subject: Re: [DISCUSS] Spark Columnar Processing
>>> ------------------------------
>>>
>>> Do you have some initial perf numbers? It seems fine to me to remain
>>> row-based inside Spark with whole-stage codegen, and convert rows to
>>> columnar batches when communicating with external systems.
>>>
>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <[email protected]> wrote:
>>>
>>> This thread is to discuss adding in support for data frame processing
>>> using an in-memory columnar format compatible with Apache Arrow. My main
>>> goal in this is to lay the groundwork so we can add in support for
>>> GPU-accelerated processing of data frames, but this feature has a number
>>> of other benefits. Spark currently supports Apache Arrow formatted data
>>> as an option to exchange data with Python for pandas UDF processing.
>>> There has also been discussion around extending this to allow for
>>> exchanging data with other tools like PyTorch, TensorFlow, XGBoost, ...
>>> If Spark supports processing on Arrow-compatible data it could eliminate
>>> the serialization/deserialization overhead when going between these
>>> systems. It would also allow for doing optimizations on a CPU with SIMD
>>> instructions, similar to what Hive currently supports. Accelerated
>>> processing using a GPU is something we will start a separate discussion
>>> thread on, but I wanted to set the context a bit.
>>>
>>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>>> months to try to understand how to make this work. What we are proposing
>>> is based on lessons learned while building this prototype, but we really
>>> wanted to get feedback early on from the community. We will file a SPIP
>>> once we can get agreement that this is a good direction to go in.
>>>
>>> The current support for columnar processing lets a Parquet or Orc file
>>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala's
>>> type erasure. The code generation is aware that the RDD actually holds
>>> ColumnarBatches and generates code to loop through the data in each batch
>>> as InternalRows.
>>>
>>> Instead, we propose a new set of APIs to work on an
>>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>>> propose adding a Rule similar to how WholeStageCodeGen currently works.
>>> Each part of the physical SparkPlan would expose columnar support through
>>> a combination of traits and method calls. The rule would then decide when
>>> columnar processing would start and when it would end. Switching between
>>> columnar and row-based processing is not free, so the rule would make a
>>> decision based on an estimate of the cost of the transformation and the
>>> estimated speedup in processing time.
>>>
>>> This should allow us to disable columnar support by simply disabling the
>>> rule that modifies the physical SparkPlan. It should pose minimal risk to
>>> the existing row-based code path, as that code should not be touched, and
>>> in many cases could be reused to implement the columnar version. This
>>> also allows for small, easily manageable patches rather than huge patches
>>> that no one wants to review.
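To give a feel for the kind of hook being proposed, here is a deliberately
simplified, self-contained sketch. Every name in it (PlanNode, Scan, Filter,
Collect, ColumnarToRow, insertTransitions) is invented for illustration; it
is not Spark's SparkPlan or Rule API, only the columnar-to-row direction is
shown, and a real rule would be cost-based rather than purely structural.

// Toy model only: operators opt in via a method, and a single plan rewrite
// inserts explicit transitions so everything outside the columnar section
// keeps seeing rows.
object ColumnarRuleSketch {
  sealed trait PlanNode {
    // An operator advertises whether it can produce columnar batches,
    // roughly the trait/method hook the proposal describes.
    def supportsColumnar: Boolean = false
  }
  case class Scan(override val supportsColumnar: Boolean) extends PlanNode
  case class Filter(child: PlanNode, override val supportsColumnar: Boolean) extends PlanNode
  case class Collect(child: PlanNode) extends PlanNode          // always row-based
  case class ColumnarToRow(child: PlanNode) extends PlanNode    // explicit transition

  // Rebuild the plan bottom-up; whenever a row-based parent sits on top of a
  // columnar child, insert an explicit ColumnarToRow transition. Disabling
  // columnar support is simply not running this rewrite.
  def insertTransitions(plan: PlanNode): PlanNode = {
    def fix(parentIsColumnar: Boolean, node: PlanNode): PlanNode = {
      val rebuilt = node match {
        case s: Scan              => s
        case Filter(child, c)     => Filter(fix(c, child), c)
        case Collect(child)       => Collect(fix(false, child))
        case ColumnarToRow(child) => ColumnarToRow(fix(true, child))
      }
      if (!parentIsColumnar && rebuilt.supportsColumnar) ColumnarToRow(rebuilt) else rebuilt
    }
    fix(parentIsColumnar = false, plan)
  }

  def main(args: Array[String]): Unit = {
    val plan = Collect(Filter(Scan(supportsColumnar = true), supportsColumnar = true))
    // prints: Collect(ColumnarToRow(Filter(Scan(true),true)))
    println(insertTransitions(plan))
  }
}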
>>> As far as the memory layout is concerned, OnHeapColumnVector and
>>> OffHeapColumnVector are already really close to being Apache Arrow
>>> compatible, so shifting them over would be a relatively simple change.
>>> Alternatively, we could add a new implementation that is Arrow compatible
>>> if there are reasons to keep the old ones.
>>>
>>> Again, this is just to get the discussion started. Any feedback is
>>> welcome, and we will file a SPIP on it once we feel the major changes we
>>> are proposing are acceptable.
>>>
>>> Thanks,
>>>
>>> Bobby Evans

--
Renjie Liu
Software Engineer, MVAD
