I just filed SPARK-27396 as the SPIP for this proposal.  Please use that
JIRA for further discussions.

Thanks for all of the feedback,

Bobby

On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans <bo...@apache.org> wrote:

> I am still working on the SPIP and should get it up in the next few days.
> I have the basic text more or less ready, but I want to get a high-level
> API concept ready too just to have something more concrete.  I have not
> really done much with contributing new features to Spark, so I am not sure
> where a design document really fits in here, because neither
> http://spark.apache.org/improvement-proposals.html nor
> http://spark.apache.org/contributing.html mentions a design document
> anywhere.  I am happy to put one up, but I was hoping the API concept would
> cover most of that.
>
> Thanks,
>
> Bobby
>
> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu <liurenjie2...@gmail.com> wrote:
>
>> Hi, Bobby:
>> Do you have a design doc? I'm also interested in this topic and want to
>> help contribute.
>>
>> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <bo...@apache.org> wrote:
>>
>>> Thanks to everyone for the feedback.
>>>
>>> Overall the feedback has been really positive for exposing columnar as a
>>> processing option to users.  I'll write up a SPIP on the proposed changes
>>> to support columnar processing (not necessarily implement it) and then ping
>>> the list again for more feedback and discussion.
>>>
>>> Thanks again,
>>>
>>> Bobby
>>>
>>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> I just realized I didn't make my stance very clear here ... here's
>>>> another try:
>>>>
>>>> I think it's a no-brainer to have a good columnar UDF interface. This
>>>> would facilitate a lot of high-performance applications, e.g. GPU-based
>>>> accelerations for machine learning algorithms.
>>>>
>>>> On rewriting the entire internals of Spark SQL to leverage columnar
>>>> processing, I don't see enough evidence to suggest that's a good idea yet.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:
>>>>
>>>>> Kazuaki Ishizaki,
>>>>>
>>>>> Yes, ColumnarBatchScan does provide a framework for doing code
>>>>> generation for the processing of columnar data.  I have to admit that I
>>>>> don't have a deep understanding of the code generation piece, so if I get
>>>>> something wrong please correct me.  From what I have seen, only input
>>>>> formats currently inherit from ColumnarBatchScan, and from the comments
>>>>> in the trait
>>>>>
>>>>>   /**
>>>>>    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>>>>    * This is called once per [[ColumnarBatch]].
>>>>>    */
>>>>>
>>>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>>>
>>>>> It appears that ColumnarBatchScan is really only intended to pull out
>>>>> the data from the batch, and not to process that data in a columnar
>>>>> fashion, i.e. it covers only the Loading stage that you mentioned.
>>>>>
>>>>> > The SIMDzation or GPUization capability depends on a compiler that
>>>>> > translates native code from the code generated by the whole-stage
>>>>> > codegen.
>>>>>
>>>>> To be able to support vectorized processing, Hive stayed with pure Java
>>>>> and let the JVM detect and do the SIMDzation of the code.  To make that
>>>>> happen they created loops that go through each element in a column and
>>>>> removed all conditionals from the body of the loops.  To the best of my
>>>>> knowledge that would still require a separate code path, like the one I
>>>>> am proposing, to make the different processing phases generate code that
>>>>> the JVM can compile down to SIMD instructions.  The code generated today
>>>>> is full of null checks for each element, which would prevent the
>>>>> operations we want.  Also, the intermediate results are often stored in
>>>>> UnsafeRow instances.  This is really fast for row-based processing, but
>>>>> I believe the complexity of how they work would prevent the JVM from
>>>>> being able to vectorize the processing.  If you have a better way to
>>>>> take Java code and vectorize it, we should put it into OpenJDK instead
>>>>> of Spark so everyone can benefit from it.
>>>>>
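To make the two loop shapes above concrete, here is a minimal Java sketch.  It
is not Hive or Spark code: the class and method names are made up for
illustration, and keeping validity in a separate boolean array is an assumed
layout.  Whether the JVM actually emits SIMD instructions for the second form
depends on the JIT and the hardware, so treat this as the shape of the idea
rather than a performance claim.

```java
import java.util.Arrays;

// Hypothetical illustration; none of these names come from Spark or Hive.
final class ColumnarAddSketch {

    // Row-at-a-time style: a null check sits inside the loop body, and the
    // values are boxed, which makes the loop hard for a JIT to vectorize.
    static Integer[] addWithNullChecks(Integer[] a, Integer[] b) {
        Integer[] out = new Integer[a.length];
        for (int i = 0; i < a.length; i++) {
            if (a[i] != null && b[i] != null) {
                out[i] = a[i] + b[i];
            }
        }
        return out;
    }

    // Columnar style: values and validity live in separate primitive arrays,
    // so each loop body is straight-line code with no conditionals.
    static void addColumns(int[] a, boolean[] aValid,
                           int[] b, boolean[] bValid,
                           int[] out, boolean[] outValid) {
        for (int i = 0; i < out.length; i++) {
            out[i] = a[i] + b[i];                 // computed even for null slots
        }
        for (int i = 0; i < out.length; i++) {
            outValid[i] = aValid[i] & bValid[i];  // non-short-circuit AND
        }
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3, 4};
        boolean[] aValid = {true, false, true, true};
        int[] b = {10, 20, 30, 40};
        boolean[] bValid = {true, true, false, true};
        int[] out = new int[a.length];
        boolean[] outValid = new boolean[a.length];
        addColumns(a, aValid, b, bValid, out, outValid);
        System.out.println(Arrays.toString(out) + " " + Arrays.toString(outValid));
    }
}
```

The columnar version does some wasted arithmetic on null slots; the trade is
that both loops are simple enough for a compiler to reason about.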
>>>>> Trying to compile directly from generated java code to something a GPU
>>>>> can process is something we are tackling but we decided to go a different
>>>>> route from what you proposed.  From talking with several compiler experts
>>>>> here at NVIDIA my understanding is that IBM in partnership with NVIDIA
>>>>> attempted in the past to extend the JVM to run at least partially on GPUs,
>>>>> but it was really difficult to get right, especially with how Java does
>>>>> memory management and memory layout.
>>>>>
>>>>> To avoid that complexity we decided to split the JITing up into two
>>>>> separate pieces.  I didn't mention any of this before because this
>>>>> discussion was intended to just be around the memory layout support, and
>>>>> not GPU processing.  The first part would be to take the Catalyst AST and
>>>>> produce CUDA code directly from it.  If properly done we should be able to
>>>>> do the selection and projection phases within a single kernel.  The
>>>>> biggest issue comes with UDFs, as they cannot easily be vectorized for
>>>>> the CPU or GPU.  So, as the second part, we have a prototype written by
>>>>> the compiler team that is trying to tackle SPARK-14083 and can translate
>>>>> basic UDFs into Catalyst expressions.  If the UDF is too complicated or
>>>>> covers operations not yet supported, it will fall back to the original
>>>>> UDF processing.  I don't know how close the team is to submitting a SPIP
>>>>> or a patch for it, but I do know that they have some very basic
>>>>> operations working.  The big issue is that it requires Java 11+ so it
>>>>> can use standard APIs to get the bytecode of Scala UDFs.
>>>>>
>>>>> We split it this way because we thought it would be simplest to
>>>>> implement, and because it would provide a benefit to more than just GPU
>>>>> accelerated queries.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bobby
>>>>>
>>>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ishiz...@jp.ibm.com>
>>>>> wrote:
>>>>>
>>>>> This looks like an interesting discussion.
>>>>> Let me describe the current structure and remaining issues. This is
>>>>> orthogonal to the cost-benefit trade-off discussion.
>>>>>
>>>>> The code generation basically consists of three parts.
>>>>> 1. Loading
>>>>> 2. Selection (map, filter, ...)
>>>>> 3. Projection
>>>>>
>>>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and table cache) is
>>>>> well abstracted by using the ColumnVector (
>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java)
>>>>> class. By combining it with ColumnarBatchScan, the whole-stage code
>>>>> generation generates code to directly get values from the columnar
>>>>> storage if there is no row-based operation.
>>>>> Note: The current master does not support Arrow as a data source.
>>>>> However, I think it is not technically hard to support Arrow.
>>>>>
>>>>> 2. The current whole-stage codegen generates code for element-wise
>>>>> selection (excluding sort and join). The SIMDzation or GPUization
>>>>> capability depends on a compiler that translates native code from the code
>>>>> generated by the whole-stage codegen.
>>>>>
>>>>> 3. The current Projection assumes that it stores row-oriented data. I
>>>>> think that is the part that Wenchen pointed out.
>>>>>
>>>>> My slides
>>>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>>>> may simplify the above issue and possible implementation.
>>>>>
>>>>>
>>>>>
>>>>> FYI. NVIDIA will present an approach to exploit GPUs with Arrow through
>>>>> Python at SAIS 2019
>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
>>>>> I think that it uses Python UDF support with Arrow in Spark.
>>>>>
>>>>> P.S. I will give a presentation about in-memory data storage for
>>>>> Spark at SAIS 2019
>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>>>> :)
>>>>>
>>>>> Kazuaki Ishizaki
>>>>>
>>>>>
>>>>>
>>>>> From:        Wenchen Fan <cloud0...@gmail.com>
>>>>> To:        Bobby Evans <bo...@apache.org>
>>>>> Cc:        Spark dev list <dev@spark.apache.org>
>>>>> Date:        2019/03/26 13:53
>>>>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>>>>> ------------------------------
>>>>>
>>>>>
>>>>>
>>>>> Do you have some initial perf numbers? It seems fine to me to remain
>>>>> row-based inside Spark with whole-stage-codegen, and convert rows to
>>>>> columnar batches when communicating with external systems.
>>>>>
>>>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:
>>>>> This thread is to discuss adding in support for data frame processing
>>>>> using an in-memory columnar format compatible with Apache Arrow.  My main
>>>>> goal in this is to lay the groundwork so we can add in support for GPU
>>>>> accelerated processing of data frames, but this feature has a number of
>>>>> other benefits.  Spark currently supports Apache Arrow formatted data as
>>>>> an option to exchange data with Python for pandas UDF processing. There
>>>>> has also been discussion around extending this to allow for exchanging
>>>>> data with other tools like PyTorch, TensorFlow, XGBoost, ... If Spark
>>>>> supports processing on Arrow compatible data it could eliminate the
>>>>> serialization/deserialization overhead when going between these systems.
>>>>> It also would allow for doing optimizations on a CPU with SIMD
>>>>> instructions similar to what Hive currently supports. Accelerated
>>>>> processing using a GPU is something that we will start a separate
>>>>> discussion thread on, but I wanted to set the context a bit.
>>>>>
>>>>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>>>>> months to try and understand how to make this work.  What we are proposing
>>>>> is based off of lessons learned when building this prototype, but we
>>>>> really wanted to get feedback early on from the community. We will file a
>>>>> SPIP once we can get agreement that this is a good direction to go in.
>>>>>
>>>>> The current support for columnar processing lets a Parquet or Orc file
>>>>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s
>>>>> type erasure. The code generation is aware that the RDD actually holds
>>>>> ColumnarBatches and generates code to loop through the data in each
>>>>> batch as InternalRows.
>>>>>
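For readers who have not run into this trick, the following Java sketch shows
the same kind of erasure at work (Java generics are erased at runtime much
like Scala's).  Row and Batch are hypothetical stand-ins for InternalRow and
ColumnarBatch, not the Spark classes, and a List stands in for the RDD.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; this is not how Spark declares these types.
final class ErasureSketch {
    static final class Row {
        final int value;
        Row(int value) { this.value = value; }
    }

    static final class Batch {
        final int[] values;
        Batch(int... values) { this.values = values; }
    }

    // Hands out batches under the declared type List<Row>.  The element type
    // is erased at runtime, so only an unchecked cast is needed.
    @SuppressWarnings("unchecked")
    static List<Row> disguise(List<Batch> batches) {
        return (List<Row>) (List<?>) batches;
    }

    // A consumer that knows the secret unwraps the batches again.
    static int sumAware(List<Row> rows) {
        List<?> raw = rows;               // avoid the compiler-inserted cast to Row
        int sum = 0;
        for (Object o : raw) {
            for (int v : ((Batch) o).values) {
                sum += v;
            }
        }
        return sum;
    }

    // A consumer that trusts the declared type fails only at runtime.
    static boolean failsWhenTrusted(List<Row> rows) {
        try {
            int v = rows.get(0).value;    // implicit cast to Row happens here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Batch> batches = new ArrayList<>();
        batches.add(new Batch(1, 2, 3));
        batches.add(new Batch(4, 5));
        List<Row> rows = disguise(batches);
        System.out.println("aware sum = " + sumAware(rows));
        System.out.println("naive consumer fails = " + failsWhenTrusted(rows));
    }
}
```

The compiler cannot help here: any consumer that is not in on the convention
breaks at runtime, not at compile time.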
>>>>>
>>>>> Instead, we propose a new set of APIs that work on an
>>>>> RDD[InternalColumnarBatch] rather than abusing type erasure. With this we
>>>>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>>>>> Each part of the physical SparkPlan would expose columnar support through
>>>>> a combination of traits and method calls. The rule would then decide when
>>>>> columnar processing would start and when it would end. Switching between
>>>>> columnar and row-based processing is not free, so the rule would make a
>>>>> decision based off of an estimate of the cost to do the transformation and
>>>>> the estimated speedup in processing time.
>>>>>
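As a rough illustration of the kind of decision such a rule would make, here
is a small Java sketch.  Everything in it is an assumption for the sake of the
example: the linear per-row cost model, the parameter names, and the numbers
are hypothetical and are not taken from the prototype.

```java
// Hypothetical cost comparison; the linear model and all names are assumptions.
final class TransitionCostSketch {

    // Returns true when running a plan section in columnar form, including
    // the row<->columnar conversions at its boundaries, is estimated to be
    // cheaper than leaving the section row-based.  Costs are in arbitrary
    // units per row.
    static boolean shouldGoColumnar(long rows,
                                    double rowCostPerRow,
                                    double columnarCostPerRow,
                                    double transitionCostPerRow,
                                    int transitions) {
        double rowBased = rows * rowCostPerRow;
        double columnar = rows * columnarCostPerRow
                + (double) transitions * rows * transitionCostPerRow;
        return columnar < rowBased;
    }

    public static void main(String[] args) {
        // A large speedup pays for two conversions (in and out).
        System.out.println(shouldGoColumnar(1_000_000L, 1.0, 0.4, 0.2, 2));
        // A modest speedup does not.
        System.out.println(shouldGoColumnar(1_000_000L, 1.0, 0.8, 0.2, 2));
    }
}
```

A real rule would need estimates per operator and per data type; the point is
only that the conversion cost sits on the columnar side of the comparison.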
>>>>>
>>>>> This should allow us to disable columnar support by simply disabling
>>>>> the rule that modifies the physical SparkPlan.  It should be minimal risk
>>>>> to the existing row-based code path, as that code should not be touched,
>>>>> and in many cases could be reused to implement the columnar version.  This
>>>>> also allows for small easily manageable patches. No huge patches that no
>>>>> one wants to review.
>>>>>
>>>>>
>>>>> As far as the memory layout is concerned, OnHeapColumnVector and
>>>>> OffHeapColumnVector are already really close to being Apache Arrow
>>>>> compatible, so shifting them over would be a relatively simple change.
>>>>> Alternatively, we could add in a new implementation that is Arrow
>>>>> compatible if there are reasons to keep the old ones.
>>>>>
>>>>>
>>>>> Again, this is just to get the discussion started.  Any feedback is
>>>>> welcome, and we will file a SPIP on it once we feel like the major changes
>>>>> we are proposing are acceptable.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bobby Evans
>>>>>
>>>>>
>>>>
>>
>> --
>> Renjie Liu
>> Software Engineer, MVAD
>>
>
