Re: [DISCUSS] Spark Columnar Processing

2019-04-13 Thread Bobby Evans
>> [...] to the best of my knowledge that would still require a separate code
>> path, like I am proposing, to make the different processing phases generate
>> code that the JVM can compile down to SIMD instructions.  The generated code
>> is full of null checks for each element, which would prevent the operations
>> we want.  Also, the intermediate results are often stored in UnsafeRow
>> instances.  This is really fast for row-based processing, but the complexity
>> of how they work would, I believe, prevent the JVM from being able to
>> vectorize the processing.  If you have a better way to take Java code and
>> vectorize it, we should put it into OpenJDK instead of Spark so everyone
>> can benefit from it.
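A minimal, self-contained Scala sketch of the contrast described above (illustrative only; this is not actual Spark-generated code). The row-style loop pays a null check and boxing per element, while the columnar-style loop runs over plain primitive arrays with a separate validity array, which is the shape a JIT's auto-vectorizer handles well:

// Row-at-a-time flavor: a null check (and boxing) inside the hot loop for every element.
def addRowStyle(a: Array[java.lang.Integer], b: Array[java.lang.Integer]): Array[java.lang.Integer] = {
  val out = new Array[java.lang.Integer](a.length)
  var i = 0
  while (i < a.length) {
    out(i) = if (a(i) == null || b(i) == null) null
             else Int.box(a(i).intValue + b(i).intValue)
    i += 1
  }
  out
}

// Columnar flavor: values and validity are kept separately, so the value loop is branch-free.
def addColumnarStyle(values1: Array[Int], values2: Array[Int],
                     valid1: Array[Boolean], valid2: Array[Boolean]): (Array[Int], Array[Boolean]) = {
  val n = values1.length
  val outValues = new Array[Int](n)
  val outValid = new Array[Boolean](n)
  var i = 0
  while (i < n) { outValues(i) = values1(i) + values2(i); i += 1 } // SIMD candidate
  i = 0
  while (i < n) { outValid(i) = valid1(i) && valid2(i); i += 1 }
  (outValues, outValid)
}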
>>
>> Trying to compile directly from generated java code to something a GPU
>> can process is something we are tackling but we decided to go a different
>> route from what you proposed.  From talking with several compiler experts
>> here at NVIDIA my understanding is that IBM in partnership with NVIDIA
>> attempted in the past to extend the JVM to run at least partially on GPUs,
>> but it was really difficult to get right, especially with how Java handles
>> memory management and memory layout.
>>
>> To avoid that complexity we decided to split the JITing up into two
>> separate pieces.  I didn't mention any of this before because this
>> discussion was intended to just be around the memory layout support, and
>> not GPU processing.  The first part would be to take the Catalyst AST and
>> produce CUDA code directly from it.  If properly done we should be able to
>> do the selection and projection phases within a single kernel.  The biggest
>> issue comes with UDFs, as they cannot easily be vectorized for the CPU or
>> the GPU.  To deal with that, we have a prototype written by the compiler
>> team that is trying to tackle SPARK-14083, which can translate basic UDFs
>> into Catalyst expressions.  If the UDF is too complicated or covers
>> operations not yet supported, it will fall back to the original UDF
>> processing.  I don't know how close the team is to submitting a SPIP or a
>> patch for it, but I do know that they have some very basic operations
>> working.  The big issue is that it requires Java 11+ so it can use standard
>> APIs to get the bytecode of Scala UDFs.
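As a hedged illustration of the idea (the MiniExpr IR and translate function below are invented for this sketch and are not the SPARK-14083 prototype; Add, Multiply, Literal, and BoundReference are existing Catalyst expression classes), translating a UDF body such as (x: Int) => x * 2 + 1 into Catalyst expressions could look roughly like this:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Hypothetical mini-IR that a bytecode reader might produce for the UDF body.
sealed trait MiniExpr
case class Arg(ordinal: Int) extends MiniExpr
case class Const(value: Int) extends MiniExpr
case class BinOp(op: String, left: MiniExpr, right: MiniExpr) extends MiniExpr

// Translate the mini-IR into Catalyst expressions, or give up and keep the UDF.
def translate(e: MiniExpr): Expression = e match {
  case Arg(i)           => BoundReference(i, IntegerType, nullable = false)
  case Const(v)         => Literal(v)
  case BinOp("+", l, r) => Add(translate(l), translate(r))
  case BinOp("*", l, r) => Multiply(translate(l), translate(r))
  case other => throw new UnsupportedOperationException(
    s"unsupported: $other, fall back to the original UDF")
}

// (x: Int) => x * 2 + 1  becomes  Add(Multiply(input[0], 2), 1)
val translated = translate(BinOp("+", BinOp("*", Arg(0), Const(2)), Const(1)))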
>>
>> We split it this way because we thought it would be simplest to
>> implement, and because it would provide a benefit to more than just GPU
>> accelerated queries.
>>
>> Thanks,
>>
>> Bobby
>>
>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki wrote:
>>
>> This looks like an interesting discussion.
>> Let me describe the current structure and the remaining issues. This is
>> orthogonal to the cost-benefit trade-off discussion.
>>
>> The code generation basically consists of three parts.
>> 1. Loading
>> 2. Selection (map, filter, ...)
>> 3. Projection
>>
>> 1. Columnar storage (e.g. Parquet, ORC, Arrow, and the table cache) is well
>> abstracted by the ColumnVector class (
>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java).
>> By combining it with ColumnarBatchScan, the whole-stage code generation
>> generates code that gets values directly from the columnar storage if there
>> is no row-based operation.
>> Note: the current master does not support Arrow as a data source.
>> However, I think it is not technically hard to support Arrow.
>>
>> 2. The current whole-stage codegen generates code for element-wise
>> selection (excluding sort and join). SIMD-ization or GPU-ization therefore
>> depends on a compiler that translates the code generated by the whole-stage
>> codegen into native code.
>>
>> 3. The current Projection assumes row-oriented output data; I think that
>> is the part that Wenchen pointed out.
>>
>> My slides
>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>> may clarify the above issues and a possible implementation.
>>
>>
>>
>> FYI, NVIDIA will present an approach to exploiting GPUs with Arrow through
>> Python at SAIS 2019:
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
>> I think that it uses the Python UDF support with Arrow in Spark.
>>
>> P.S. I will give a presentation about in-memory data storage for Spark
>> at SAIS 2019:
>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>> :)
>>
>> Kazuaki Ishizaki
>>
>>
>>
>> From: Wenchen Fan
>> To: Bobby Evans
>> Cc: Spark dev list
>> Date: 2019/03/26 13:53
>> Subject: Re: [DISCUSS] Spark Columnar Processing

Re: [DISCUSS] Spark Columnar Processing

2019-03-26 Thread Kazuaki Ishizaki
This looks like an interesting discussion.
Let me describe the current structure and the remaining issues. This is
orthogonal to the cost-benefit trade-off discussion.

The code generation basically consists of three parts.
1. Loading
2. Selection (map, filter, ...)
3. Projection

1. Columnar storage (e.g. Parquet, ORC, Arrow, and the table cache) is well
abstracted by the ColumnVector class (
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
). By combining it with ColumnarBatchScan, the whole-stage code
generation generates code that gets values directly from the columnar storage
if there is no row-based operation.
Note: the current master does not support Arrow as a data source. However,
I think it is not technically hard to support Arrow.
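As a small read-only illustration of that abstraction (the helper below is not a Spark API; ColumnarBatch and ColumnVector are the real public classes), the same loop works regardless of whether the vectors are backed by Parquet, ORC, the table cache, or an Arrow source:

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Sum one integer column of a batch, skipping nulls.
def sumIntColumn(batch: ColumnarBatch, ordinal: Int): Long = {
  val col: ColumnVector = batch.column(ordinal)
  val numRows = batch.numRows()
  var sum = 0L
  var row = 0
  while (row < numRows) {
    if (!col.isNullAt(row)) sum += col.getInt(row)
    row += 1
  }
  sum
}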

2. The current whole-stage codegen generates code for element-wise
selection (excluding sort and join). SIMD-ization or GPU-ization therefore
depends on a compiler that translates the code generated by the whole-stage
codegen into native code.

3. The current Projection assumes row-oriented output data; I think that
is the part that Wenchen pointed out.

My slides
https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
may clarify the above issues and a possible implementation.



FYI, NVIDIA will present an approach to exploiting GPUs with Arrow through Python
at SAIS 2019:
https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
I think that it uses the Python UDF support with Arrow in Spark.

P.S. I will give a presentation about in-memory data storage for Spark at
SAIS 2019:
https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
:)

Kazuaki Ishizaki



From: Wenchen Fan
To: Bobby Evans
Cc: Spark dev list
Date: 2019/03/26 13:53
Subject: Re: [DISCUSS] Spark Columnar Processing



Do you have some initial perf numbers? It seems fine to me to remain 
row-based inside Spark with whole-stage-codegen, and convert rows to 
columnar batches when communicating with external systems.

On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans  wrote:
This thread is to discuss adding in support for data frame processing 
using an in-memory columnar format compatible with Apache Arrow.  My main 
goal in this is to lay the groundwork so we can add in support for GPU 
accelerated processing of data frames, but this feature has a number of 
other benefits.  Spark currently supports Apache Arrow formatted data as 
an option to exchange data with python for pandas UDF processing. There 
has also been discussion around extending this to allow for exchanging 
data with other tools like pytorch, tensorflow, xgboost,... If Spark 
supports processing on Arrow compatible data it could eliminate the 
serialization/deserialization overhead when going between these systems.  
It also would allow for doing optimizations on a CPU with SIMD 
instructions similar to what Hive currently supports. Accelerated 
processing using a GPU is something that we will start a separate 
discussion thread on, but I wanted to set the context a bit.
Jason Lowe, Tom Graves, and I created a prototype over the past few months 
to try and understand how to make this work.  What we are proposing is 
based off of lessons learned when building this prototype, but we really 
wanted to get feedback early on from the community. We will file a SPIP 
once we can get agreement that this is a good direction to go in.

The current support for columnar processing lets a Parquet or Orc file 
format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s 
type erasure. The code generation is aware that the RDD actually holds 
ColumnarBatches and generates code to loop through the data in each batch
as InternalRows.

Instead, we propose a new set of APIs to work on an 
RDD[InternalColumnarBatch] instead of abusing type erasure. With this we 
propose adding in a Rule similar to how WholeStageCodeGen currently works. 
Each part of the physical SparkPlan would expose columnar support through 
a combination of traits and method calls. The rule would then decide when 
columnar processing would start and when it would end. Switching between 
columnar and row based processing is not free, so the rule would make a 
decision based off of an estimate of the cost to do the transformation and 
the estimated speedup in processing time. 

This should allow us to disable columnar support by simply disabling the 
rule that modifies the physical SparkPlan.  It should be minimal risk to 
the existing row-based code path, as that code should not be touched, and 
in many cases could be reused to implement the columnar version.  This 
also allows for small easily manageable patches. No huge patches that no 
one wants to review.

As far as the memory layout is concerned OnHeapColumnVector and 
OffHeapColumnVector are already really close to being Apache Arrow 
compatible so shifting them over would be a relatively simple change.

Re: [DISCUSS] Spark Columnar Processing

2019-03-26 Thread Bobby Evans
Reynold,

From our experiments, it is not a massive refactoring of the code.  Most
expressions can be supported by a relatively small change while leaving the
existing code path untouched.  We didn't try to do columnar with code
generation, but I suspect it would be similar, although the code generation
code itself is rather complicated, which is why we avoided it initially.

To address your other concerns:

> 1. GPU machines are much more expensive & difficult to get.
In our experiments, a 4x performance improvement in a query offsets the
cost of a GPU, at least for the latest generation T4 GPUs.  In some
queries, we were able to do the entire processing for half the cost because
we could finish faster.  But yes it does depend on the type of processing
you are doing and the more numerically complicated the processing is the
more of a speedup you will see.  One of the reasons that we started down
this path is that our customers were seeing real-world ML use cases
where the data preparation (ETL) was dominating the training time for their
models. They already had GPU nodes to train their models on and were using
Spark to do the data preparation and orchestrate the training so while ETL
was happening they were paying for GPUs that were sitting idle, which was
most of the time. Hopefully, the new GPU-aware scheduling feature will
remove the need for idle GPUs, but I still think the cost story is
compelling even without it. And if the cost model makes sense, GPU-enabled
nodes will become easier to get ahold of.

> 2. Bandwidth from system to GPUs is usually small
This does vary a lot depending on the hardware you have.  The theoretical
limit of a 16-lane (x16) PCIe 3.0 connection is about 15 GB/s.  In GCP we
measured 12 GB/s of bandwidth between the GPU and main memory (bidirectional, 12
up and 12 down simultaneously).  Yes, this is tiny in comparison to the
bandwidth you can get from L1 cache, but if you do need to hit main memory,
which I would expect for big data, the bandwidth typically is not that
different.  PCIe 4.0 and 5.0 are coming and should surpass the theoretical
limits of what DDR4 can deliver.  So even if it does not make sense right
now, the cost calculations are likely to change over the next few years.
You still take a hit because the data is typically staged in main memory
before moving it to the GPU, which is why you need a cost-based optimizer
to decide if it is worth it.  We also have options to use compression as
part of the transfers if necessary to increase the practical bandwidth.
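For reference, the ~15 GB/s figure is just the PCIe 3.0 x16 link arithmetic (a rough back-of-the-envelope check, not a number taken from this thread):

    16 lanes * 8 GT/s per lane * (128/130 encoding) / 8 bits per byte
      = 16 * 8 * 0.985 / 8
      ~= 15.8 GB/s per direction

which lines up with the ~12 GB/s per direction measured in practice.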

> 3. It's a massive effort.
> In general it's a cost-benefit trade-off.
If we include testing, documentation, etc., yes, it does add up to a lot of
work. I get that you don't want someone to show up, dump a ton of untested
code, and walk away.  I don't want that either.  That is why the proposal
makes it a separate optional code path that can be off by default and
allows us to add functionality one operator at a time.  We are willing to
make a long term commitment to this and see it through so that it is a
success. If that means providing hardware for testing, we have already started
discussions internally about the best way to do that.  If it means
ongoing support in the community we are willing to do that.

Please let me know if I missed any of your concerns, or if I misunderstood
any of them.

Thanks,

Bobby



On Tue, Mar 26, 2019 at 12:21 PM Reynold Xin  wrote:

> 26% improvement is underwhelming if it requires massive refactoring of the
> codebase. Also you can't just add the benefits up this way, because:
>
> - Both vectorization and codegen reduces the overhead in virtual function
> calls
> - Vectorization code is more friendly to compilers / CPUs, but requires
> materializing a lot of data in memory (or cache)
> - Codegen reduces the amount of data that flows through memory, but for
> complex queries the generated code might not be very compiler / CPU friendly
>
>
> I see massive benefits in leveraging GPUs (and other accelerators) for
> numeric workloads (e.g. machine learning), so I think it makes a lot of
> sense to be able to get data out of Spark quickly into UDFs for such
> workloads.
>
> I don't see as much benefits for general data processing, for a few
> reasons:
>
> 1. GPU machines are much more expensive & difficult to get (e.g. in the
> cloud they are 3 to 5x more expensive, with limited availability based on
> my experience), so it is difficult to build a farm
> 2. Bandwidth from system to GPUs is usually small, so if you could fit the
> working set in GPU memory and repeatedly work on it (e.g. machine
> learning), it's great, but otherwise it's not great.
> 3. It's a massive effort.
>
> In general it's a cost-benefit trade-off. I'm not aware of any general
> framework that allows us to write code once and have it work against both
> GPUs and CPUs reliably. If such framework exists, it will change the
> equation.
>
>
>
>
>
>
> On Tue, Mar 26, 2019 at 6:57 AM, Bobby Evans  wrote:
>

Re: [DISCUSS] Spark Columnar Processing

2019-03-26 Thread Reynold Xin
26% improvement is underwhelming if it requires massive refactoring of the 
codebase. Also you can't just add the benefits up this way, because:

- Both vectorization and codegen reduce the overhead of virtual function calls

- Vectorization code is more friendly to compilers / CPUs, but requires 
materializing a lot of data in memory (or cache)

- Codegen reduces the amount of data that flows through memory, but for complex 
queries the generated code might not be very compiler / CPU friendly
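A self-contained Scala sketch of the trade-off in the bullets above (illustrative only): the batch-at-a-time style is SIMD-friendly but materializes an intermediate array per operator, while the fused, codegen-style loop streams each element through the whole expression and touches memory once:

// Batch-at-a-time ("vectorized"): each operator makes one pass and one intermediate array.
def vectorized(xs: Array[Double]): Array[Double] = {
  val squared = xs.map(x => x * x) // intermediate batch materialized in memory
  squared.map(x => x + 1.0)        // second pass over memory
}

// Fused ("codegen"-style): one pass, no intermediate allocation.
def fused(xs: Array[Double]): Array[Double] = {
  val out = new Array[Double](xs.length)
  var i = 0
  while (i < xs.length) {
    val x = xs(i)
    out(i) = x * x + 1.0
    i += 1
  }
  out
}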

I see massive benefits in leveraging GPUs (and other accelerators) for numeric 
workloads (e.g. machine learning), so I think it makes a lot of sense to be 
able to get data out of Spark quickly into UDFs for such workloads.

I don't see as much benefits for general data processing, for a few reasons:

1. GPU machines are much more expensive & difficult to get (e.g. in the cloud 
they are 3 to 5x more expensive, with limited availability based on my 
experience), so it is difficult to build a farm

2. Bandwidth from system to GPUs is usually small, so if you could fit the 
working set in GPU memory and repeatedly work on it (e.g. machine learning), 
it's great, but otherwise it's not great.

3. It's a massive effort.

In general it's a cost-benefit trade-off. I'm not aware of any general 
framework that allows us to write code once and have it work against both GPUs 
and CPUs reliably. If such a framework exists, it will change the equation.

On Tue, Mar 26, 2019 at 6:57 AM, Bobby Evans < bo...@apache.org > wrote:

> 
> Cloudera reports a 26% improvement in hive query runtimes by enabling
> vectorization. I would expect to see similar improvements but at the cost
> of keeping more data in memory.  But remember this also enables a number
> of different hardware acceleration techniques.  If the data format is
> arrow compatible and off-heap someone could offload the processing to
> native code which typically results in a 2x improvement over java (and the
> cost of a JNI call would be amortized over processing an entire batch at
> once).  Also, we plan on adding in GPU acceleration and ideally making it
> a standard part of Spark.  In our initial prototype, we saw queries which
> we could make fully columnar/GPU enabled being 5-6x faster.  But that
> really was just a proof of concept and we expect to be able to do quite a
> bit better when we are completely done.  Many commercial GPU enabled SQL
> engines claim to be 20x to 200x faster than Spark, depending on the use
> case. Digging deeply you see that they are not apples to apples
> comparisons, i.e. reading from cached GPU memory and having spark read
> from a file, or using parquet as input but asking spark to read CSV.  That
> being said I would expect that we can achieve something close to the 20x
> range for most queries and possibly more if they are computationally
> intensive.
> 
> 
> Also as a side note, we initially thought that the conversion would not be
> too expensive and that we could just move computationally intensive
> processing onto the GPU piecemeal with conversions on both ends.  In
> practice, we found that the cost of conversion quickly starts to dominate
> the queries we were testing.
> 
> On Mon, Mar 25, 2019 at 11:53 PM Wenchen Fan < cloud0fan@ gmail. com (
> cloud0...@gmail.com ) > wrote:
> 
> 
>> Do you have some initial perf numbers? It seems fine to me to remain
>> row-based inside Spark with whole-stage-codegen, and convert rows to
>> columnar batches when communicating with external systems.
>> 
>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans < bobby@ apache. org (
>> bo...@apache.org ) > wrote:
>> 
>> 
>>> 
>>> 
>>> This thread is to discuss adding in support for data frame processing
>>> using an in-memory columnar format compatible with Apache Arrow.  My main
>>> goal in this is to lay the groundwork so we can add in support for GPU
>>> accelerated processing of data frames, but this feature has a number of
>>> other benefits.  Spark currently supports Apache Arrow formatted data as
>>> an option to exchange data with python for pandas UDF processing. There
>>> has also been discussion around extending this to allow for exchanging
>>> data with other tools like pytorch, tensorflow, xgboost,... If Spark
>>> supports processing on Arrow compatible data it could eliminate the
>>> serialization/deserialization overhead when going between these systems. 
>>> It also would allow for doing optimizations on a CPU with SIMD
>>> instructions similar to what Hive currently supports. Accelerated
>>> processing using a GPU is something that we will start a separate
>>> discussion thread on, but I wanted to set the context a bit.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Jason Lowe, Tom Graves, and I created a prototype over the past few months
>>> to try and understand how to make this work.  What we are proposing is
>>> based off of lessons learned when building this prototype, but we really
>>> wanted to get feedback early on from the community. 

Re: [DISCUSS] Spark Columnar Processing

2019-03-26 Thread Bobby Evans
Cloudera reports a 26% improvement in Hive query runtimes by enabling
vectorization. I would expect to see similar improvements but at the cost
of keeping more data in memory.  But remember this also enables a number of
different hardware acceleration techniques.  If the data format is Arrow
compatible and off-heap, someone could offload the processing to native code,
which typically results in a 2x improvement over Java (and the cost of a
JNI call would be amortized over processing an entire batch at once).
Also, we plan on adding in GPU acceleration and ideally making it a
standard part of Spark.  In our initial prototype, we saw queries that we
could make fully columnar/GPU-enabled run 5-6x faster.  But that really
was just a proof of concept and we expect to be able to do quite a bit
better when we are completely done.  Many commercial GPU enabled SQL
engines claim to be 20x to 200x faster than Spark, depending on the use
case. Digging deeply you see that they are not apples to apples
comparisons, e.g. reading from cached GPU memory while having Spark read from
a file, or using Parquet as input but asking Spark to read CSV.  That being
said, I would expect that we can achieve something close to the 20x range
for most queries and possibly more if they are computationally intensive.
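A hedged sketch of the JNI amortization point above; NativeKernels and both of its methods are hypothetical stubs (a real version would be native methods backed by C++ or CUDA reading the off-heap Arrow buffers directly):

object NativeKernels {
  // Hypothetical JNI entry points, stubbed here for illustration only.
  def addOne(a: Int, b: Int): Int =
    throw new UnsupportedOperationException("stub")
  def addColumns(leftAddr: Long, rightAddr: Long, outAddr: Long, numRows: Int): Unit =
    throw new UnsupportedOperationException("stub")
}

// Row-at-a-time offload: numRows JVM-to-native crossings.
def addPerRow(a: Array[Int], b: Array[Int], out: Array[Int]): Unit = {
  var i = 0
  while (i < a.length) { out(i) = NativeKernels.addOne(a(i), b(i)); i += 1 }
}

// Batch-at-a-time offload over Arrow-formatted off-heap buffers: one crossing,
// so the per-element JNI cost is roughly (call overhead / numRows).
def addPerBatch(leftAddr: Long, rightAddr: Long, outAddr: Long, numRows: Int): Unit =
  NativeKernels.addColumns(leftAddr, rightAddr, outAddr, numRows)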

Also as a side note, we initially thought that the conversion would not be
too expensive and that we could just move computationally intensive
processing onto the GPU piecemeal with conversions on both ends.  In
practice, we found that the cost of conversion quickly starts to dominate
the queries we were testing.

On Mon, Mar 25, 2019 at 11:53 PM Wenchen Fan  wrote:

> Do you have some initial perf numbers? It seems fine to me to remain
> row-based inside Spark with whole-stage-codegen, and convert rows to
> columnar batches when communicating with external systems.
>
> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans  wrote:
>
>> This thread is to discuss adding in support for data frame processing
>> using an in-memory columnar format compatible with Apache Arrow.  My main
>> goal in this is to lay the groundwork so we can add in support for GPU
>> accelerated processing of data frames, but this feature has a number of
>> other benefits.  Spark currently supports Apache Arrow formatted data as an
>> option to exchange data with python for pandas UDF processing. There has
>> also been discussion around extending this to allow for exchanging data
>> with other tools like pytorch, tensorflow, xgboost,... If Spark supports
>> processing on Arrow compatible data it could eliminate the
>> serialization/deserialization overhead when going between these systems.
>> It also would allow for doing optimizations on a CPU with SIMD instructions
>> similar to what Hive currently supports. Accelerated processing using a GPU
>> is something that we will start a separate discussion thread on, but I
>> wanted to set the context a bit.
>>
>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>> months to try and understand how to make this work.  What we are proposing
>> is based off of lessons learned when building this prototype, but we really
>> wanted to get feedback early on from the community. We will file a SPIP
>> once we can get agreement that this is a good direction to go in.
>>
>> The current support for columnar processing lets a Parquet or Orc file
>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type
>> erasure. The code generation is aware that the RDD actually holds
>> ColumnarBatches and generates code to loop through the data in each batch as
>> InternalRows.
>>
>> Instead, we propose a new set of APIs to work on an
>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>> Each part of the physical SparkPlan would expose columnar support through a
>> combination of traits and method calls. The rule would then decide when
>> columnar processing would start and when it would end. Switching between
>> columnar and row based processing is not free, so the rule would make a
>> decision based off of an estimate of the cost to do the transformation and
>> the estimated speedup in processing time.
>>
>> This should allow us to disable columnar support by simply disabling the
>> rule that modifies the physical SparkPlan.  It should be minimal risk to
>> the existing row-based code path, as that code should not be touched, and
>> in many cases could be reused to implement the columnar version.  This also
>> allows for small easily manageable patches. No huge patches that no one
>> wants to review.
>>
>> As far as the memory layout is concerned OnHeapColumnVector and
>> OffHeapColumnVector are already really close to being Apache Arrow
>> compatible so shifting them over would be a relatively simple change.
>> Alternatively we could add in a new implementation that is Arrow compatible
>> if there are reasons to keep the old ones.

Re: [DISCUSS] Spark Columnar Processing

2019-03-25 Thread Wenchen Fan
Do you have some initial perf numbers? It seems fine to me to remain
row-based inside Spark with whole-stage-codegen, and convert rows to
columnar batches when communicating with external systems.

On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans  wrote:

> This thread is to discuss adding in support for data frame processing
> using an in-memory columnar format compatible with Apache Arrow.  My main
> goal in this is to lay the groundwork so we can add in support for GPU
> accelerated processing of data frames, but this feature has a number of
> other benefits.  Spark currently supports Apache Arrow formatted data as an
> option to exchange data with python for pandas UDF processing. There has
> also been discussion around extending this to allow for exchanging data
> with other tools like pytorch, tensorflow, xgboost,... If Spark supports
> processing on Arrow compatible data it could eliminate the
> serialization/deserialization overhead when going between these systems.
> It also would allow for doing optimizations on a CPU with SIMD instructions
> similar to what Hive currently supports. Accelerated processing using a GPU
> is something that we will start a separate discussion thread on, but I
> wanted to set the context a bit.
>
> Jason Lowe, Tom Graves, and I created a prototype over the past few months
> to try and understand how to make this work.  What we are proposing is
> based off of lessons learned when building this prototype, but we really
> wanted to get feedback early on from the community. We will file a SPIP
> once we can get agreement that this is a good direction to go in.
>
> The current support for columnar processing lets a Parquet or Orc file
> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type
> erasure. The code generation is aware that the RDD actually holds
> ColumnarBatches and generates code to loop through the data in each batch as
> InternalRows.
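A minimal sketch of the type-erasure trick described above (the helper names are illustrative, not Spark internals): the scan physically produces ColumnarBatch objects while advertising an RDD[InternalRow], and downstream code casts back:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Compiles only because generics are erased; the element type is a lie.
def advertiseAsRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
  batches.asInstanceOf[RDD[InternalRow]]

// What the generated consumer effectively relies on to get the batches back.
def recoverBatches(rows: RDD[InternalRow]): RDD[ColumnarBatch] =
  rows.asInstanceOf[RDD[ColumnarBatch]]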
>
> Instead, we propose a new set of APIs to work on an
> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
> propose adding in a Rule similar to how WholeStageCodeGen currently works.
> Each part of the physical SparkPlan would expose columnar support through a
> combination of traits and method calls. The rule would then decide when
> columnar processing would start and when it would end. Switching between
> columnar and row-based processing is not free, so the rule would make a
> decision based off of an estimate of the cost to do the transformation and
> the estimated speedup in processing time.
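A hedged sketch of the shape of API being proposed (all names below are made up for illustration; they are not existing Spark APIs): each physical operator advertises whether it has a columnar implementation, and the planner rule only inserts a columnar section when the estimated speedup outweighs the row/columnar conversion cost:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical trait a physical operator could mix in alongside its row-based path.
trait ColumnarExec {
  def supportsColumnar: Boolean = false
  def executeColumnar(): RDD[ColumnarBatch] =
    throw new UnsupportedOperationException("no columnar implementation")
}

// Hypothetical cost check used by the rule: go columnar only if the time saved by
// the faster columnar section exceeds the cost of converting at the boundaries.
def worthSwitching(sectionCost: Double, estimatedSpeedup: Double, conversionCost: Double): Boolean =
  sectionCost / estimatedSpeedup + conversionCost < sectionCost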
>
> This should allow us to disable columnar support by simply disabling the
> rule that modifies the physical SparkPlan.  It should be minimal risk to
> the existing row-based code path, as that code should not be touched, and
> in many cases could be reused to implement the columnar version.  This also
> allows for small easily manageable patches. No huge patches that no one
> wants to review.
>
> As far as the memory layout is concerned OnHeapColumnVector and
> OffHeapColumnVector are already really close to being Apache Arrow
> compatible so shifting them over would be a relatively simple change.
> Alternatively we could add in a new implementation that is Arrow compatible
> if there are reasons to keep the old ones.
>
> Again, this is just to get the discussion started; any feedback is welcome,
> and we will file a SPIP on it once we feel like the major changes we are
> proposing are acceptable.
>
> Thanks,
>
> Bobby Evans
>