Hi, Bobby:
Do you have design doc? I'm also interested in this topic and want to help
contribute.

On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <bo...@apache.org> wrote:

> Thanks to everyone for the feedback.
>
> Overall the feedback has been really positive for exposing columnar as a
> processing option to users.  I'll write up a SPIP on the proposed changes
> to support columnar processing (not necessarily implement it) and then ping
> the list again for more feedback and discussion.
>
> Thanks again,
>
> Bobby
>
> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <r...@databricks.com> wrote:
>
>> I just realized I didn't make it very clear my stance here ... here's
>> another try:
>>
>> I think it's a no brainer to have a good columnar UDF interface. This
>> would facilitate a lot of high performance applications, e.g. GPU-based
>> accelerations for machine learning algorithms.
>>
>> On rewriting the entire internals of Spark SQL to leverage columnar
>> processing, I don't see enough evidence to suggest that's a good idea yet.
>>
>>
>>
>>
>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:
>>
>>> Kazuaki Ishizaki,
>>>
>>> Yes, ColumnarBatchScan does provide a framework for doing code
>>> generation for the processing of columnar data.  I have to admit that I
>>> don't have a deep understanding of the code generation piece, so if I get
>>> something wrong please correct me.  From what I had seen only input formats
>>> currently inherent from ColumnarBatchScan, and from comments in the trait
>>>
>>>   /**
>>>    * Generate [[ColumnVector]] expressions for our parent to consume as
>>> rows.
>>>    * This is called once per [[ColumnarBatch]].
>>>    */
>>>
>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>
>>> It appears that ColumnarBatchScan is really only intended to pull out
>>> the data from the batch, and not to process that data in a columnar
>>> fashion.  The Loading stage that you mentioned.
>>>
>>> > The SIMDzation or GPUization capability depends on a compiler that
>>> translates native code from the code generated by the whole-stage codegen.
>>> To be able to support vectorized processing Hive stayed with pure java
>>> and let the JVM detect and do the SIMDzation of the code.  To make that
>>> happen they created loops to go through each element in a column and remove
>>> all conditionals from the body of the loops.  To the best of my knowledge
>>> that would still require a separate code path like I am proposing to make
>>> the different processing phases generate code that the JVM can compile down
>>> to SIMD instructions.  The generated code is full of null checks for each
>>> element which would prevent the operations we want.  Also, the intermediate
>>> results are often stored in UnsafeRow instances.  This is really fast for
>>> row-based processing, but the complexity of how they work I believe would
>>> prevent the JVM from being able to vectorize the processing.  If you have a
>>> better way to take java code and vectorize it we should put it into OpenJDK
>>> instead of spark so everyone can benefit from it.
>>>
>>> Trying to compile directly from generated java code to something a GPU
>>> can process is something we are tackling but we decided to go a different
>>> route from what you proposed.  From talking with several compiler experts
>>> here at NVIDIA my understanding is that IBM in partnership with NVIDIA
>>> attempted in the past to extend the JVM to run at least partially on GPUs,
>>> but it was really difficult to get right, especially with how java does
>>> memory management and memory layout.
>>>
>>> To avoid that complexity we decided to split the JITing up into two
>>> separate pieces.  I didn't mention any of this before because this
>>> discussion was intended to just be around the memory layout support, and
>>> not GPU processing.  The first part would be to take the Catalyst AST and
>>> produce CUDA code directly from it.  If properly done we should be able to
>>> do the selection and projection phases within a single kernel.  The biggest
>>> issue comes with UDFs as they cannot easily be vectorized for the CPU or
>>> GPU.  So to deal with that we have a prototype written by the compiler team
>>> that is trying to tackle SPARK-14083 which can translate basic UDFs into
>>> catalyst expressions.  If the UDF is too complicated or covers operations
>>> not yet supported it will fall back to the original UDF processing.  I
>>> don't know how close the team is to submit a SPIP or a patch for it, but I
>>> do know that they have some very basic operations working.  The big issue
>>> is that it requires java 11+ so it can use standard APIs to get the byte
>>> code of scala UDFs.
>>>
>>> We split it this way because we thought it would be simplest to
>>> implement, and because it would provide a benefit to more than just GPU
>>> accelerated queries.
>>>
>>> Thanks,
>>>
>>> Bobby
>>>
>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ishiz...@jp.ibm.com>
>>> wrote:
>>>
>>> Looks interesting discussion.
>>> Let me describe the current structure and remaining issues. This is
>>> orthogonal to cost-benefit trade-off discussion.
>>>
>>> The code generation basically consists of three parts.
>>> 1. Loading
>>> 2. Selection (map, filter, ...)
>>> 3. Projection
>>>
>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow , and table cache) is well
>>> abstracted by using ColumnVector (
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java)
>>> class. By combining with ColumnarBatchScan, the whole-stage code generation
>>> generate code to directly get valus from the columnar storage if there is
>>> no row-based operation.
>>> Note: The current master does not support Arrow as a data source.
>>> However, I think it is not technically hard to support Arrow.
>>>
>>> 2. The current whole-stage codegen generates code for element-wise
>>> selection (excluding sort and join). The SIMDzation or GPUization
>>> capability depends on a compiler that translates native code from the code
>>> generated by the whole-stage codegen.
>>>
>>> 3. The current Projection assume to store row-oriented data, I think
>>> that is a part that Wenchen pointed out
>>>
>>> My slides
>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>> <https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use>may
>>> simplify the above issue and possible implementation.
>>>
>>>
>>>
>>> FYI. NVIDIA will present an approach to exploit GPU with Arrow thru
>>> Python at SAIS 2019
>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
>>> I think that it uses Python UDF support with Arrow in Spark.
>>>
>>> P.S. I will give a presentation about in-memory data storages for SPark
>>> at SAIS 2019
>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>> :)
>>>
>>> Kazuaki Ishizaki
>>>
>>>
>>>
>>> From:        Wenchen Fan <cloud0...@gmail.com>
>>> To:        Bobby Evans <bo...@apache.org>
>>> Cc:        Spark dev list <dev@spark.apache.org>
>>> Date:        2019/03/26 13:53
>>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>>> ------------------------------
>>>
>>>
>>>
>>> Do you have some initial perf numbers? It seems fine to me to remain
>>> row-based inside Spark with whole-stage-codegen, and convert rows to
>>> columnar batches when communicating with external systems.
>>>
>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <*bo...@apache.org*
>>> <bo...@apache.org>> wrote:
>>> This thread is to discuss adding in support for data frame processing
>>> using an in-memory columnar format compatible with Apache Arrow.  My main
>>> goal in this is to lay the groundwork so we can add in support for GPU
>>> accelerated processing of data frames, but this feature has a number of
>>> other benefits.  Spark currently supports Apache Arrow formatted data as an
>>> option to exchange data with python for pandas UDF processing. There has
>>> also been discussion around extending this to allow for exchanging data
>>> with other tools like pytorch, tensorflow, xgboost,... If Spark supports
>>> processing on Arrow compatible data it could eliminate the
>>> serialization/deserialization overhead when going between these systems.
>>> It also would allow for doing optimizations on a CPU with SIMD instructions
>>> similar to what Hive currently supports. Accelerated processing using a GPU
>>> is something that we will start a separate discussion thread on, but I
>>> wanted to set the context a bit.
>>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>>> months to try and understand how to make this work.  What we are proposing
>>> is based off of lessons learned when building this prototype, but we really
>>> wanted to get feedback early on from the community. We will file a SPIP
>>> once we can get agreement that this is a good direction to go in.
>>>
>>> The current support for columnar processing lets a Parquet or Orc file
>>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type
>>> erasure. The code generation is aware that the RDD actually holds
>>> ColumnarBatchs and generates code to loop through the data in each batch as
>>> InternalRows.
>>>
>>>
>>> Instead, we propose a new set of APIs to work on an
>>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>>> Each part of the physical SparkPlan would expose columnar support through a
>>> combination of traits and method calls. The rule would then decide when
>>> columnar processing would start and when it would end. Switching between
>>> columnar and row based processing is not free, so the rule would make a
>>> decision based off of an estimate of the cost to do the transformation and
>>> the estimated speedup in processing time.
>>>
>>>
>>> This should allow us to disable columnar support by simply disabling the
>>> rule that modifies the physical SparkPlan.  It should be minimal risk to
>>> the existing row-based code path, as that code should not be touched, and
>>> in many cases could be reused to implement the columnar version.  This also
>>> allows for small easily manageable patches. No huge patches that no one
>>> wants to review.
>>>
>>>
>>> As far as the memory layout is concerned OnHeapColumnVector and
>>> OffHeapColumnVector are already really close to being Apache Arrow
>>> compatible so shifting them over would be a relatively simple change.
>>> Alternatively we could add in a new implementation that is Arrow compatible
>>> if there are reasons to keep the old ones.
>>>
>>>
>>> Again this is just to get the discussion started, any feedback is
>>> welcome, and we will file a SPIP on it once we feel like the major changes
>>> we are proposing are acceptable.
>>>
>>> Thanks,
>>>
>>> Bobby Evans
>>>
>>>
>>

-- 
Renjie Liu
Software Engineer, MVAD

Reply via email to