Re: Need help with HashAggregateExec, TungstenAggregationIterator and UnsafeFixedWidthAggregationMap

2018-09-08 Thread Jacek Laskowski
Hi Herman,

Right. No @deprecated, but something that would tell people who review the
code "be extra careful since you're reading code that is no longer in use"
for SparkPlans that do support WSCG. That would help a lot, as I've been
tricked a few times already while trying to understand something I should
not have bothered with.
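For readers following the thread, the two execution paths can be sketched with a toy model (plain Python; these are not Spark's actual classes or method names): Volcano-style execution chains operators that each pull rows from their child via doExecute, while whole-stage code generation conceptually collapses the whole subtree into one fused loop.

```python
# Toy model of the two execution paths (illustrative only; not Spark's
# real classes or signatures).

class Scan:
    def __init__(self, rows):
        self.rows = rows
    def do_execute(self):            # Volcano-style: each operator yields rows
        yield from self.rows

class Filter:
    def __init__(self, child, pred):
        self.child, self.pred = child, pred
    def do_execute(self):            # pulls rows from the child, one at a time
        for row in self.child.do_execute():
            if self.pred(row):
                yield row

class Project:
    def __init__(self, child, fn):
        self.child, self.fn = child, fn
    def do_execute(self):
        for row in self.child.do_execute():
            yield self.fn(row)

def volcano(rows):
    # Volcano-style plan: chained do_execute() calls across three operators.
    plan = Project(Filter(Scan(rows), lambda r: r % 2 == 0), lambda r: r * 10)
    return list(plan.do_execute())

def whole_stage(rows):
    # The same subtree "collapsed" into a single loop -- roughly what
    # whole-stage code generation produces for the whole stage.
    out = []
    for r in rows:
        if r % 2 == 0:
            out.append(r * 10)
    return out
```

Both functions return the same result; the fused loop simply avoids the per-row virtual calls between operators.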

Thanks Russ and Herman for helping me get my thinking right. That will
also help my Spark clients, esp. during Spark SQL workshops!

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Sat, Sep 8, 2018 at 3:53 PM Herman van Hovell 
wrote:

> ...pressed send too early...
>
> Moreover, we can't always use whole-stage code generation. In that case
> we fall back to Volcano-style execution and chain together doExecute()
> calls.
>
> On Sat, Sep 8, 2018 at 3:51 PM Herman van Hovell 
> wrote:
>
>> SparkPlan.doExecute() is the only way you can execute a physical SQL
>> plan, so it should *not* be marked as deprecated. Whole-stage code
>> generation collapses a subtree of SparkPlans (that support whole-stage
>> code generation) into a single WholeStageCodegenExec physical plan.
>> During execution we call doExecute() on the WholeStageCodegenExec node.
>>
>> On Sat, Sep 8, 2018 at 11:55 AM Jacek Laskowski  wrote:
>>
>>> Thanks Russ! That helps a lot.
>>>
>>> On the other hand, it makes reviewing the Spark SQL codebase slightly
>>> harder, since Java code generation is so much about string concatenation :(
>>>
>>> p.s. Should all the code in doExecute be considered and marked
>>> @deprecated?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Fri, Sep 7, 2018 at 10:05 PM Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
 That's my understanding :) doExecute is for non-codegen while doProduce
 and doConsume are for generating code

 On Fri, Sep 7, 2018 at 2:59 PM Jacek Laskowski  wrote:

> Hi Devs,
>
> Sorry for bothering you with my questions (and concerns), but I really
> need to understand this piece of code (= my personal challenge :))
>
> Is it true that SparkPlan.doExecute (to "execute" a physical
> operator) is only used when whole-stage code gen is disabled (which it is
> not by default)? May I call this execution path traditional (even
> "old-fashioned")?
>
> Is it true that these days SparkPlan.doProduce and
> SparkPlan.doConsume (and others) are used for "executing" a physical
> operator (i.e. to generate the Java source code), since whole-stage code
> generation is enabled and is currently the proper execution path?
>
> p.s. This SparkPlan.doExecute is used to trigger whole-stage code gen
> by WholeStageCodegenExec (and InputAdapter), but that's all the code that
> is to be executed by doExecute, isn't it?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Sep 7, 2018 at 7:24 PM Jacek Laskowski 
> wrote:
>
>> Hi Spark Devs,
>>
>> I really need your help understanding the relationship
>> between HashAggregateExec, TungstenAggregationIterator and
>> UnsafeFixedWidthAggregationMap.
>>
>> While exploring UnsafeFixedWidthAggregationMap and how it's used, I've
>> noticed that it's used exclusively by HashAggregateExec and
>> TungstenAggregationIterator. And given that TungstenAggregationIterator is
>> used exclusively in HashAggregateExec, and the use of
>> UnsafeFixedWidthAggregationMap in both seems to be almost the same (if not
>> the same), I've got a question I cannot seem to answer myself.
>>
>> Since HashAggregateExec supports whole-stage codegen,
>> HashAggregateExec.doExecute won't be used at all, but doConsume and
>> doProduce will (unless codegen is disabled). Is that correct?
>>
>> If so, TungstenAggregationIterator is not used at all, but
>> UnsafeFixedWidthAggregationMap is used directly instead (in the Java code
>> that uses createHashMap or finishAggregate). Is that correct?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> Mastering Spark SQ

Re: [DISCUSS] PySpark Window UDF

2018-09-08 Thread Wes McKinney
hi Li,

These results are very cool. I'm excited to see you continuing to push
this effort forward.

- Wes
On Wed, Sep 5, 2018 at 5:52 PM Li Jin  wrote:
>
> Hello again!
>
> I recently implemented a proof-of-concept of the proposal above. I 
> think the results are pretty exciting, so I want to share my findings with 
> the community. I have implemented two variants of the pandas window UDF: one 
> that takes a pandas.Series as input and one that takes a numpy array as input. I 
> benchmarked with a rolling mean on 1M doubles and here are some results:
>
> Spark SQL window function: 20s
> Pandas variant: ~60s
> Numpy variant: 10s
> Numpy variant with numba: 4s
>
> You can see the benchmark code here:
> https://gist.github.com/icexelloss/845beb3d0d6bfc3d51b3c7419edf0dcb
>
> I think the results are quite exciting because:
> (1) the numpy variant even outperforms the Spark SQL window function
> (2) the numpy variant with numba has the best performance as well as the 
> flexibility to allow users to write window functions in pure Python
>
> The Pandas variant is not bad either (1.5x faster than the existing UDF with 
> collect_list), but the numpy variant definitely has much better performance.
>
> So far all Pandas UDFs interact with Pandas data structures rather than numpy 
> data structures, but the window UDF result might be a good reason to open up 
> numpy variants of Pandas UDFs. What do people think? I'd love to hear the 
> community's feedback.
>
>
> Links:
> You can reproduce benchmark with numpy variant by using the branch:
> https://github.com/icexelloss/spark/tree/window-udf-numpy
>
> PR link:
> https://github.com/apache/spark/pull/22305
>
> On Wed, May 16, 2018 at 3:34 PM Li Jin  wrote:
>>
>> Hi All,
>>
>> I have been looking into leveraging the Arrow and Pandas UDF work we have done 
>> so far for window UDFs in PySpark. I have done some investigation and believe 
>> there is a way to do PySpark window UDFs efficiently.
>>
>> The basic idea is that instead of passing each window to Python separately, we 
>> can pass a "batch of windows" as an Arrow batch of rows plus begin/end indices 
>> for each window (indices are computed on the Java side), and then roll 
>> over the begin/end indices in Python and apply the UDF.
>>
>> I have written my investigation in more details here:
>> https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#
>>
>> I think this is pretty promising and hope to get some feedback from the 
>> community about this approach. Let's discuss! :)
>>
>> Li
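The "batch of windows" idea described above can be sketched in plain Python (no Arrow, pandas, or numpy; the names below are illustrative, not the proposal's actual API): the engine ships one batch of values plus a begin/end index pair per window, and the UDF is applied by slicing instead of sending each window to Python separately.

```python
# Sketch of the batch-of-windows approach (illustrative only).

def rolling_window_indices(n, window):
    # Begin/end index pairs for a trailing window of the given size.
    # In the actual proposal these are computed on the Java side.
    return [(max(0, i - window + 1), i + 1) for i in range(n)]

def apply_window_udf(values, indices, udf):
    # Roll over the precomputed indices and apply the UDF to each slice.
    return [udf(values[begin:end]) for begin, end in indices]

def mean(xs):
    return sum(xs) / len(xs)

values = [1.0, 2.0, 3.0, 4.0]
idx = rolling_window_indices(len(values), window=2)
result = apply_window_udf(values, idx, mean)   # rolling mean over window=2
```

With numpy arrays and numba-compiled UDFs, the same slicing loop is what the benchmarked variants speed up.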

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Need help with HashAggregateExec, TungstenAggregationIterator and UnsafeFixedWidthAggregationMap

2018-09-08 Thread Jacek Laskowski
Thanks Russ! That helps a lot.

On the other hand, it makes reviewing the Spark SQL codebase slightly harder,
since Java code generation is so much about string concatenation :(

p.s. Should all the code in doExecute be considered and marked @deprecated?
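The "string concatenation" remark can be made concrete with a toy sketch (plain Python; Spark actually generates Java source and compiles it with Janino, and the function names below are made up): code fragments for each operator are concatenated into the source of a single fused loop, which is then compiled and executed.

```python
# Toy illustration of code generation via string concatenation
# (illustrative only; not Spark's actual codegen framework).

def generate_fused_loop(predicate_src, projection_src):
    # Stitch source fragments together, the way doProduce/doConsume
    # stitch Java fragments in Spark (here: Python source instead).
    src = (
        "def fused(rows):\n"
        "    out = []\n"
        "    for row in rows:\n"
        f"        if {predicate_src}:\n"
        f"            out.append({projection_src})\n"
        "    return out\n"
    )
    namespace = {}
    exec(src, namespace)      # "compile" the generated source
    return namespace["fused"]

# Generate a fused filter-then-project loop from two code fragments.
fused = generate_fused_loop("row % 2 == 0", "row * 10")
```

Reading such generators is harder than reading ordinary operator code, because the logic lives inside string fragments rather than in the surrounding language.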

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Fri, Sep 7, 2018 at 10:05 PM Russell Spitzer 
wrote:

> That's my understanding :) doExecute is for non-codegen while doProduce
> and doConsume are for generating code
>
> On Fri, Sep 7, 2018 at 2:59 PM Jacek Laskowski  wrote:
>
>> Hi Devs,
>>
>> Sorry for bothering you with my questions (and concerns), but I really
>> need to understand this piece of code (= my personal challenge :))
>>
>> Is it true that SparkPlan.doExecute (to "execute" a physical operator)
>> is only used when whole-stage code gen is disabled (which it is not by
>> default)? May I call this execution path traditional (even "old-fashioned")?
>>
>> Is it true that these days SparkPlan.doProduce and SparkPlan.doConsume
>> (and others) are used for "executing" a physical operator (i.e. to generate
>> the Java source code), since whole-stage code generation is enabled and is
>> currently the proper execution path?
>>
>> p.s. This SparkPlan.doExecute is used to trigger whole-stage code gen
>> by WholeStageCodegenExec (and InputAdapter), but that's all the code that
>> is to be executed by doExecute, isn't it?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Sep 7, 2018 at 7:24 PM Jacek Laskowski  wrote:
>>
>>> Hi Spark Devs,
>>>
>>> I really need your help understanding the relationship
>>> between HashAggregateExec, TungstenAggregationIterator and
>>> UnsafeFixedWidthAggregationMap.
>>>
>>> While exploring UnsafeFixedWidthAggregationMap and how it's used, I've
>>> noticed that it's used exclusively by HashAggregateExec and
>>> TungstenAggregationIterator. And given that TungstenAggregationIterator is
>>> used exclusively in HashAggregateExec, and the use of
>>> UnsafeFixedWidthAggregationMap in both seems to be almost the same (if not
>>> the same), I've got a question I cannot seem to answer myself.
>>>
>>> Since HashAggregateExec supports whole-stage codegen,
>>> HashAggregateExec.doExecute won't be used at all, but doConsume and
>>> doProduce will (unless codegen is disabled). Is that correct?
>>>
>>> If so, TungstenAggregationIterator is not used at all, but
>>> UnsafeFixedWidthAggregationMap is used directly instead (in the Java code
>>> that uses createHashMap or finishAggregate). Is that correct?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> Mastering Spark SQL https://bit.ly/mastering-spark-sql
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>