I just realized I didn't make my stance here very clear ... here's another 
try:

I think it's a no-brainer to have a good columnar UDF interface. This would 
facilitate a lot of high-performance applications, e.g. GPU-based acceleration 
for machine learning algorithms.

On rewriting the entire internals of Spark SQL to leverage columnar processing, 
I don't see enough evidence to suggest that's a good idea yet.

On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:

> 
> Kazuaki Ishizaki,
> 
> 
> Yes, ColumnarBatchScan does provide a framework for doing code generation
> for the processing of columnar data.  I have to admit that I don't have a
> deep understanding of the code generation piece, so if I get something
> wrong, please correct me.  From what I have seen, only input formats
> currently inherit from ColumnarBatchScan, and from the comments in the trait:
> 
> 
>   /**
>    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>    * This is called once per [[ColumnarBatch]].
>    */
> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
> 
> 
> 
> It appears that ColumnarBatchScan is really only intended to pull the data
> out of the batch, and not to process that data in a columnar fashion, i.e.
> the Loading stage that you mentioned.
> 
> 
> > The SIMDization or GPUization capability depends on a compiler that
> > translates the code generated by the whole-stage codegen into native code.
> 
> To be able to support vectorized processing, Hive stayed with pure Java and
> let the JVM detect and do the SIMDization of the code.  To make that happen
> they created loops that go through each element in a column and removed all
> conditionals from the body of the loops.  To the best of my knowledge that
> would still require a separate code path, like the one I am proposing, to
> make the different processing phases generate code that the JVM can compile
> down to SIMD instructions.  The generated code is full of null checks for
> each element, which would prevent the SIMD operations we want.  Also, the
> intermediate results are often stored in UnsafeRow instances.  This is
> really fast for row-based processing, but I believe the complexity of how
> they work would prevent the JVM from being able to vectorize the
> processing.  If you have a better way to take Java code and vectorize it,
> we should put it into OpenJDK instead of Spark so everyone can benefit
> from it.
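> 
> To make that loop shape concrete, here is a rough, illustrative Scala sketch
> (not Spark code); the primitive arrays standing in for column values and
> validity are assumptions for the example:
> 
>   // Per-element null checks inside the loop body add branches that
>   // typically defeat the JVM's auto-vectorizer.
>   def addWithNullChecks(a: Array[Double], b: Array[Double],
>                         aNull: Array[Boolean], bNull: Array[Boolean],
>                         out: Array[Double], outNull: Array[Boolean]): Unit = {
>     var i = 0
>     while (i < a.length) {
>       if (aNull(i) || bNull(i)) outNull(i) = true else out(i) = a(i) + b(i)
>       i += 1
>     }
>   }
> 
>   // Hive-style: hoist the null handling out so the hot loop is branch-free
>   // and the JIT has a chance to compile it down to SIMD instructions.
>   def addBranchFree(a: Array[Double], b: Array[Double], out: Array[Double],
>                     aNull: Array[Boolean], bNull: Array[Boolean],
>                     outNull: Array[Boolean]): Unit = {
>     var i = 0
>     while (i < a.length) { out(i) = a(i) + b(i); i += 1 }
>     i = 0
>     while (i < a.length) { outNull(i) = aNull(i) || bNull(i); i += 1 }
>   }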
> 
> 
> Trying to compile directly from generated Java code to something a GPU can
> process is something we are tackling, but we decided to go a different
> route from what you proposed.  From talking with several compiler experts
> here at NVIDIA, my understanding is that IBM, in partnership with NVIDIA,
> attempted in the past to extend the JVM to run at least partially on GPUs,
> but it was really difficult to get right, especially with how Java does
> memory management and memory layout.
> 
> 
> To avoid that complexity we decided to split the JITing up into two
> separate pieces.  I didn't mention any of this before because this
> discussion was intended to just be around the memory layout support, and
> not GPU processing.  The first part would be to take the Catalyst AST and
> produce CUDA code directly from it.  If properly done we should be able to
> do the selection and projection phases within a single kernel.  The
> biggest issue comes with UDFs, as they cannot easily be vectorized for the
> CPU or GPU.  To deal with that, we have a prototype written by the compiler
> team that is trying to tackle SPARK-14083 and can translate basic UDFs into
> Catalyst expressions.  If the UDF is too complicated or covers operations
> not yet supported, it will fall back to the original UDF processing.  I
> don't know how close the team is to submitting a SPIP or a patch for it,
> but I do know that they have some very basic operations working.  The big
> issue is that it requires Java 11+ so it can use standard APIs to get the
> bytecode of Scala UDFs.
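> 
> To give a feel for the first piece, here is a toy sketch (our prototype does
> not literally look like this, and the simplified expression classes below
> are stand-ins for the real Catalyst ones) of walking an expression tree and
> emitting a CUDA kernel that evaluates it element-wise over input columns:
> 
>   sealed trait Expr
>   case class ColRef(ordinal: Int)                extends Expr // like BoundReference
>   case class DoubleLit(value: Double)            extends Expr // like Literal
>   case class BinOp(op: String, l: Expr, r: Expr) extends Expr // like Add, Multiply, ...
> 
>   object ToyCudaGen {
>     private def gen(e: Expr): String = e match {
>       case ColRef(i)       => s"in$i[row]"
>       case DoubleLit(v)    => v.toString
>       case BinOp(op, l, r) => s"(${gen(l)} $op ${gen(r)})"
>     }
> 
>     // One kernel fuses the whole expression, so projection (and, with a
>     // predicate, selection) could in principle run in a single kernel launch.
>     def kernel(e: Expr, numInputs: Int): String = {
>       val params = (0 until numInputs).map(i => s"const double* in$i").mkString(", ")
>       s"""__global__ void project($params, double* out, int n) {
>          |  int row = blockIdx.x * blockDim.x + threadIdx.x;
>          |  if (row < n) out[row] = ${gen(e)};
>          |}""".stripMargin
>     }
>   }
> 
> For example, ToyCudaGen.kernel(BinOp("+", ColRef(0), BinOp("*", ColRef(1),
> DoubleLit(2.0))), 2) would produce a kernel computing in0[row] + (in1[row] *
> 2.0) for every row.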
> 
> 
> We split it this way because we thought it would be simplest to implement,
> and because it would provide a benefit to more than just GPU accelerated
> queries.
> 
> 
> Thanks,
> 
> 
> Bobby
> 
> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ISHIZAKI@jp.ibm.com> wrote:
> 
> 
>> This looks like an interesting discussion.
>> Let me describe the current structure and the remaining issues. This is
>> orthogonal to the cost-benefit trade-off discussion.
>> 
>> The code generation basically consists of three parts.
>> 1. Loading
>> 2. Selection (map, filter, ...)
>> 3. Projection
>> 
>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and table cache) is well
>> abstracted by the ColumnVector class (
>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
>> ). By combining it with ColumnarBatchScan, the whole-stage code generation
>> generates code to get values directly from the columnar storage
>> if there is no row-based operation.
>> Note: The current master does not support Arrow as a data source. However,
>> I think it is not technically hard to support Arrow.
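>> 
>> As a minimal sketch of that abstraction (assuming a batch whose column 0 is
>> an INT column), a consumer can read values directly through the ColumnVector
>> API regardless of which columnar storage backs it:
>> 
>>   import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
>> 
>>   def sumIntColumn(batch: ColumnarBatch): Long = {
>>     val col: ColumnVector = batch.column(0)  // Parquet, Orc, Arrow, cache, ...
>>     var sum = 0L
>>     var row = 0
>>     while (row < batch.numRows()) {
>>       if (!col.isNullAt(row)) sum += col.getInt(row)
>>       row += 1
>>     }
>>     sum
>>   }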
>> 
>> 2. The current whole-stage codegen generates code for element-wise
>> selection (excluding sort and join). The SIMDization or GPUization
>> capability depends on a compiler that translates the code generated by the
>> whole-stage codegen into native code.
>> 
>> 3. The current Projection assumes that it stores row-oriented data; I think
>> that is the part that Wenchen pointed out.
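>> 
>> As a small illustration of point 3 (using internal Catalyst APIs, so this is
>> just a sketch), the projection step materializes its result into
>> row-oriented UnsafeRow bytes even when the inputs came from columns:
>> 
>>   import org.apache.spark.sql.catalyst.InternalRow
>>   import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
>>   import org.apache.spark.sql.types.IntegerType
>> 
>>   val proj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, nullable = false)))
>>   val out  = proj(InternalRow(42))  // an UnsafeRow: compact, but row-oriented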
>> 
>> My slides may clarify the above issues and a possible implementation:
>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>> 
>> 
>> 
>> FYI, NVIDIA will present an approach to exploit GPUs with Arrow through
>> Python at SAIS 2019:
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
>> I think that it uses the Python UDF support with Arrow in Spark.
>> 
>> P.S. I will give a presentation about in-memory data storage for Spark at
>> SAIS 2019:
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>> :)
>> 
>> Kazuaki Ishizaki
>> 
>> 
>> 
>> From:        Wenchen Fan <cloud0fan@gmail.com>
>> To:        Bobby Evans <bobby@apache.org>
>> Cc:        Spark dev list <dev@spark.apache.org>
>> Date:        2019/03/26 13:53
>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>> 
>> 
>> 
>> Do you have some initial perf numbers? It seems fine to me to remain
>> row-based inside Spark with whole-stage-codegen, and convert rows to
>> columnar batches when communicating with external systems.
>> 
>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:
>> This thread is to discuss adding in support for data frame processing
>> using an in-memory columnar format compatible with Apache Arrow.  My main
>> goal in this is to lay the groundwork so we can add in support for GPU
>> accelerated processing of data frames, but this feature has a number of
>> other benefits.  Spark currently supports Apache Arrow formatted data as
>> an option to exchange data with python for pandas UDF processing. There
>> has also been discussion around extending this to allow for exchanging
>> data with other tools like PyTorch, TensorFlow, XGBoost, etc. If Spark
>> supports processing on Arrow-compatible data it could eliminate the
>> serialization/deserialization overhead when going between these systems.
>> It would also allow for optimizations on a CPU with SIMD instructions,
>> similar to what Hive currently supports. Accelerated
>> processing using a GPU is something that we will start a separate
>> discussion thread on, but I wanted to set the context a bit.
>> Jason Lowe, Tom Graves, and I created a prototype over the past few months
>> to try and understand how to make this work.  What we are proposing is
>> based off of lessons learned when building this prototype, but we really
>> wanted to get feedback early on from the community. We will file a SPIP
>> once we can get agreement that this is a good direction to go in.
>> 
>> The current support for columnar processing lets a Parquet or Orc file
>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s
>> type erasure. The code generation is aware that the RDD actually holds
>> ColumnarBatches and generates code to loop through the data in each batch
>> as InternalRows.
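>> 
>> Roughly sketched (simplified, not the exact generated code), that trick
>> looks like this:
>> 
>>   import org.apache.spark.rdd.RDD
>>   import org.apache.spark.sql.catalyst.InternalRow
>>   import org.apache.spark.sql.vectorized.ColumnarBatch
>> 
>>   // The scan produces batches but exposes them as rows, relying on erasure.
>>   def asRowRDD(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
>>     batches.asInstanceOf[RDD[InternalRow]]
>> 
>>   // The generated consumer knows better, casts back, and iterates the rows.
>>   def consume(rdd: RDD[InternalRow]): Unit =
>>     rdd.foreachPartition { it =>
>>       it.foreach { elem =>
>>         val batch = elem.asInstanceOf[ColumnarBatch]
>>         val rows = batch.rowIterator()
>>         while (rows.hasNext) { val row = rows.next(); /* process the row */ }
>>       }
>>     }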
>> 
>> 
>> Instead, we propose a new set of APIs to work on an
>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>> Each part of the physical SparkPlan would expose columnar support through
>> a combination of traits and method calls. The rule would then decide when
>> columnar processing would start and when it would end. Switching between
>> columnar and row-based processing is not free, so the rule would make a
>> decision based off of an estimate of the cost to do the transformation and
>> the estimated speedup in processing time.
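>> 
>> A hypothetical sketch of the shape this could take (the trait and rule names
>> below are made up for illustration, not the actual proposed API):
>> 
>>   import org.apache.spark.sql.catalyst.rules.Rule
>>   import org.apache.spark.sql.execution.SparkPlan
>> 
>>   // A plan node advertises that it can also run columnar and what it
>>   // expects to gain by doing so.
>>   trait SupportsColumnar { self: SparkPlan =>
>>     def estimatedColumnarSpeedup: Double
>>   }
>> 
>>   case class DecideColumnarExecution(conversionCost: Double) extends Rule[SparkPlan] {
>>     override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
>>       // Keep a node columnar only when the estimated gain outweighs the
>>       // cost of the row <-> columnar transitions at its boundaries; the
>>       // real rule would wrap those boundaries with transition nodes here.
>>       case p: SparkPlan with SupportsColumnar
>>           if p.estimatedColumnarSpeedup > conversionCost => p
>>     }
>>   }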
>> 
>> 
>> 
>> 
>> This should allow us to disable columnar support by simply disabling the
>> rule that modifies the physical SparkPlan.  It should be minimal risk to
>> the existing row-based code path, as that code should not be touched, and
>> in many cases could be reused to implement the columnar version.  This
>> also allows for small easily manageable patches. No huge patches that no
>> one wants to review.
>> 
>> 
>> 
>> 
>> As far as the memory layout is concerned, OnHeapColumnVector and
>> OffHeapColumnVector are already really close to being Apache Arrow
>> compatible, so shifting them over would be a relatively simple change.
>> Alternatively, we could add a new Arrow-compatible implementation if there
>> are reasons to keep the old ones.
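>> 
>> As a minimal sketch of why this looks tractable: Spark already has an
>> Arrow-backed ColumnVector implementation (ArrowColumnVector), and the Arrow
>> classes below come from the arrow-vector dependency Spark already carries:
>> 
>>   import org.apache.arrow.memory.RootAllocator
>>   import org.apache.arrow.vector.IntVector
>>   import org.apache.spark.sql.vectorized.ArrowColumnVector
>> 
>>   val allocator = new RootAllocator(Long.MaxValue)
>>   val arrowVec  = new IntVector("ints", allocator)
>>   arrowVec.allocateNew(3)
>>   arrowVec.setSafe(0, 1); arrowVec.setSafe(1, 2); arrowVec.setSafe(2, 3)
>>   arrowVec.setValueCount(3)
>> 
>>   // Arrow memory exposed through Spark's ColumnVector interface, no copy.
>>   val col = new ArrowColumnVector(arrowVec)
>>   assert(col.getInt(2) == 3)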
>> 
>> 
>> 
>> 
>> Again this is just to get the discussion started, any feedback is welcome,
>> and we will file a SPIP on it once we feel like the major changes we are
>> proposing are acceptable.
>> 
>> Thanks,
>> 
>> Bobby Evans
>> 
>> 
> 
> 
>
