I just realized I didn't make my stance here very clear ... here's another try:
I think it's a no-brainer to have a good columnar UDF interface. This would facilitate a lot of high-performance applications, e.g. GPU-based acceleration for machine learning algorithms. As for rewriting the entire internals of Spark SQL to leverage columnar processing, I don't yet see enough evidence to suggest that's a good idea.

On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:

> Kazuaki Ishizaki,
>
> Yes, ColumnarBatchScan does provide a framework for doing code generation
> for the processing of columnar data. I have to admit that I don't have a
> deep understanding of the code generation piece, so if I get something
> wrong please correct me. From what I have seen, only input formats
> currently inherit from ColumnarBatchScan, and from the comments in the
> trait
>
> /**
>  * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>  * This is called once per [[ColumnarBatch]].
>  */
> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>
> it appears that ColumnarBatchScan is really only intended to pull the data
> out of the batch, not to process that data in a columnar fashion: the
> Loading stage that you mentioned.
>
>> The SIMDzation or GPUization capability depends on a compiler that
>> translates native code from the code generated by the whole-stage codegen.
>
> To be able to support vectorized processing, Hive stayed with pure Java
> and let the JVM detect and do the SIMDization of the code. To make that
> happen they wrote loops that go through each element in a column and
> removed all conditionals from the bodies of the loops. To the best of my
> knowledge, that would still require a separate code path, like the one I
> am proposing, to make the different processing phases generate code that
> the JVM can compile down to SIMD instructions. The code Spark currently
> generates is full of per-element null checks, which would prevent the
> optimizations we want. Also, the intermediate results are often stored in
> UnsafeRow instances. That is really fast for row-based processing, but I
> believe the complexity of how they work would prevent the JVM from being
> able to vectorize the processing. If you have a better way to take Java
> code and vectorize it, we should put it into OpenJDK instead of Spark so
> everyone can benefit from it.
>
> Trying to compile directly from generated Java code to something a GPU
> can process is something we are tackling, but we decided to go a
> different route from what you proposed. From talking with several
> compiler experts here at NVIDIA, my understanding is that IBM, in
> partnership with NVIDIA, attempted in the past to extend the JVM to run
> at least partially on GPUs, but it was really difficult to get right,
> especially with how Java does memory management and memory layout.
>
> To avoid that complexity we decided to split the JITing into two separate
> pieces. I didn't mention any of this before because this discussion was
> intended to be just about the memory layout support, not about GPU
> processing. The first piece would take the Catalyst AST and produce CUDA
> code directly from it. If done properly, we should be able to do the
> selection and projection phases within a single kernel.
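To make that last idea concrete, here is a minimal, hypothetical Scala sketch. A toy expression tree stands in for the Catalyst AST (none of these names are real Spark or NVIDIA APIs), and walking it emits CUDA source in which the selection predicate and the projection execute in one fused kernel, so no intermediate result is materialized between the two phases:

    // Toy stand-in for a Catalyst expression tree; purely illustrative.
    object FusedKernelSketch extends App {
      sealed trait Expr
      case class ColRef(name: String)          extends Expr // input column
      case class Lit(value: Double)            extends Expr
      case class Add(l: Expr, r: Expr)         extends Expr
      case class GreaterThan(l: Expr, r: Expr) extends Expr

      // Emit a CUDA expression for one element index i.
      def gen(e: Expr): String = e match {
        case ColRef(n)         => s"$n[i]"
        case Lit(v)            => v.toString
        case Add(l, r)         => s"(${gen(l)} + ${gen(r)})"
        case GreaterThan(l, r) => s"(${gen(l)} > ${gen(r)})"
      }

      // Fuse selection and projection into a single kernel: rows that fail
      // the predicate are masked out instead of producing an intermediate
      // filtered buffer. (Kernel signature hardcoded for the two-column
      // example below.)
      def fusedKernel(predicate: Expr, projection: Expr): String =
        s"""__global__ void fused(const double* a, const double* b,
           |                      double* out, int* valid, int n) {
           |  int i = blockIdx.x * blockDim.x + threadIdx.x;
           |  if (i < n) {
           |    valid[i] = ${gen(predicate)} ? 1 : 0;
           |    if (valid[i]) { out[i] = ${gen(projection)}; }
           |  }
           |}""".stripMargin

      // e.g. SELECT a + b FROM t WHERE a > 0.0
      println(fusedKernel(GreaterThan(ColRef("a"), Lit(0.0)),
                          Add(ColRef("a"), ColRef("b"))))
    }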
> The biggest issue comes with UDFs, as they cannot easily be vectorized
> for the CPU or GPU. To deal with that, we have a prototype written by the
> compiler team that tries to tackle SPARK-14083 and can translate basic
> UDFs into Catalyst expressions. If a UDF is too complicated or covers
> operations not yet supported, it falls back to the original UDF
> processing. I don't know how close the team is to submitting a SPIP or a
> patch for it, but I do know that they have some very basic operations
> working. The big issue is that it requires Java 11+ so it can use
> standard APIs to get the bytecode of Scala UDFs.
>
> We split it this way because we thought it would be simplest to
> implement, and because it would provide a benefit to more than just
> GPU-accelerated queries.
>
> Thanks,
>
> Bobby
>
> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ISHIZAKI@jp.ibm.com> wrote:
>
>> This looks like an interesting discussion.
>> Let me describe the current structure and the remaining issues. This is
>> orthogonal to the cost-benefit trade-off discussion.
>>
>> The code generation basically consists of three parts:
>> 1. Loading
>> 2. Selection (map, filter, ...)
>> 3. Projection
>>
>> 1. Columnar storage (e.g. Parquet, ORC, Arrow, and the table cache) is
>> well abstracted by the ColumnVector class
>> (https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java).
>> By combining it with ColumnarBatchScan, whole-stage code generation
>> generates code that gets values directly from the columnar storage if
>> there is no row-based operation.
>> Note: the current master does not support Arrow as a data source.
>> However, I think it is not technically hard to support Arrow.
>>
>> 2. The current whole-stage codegen generates code for element-wise
>> selection (excluding sort and join). The SIMDzation or GPUization
>> capability depends on a compiler that translates native code from the
>> code generated by the whole-stage codegen.
>>
>> 3. The current Projection assumes row-oriented data storage; I think
>> that is the part that Wenchen pointed out.
>>
>> My slides
>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>> may help clarify the above issues and a possible implementation.
>>
>> FYI: NVIDIA will present an approach to exploiting GPUs with Arrow
>> through Python at SAIS 2019
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
>> I think it uses the Python UDF support with Arrow in Spark.
>>
>> P.S. I will give a presentation about in-memory data storage for Spark
>> at SAIS 2019
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40 :)
>>
>> Kazuaki Ishizaki
>>
>> From: Wenchen Fan <cloud0fan@gmail.com>
>> To: Bobby Evans <bobby@apache.org>
>> Cc: Spark dev list <dev@spark.apache.org>
>> Date: 2019/03/26 13:53
>> Subject: Re: [DISCUSS] Spark Columnar Processing
>>
>> Do you have some initial perf numbers?
>> It seems fine to me to remain row-based inside Spark with whole-stage
>> codegen, and convert rows to columnar batches when communicating with
>> external systems.
>>
>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bobby@apache.org> wrote:
>>
>> This thread is to discuss adding support for data frame processing using
>> an in-memory columnar format compatible with Apache Arrow. My main goal
>> is to lay the groundwork so we can add support for GPU-accelerated
>> processing of data frames, but this feature has a number of other
>> benefits. Spark currently supports Apache Arrow formatted data as an
>> option to exchange data with Python for pandas UDF processing. There has
>> also been discussion around extending this to allow for exchanging data
>> with other tools like PyTorch, TensorFlow, XGBoost, ... If Spark
>> supported processing on Arrow-compatible data, it could eliminate the
>> serialization/deserialization overhead when moving between these
>> systems. It would also allow optimizations on the CPU with SIMD
>> instructions, similar to what Hive currently supports. Accelerated
>> processing using a GPU is something we will start a separate discussion
>> thread on, but I wanted to set the context a bit.
>>
>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>> months to try to understand how to make this work. What we are proposing
>> is based on lessons learned while building that prototype, but we really
>> wanted to get feedback early on from the community. We will file a SPIP
>> once we can get agreement that this is a good direction to go in.
>>
>> The current support for columnar processing lets a Parquet or ORC file
>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala's
>> type erasure. The code generation is aware that the RDD actually holds
>> ColumnarBatches and generates code to loop through the data in each
>> batch as InternalRows.
>>
>> Instead, we propose a new set of APIs to work on an
>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>> propose adding a Rule similar to how WholeStageCodeGen currently works.
>> Each part of the physical SparkPlan would expose columnar support
>> through a combination of traits and method calls. The rule would then
>> decide when columnar processing would start and when it would end.
>> Switching between columnar and row-based processing is not free, so the
>> rule would make a decision based on an estimate of the cost of the
>> transformation and the estimated speedup in processing time (see the
>> rough sketch below).
>>
>> This should allow us to disable columnar support by simply disabling the
>> rule that modifies the physical SparkPlan. It should pose minimal risk
>> to the existing row-based code path, as that code should not be touched,
>> and in many cases could be reused to implement the columnar version.
>> This also allows for small, easily manageable patches: no huge patches
>> that no one wants to review.
>>
>> As far as the memory layout is concerned, OnHeapColumnVector and
>> OffHeapColumnVector are already really close to being Apache Arrow
>> compatible, so shifting them over would be a relatively simple change.
>> Alternatively, we could add a new implementation that is Arrow
>> compatible if there are reasons to keep the old ones.
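As a rough, hypothetical Scala sketch of the trait-plus-rule shape described above: SparkPlan, UnaryExecNode, Rule, InternalRow, and ColumnarBatch are existing Spark classes, but ColumnarSupport, ColumnToRowExec, InsertColumnarTransitions, and the cost test are names invented here to illustrate the idea, not an actual or proposed API.

    import scala.collection.JavaConverters._

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical trait a physical operator mixes in to expose columnar
    // support to the planner rule.
    trait ColumnarSupport { self: SparkPlan =>
      def supportsColumnar: Boolean
      def executeColumnar(): RDD[ColumnarBatch] // columnar twin of doExecute()
    }

    // Hypothetical transition node: converts each batch back to rows so the
    // untouched row-based operators above it keep working as before.
    case class ColumnToRowExec(child: SparkPlan with ColumnarSupport)
        extends UnaryExecNode {
      override def output: Seq[Attribute] = child.output
      override protected def doExecute(): RDD[InternalRow] =
        child.executeColumnar()
          .mapPartitions(_.flatMap(_.rowIterator().asScala))
    }

    // The rule rewrites the plan only where the estimated speedup outweighs
    // the row<->column transition cost, so disabling the rule leaves the
    // existing row-based path completely untouched.
    case class InsertColumnarTransitions(estimatedSpeedup: SparkPlan => Double)
        extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
        case p: SparkPlan with ColumnarSupport
            if p.supportsColumnar && estimatedSpeedup(p) > 1.0 =>
          ColumnToRowExec(p)
      }
    }

A real version would also need a row-to-column transition for the start of a columnar section and a genuine cost model; this only shows where those decisions would hang off the physical plan.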
>> Again, this is just to get the discussion started; any feedback is
>> welcome, and we will file a SPIP on it once we feel the major changes we
>> are proposing are acceptable.
>>
>> Thanks,
>>
>> Bobby Evans