[ https://issues.apache.org/jira/browse/SPARK-38785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516846#comment-17516846 ]

zhengruifeng commented on SPARK-38785:
--------------------------------------

h1. Pandas API on Spark: [EWM|https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.ewm.html]
h2. Goal:
 * only support the function *ewm.mean* ({{ewma}} in older versions), since it seems to be the most frequently used one on the website (as for the other functions, var/std/corr, I cannot find their exact formulas and descriptions right now);

 * support params: com, span, halflife, alpha (they are alternative parameterizations of the same smoothing factor; see the sketch after this list);

 * do not support chaining with groupby (df.groupby.ewm.mean) for now (will be 
supported in the future);

 * do not support params: adjust (will be supported in the future), times (deprecated in pandas), ignore_na, axis, method;

 * do not support DatetimeIndex;
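
These four parameters are just alternative ways of specifying the same smoothing factor. As a small illustration (not part of either PR), this is how the pandas {{ewm}} documentation maps them to {{alpha}}:
{code:python}
import math

# mapping of the mutually exclusive parameters to the smoothing factor alpha,
# as described in the pandas ewm documentation
def to_alpha(com=None, span=None, halflife=None, alpha=None):
    if com is not None:        # com >= 0
        return 1.0 / (1.0 + com)
    if span is not None:       # span >= 1
        return 2.0 / (span + 1.0)
    if halflife is not None:   # halflife > 0
        return 1.0 - math.exp(math.log(0.5) / halflife)
    return alpha               # 0 < alpha <= 1

to_alpha(com=0.5)  # 0.666..., the smoothing factor behind the examples below
{code}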

{code:python}
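# assumes the usual imports for this session: import numpy as np; import pandas as pd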
In [14]: df = pd.DataFrame({'s1': [.2, .0, .6, .2, np.nan, .5, .6], 's2': [.3, .6, .0, .1, 0, 0, 0]})
    ...: 

In [15]: df.ewm(com=0.5).mean()
Out[15]: 
         s1        s2
0  0.200000  0.300000
1  0.050000  0.525000
2  0.430769  0.161538
3  0.275000  0.120000
4  0.275000  0.039669
5  0.468198  0.013187
6  0.563142  0.004392

In [16]: df.groupby('s1').ewm(com=0.5).mean()
Out[16]: 
         s2
s1         
0.0 1  0.60
0.2 0  0.30
    3  0.15
0.5 5  0.00
0.6 2  0.00
    6  0.00

In [17]: df.s1.ewm(com=0.5).mean()
Out[17]: 
0    0.200000
1    0.050000
2    0.430769
3    0.275000
4    0.275000
5    0.468198
6    0.563142
Name: s1, dtype: float64 {code}
 

Unlike {{Rolling}}, {{ewm}} computes over an expanding frame, so I am trying to implement incremental methods. So far, I have tried two different approaches on the Scala side.
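To make the incremental part concrete, here is a minimal pure-Python sketch (not the actual Scala code) of the online recurrences from the pandas documentation, ignoring NaN handling:
{code:python}
def ewma_adjust_false(xs, alpha):
    # y_0 = x_0; y_t = (1 - alpha) * y_{t-1} + alpha * x_t
    out, state = [], None
    for x in xs:
        state = x if state is None else (1.0 - alpha) * state + alpha * x
        out.append(state)
    return out

def ewma_adjust_true(xs, alpha):
    # y_t = (x_t + (1-a)*x_{t-1} + ... + (1-a)^t * x_0) / (1 + (1-a) + ... + (1-a)^t),
    # maintained online via a running numerator and denominator
    out, num, den = [], 0.0, 0.0
    for x in xs:
        num = (1.0 - alpha) * num + x
        den = (1.0 - alpha) * den + 1.0
        out.append(num / den)
    return out

ewma_adjust_true([.2, .0, .6, .2], 1.0 / 1.5)  # ~[0.2, 0.05, 0.4308, 0.275], matches Out[15] above
{code}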
h3. Method 1: implement it in SQL (https://github.com/apache/spark/pull/36063)
 * implement a new expression {{EWM extends AggregateWindowFunction}}, which computes the EWMA in an online fashion;

 * expose this expression to PySpark in some way;

pros: benefits from the SQL optimizer

cons: like other such operations in Pandas API on Spark, it will suffer performance degradation because the whole dataset is moved into a single partition (see the sketch below).
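To illustrate the concern: the proposed expression would run over an ordered window without a partitionBy, and Spark then moves all rows into a single partition. A small sketch with a plain running average as a stand-in (the {{EWM}} expression itself is not available yet):
{code:python}
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[2]").getOrCreate()
sdf = spark.range(100).withColumn("x", F.rand(seed=1))

# ordered but un-partitioned window: Spark warns
# "No Partition Defined for Window operation! Moving all data to a single partition"
w = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
sdf.select("id", F.avg("x").over(w).alias("running_avg")).show(5)
{code}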
h3. Method 2: implement it in ML (https://github.com/apache/spark/pull/36062)
 * like {{ml.stat.Correlation}}, implement it on the ML side and call it from PySpark;

 * compute it in a parallel way (much like how {{zipWithIndex}} is computed):

 ** perform a global sort by the indices;

 ** an extra pass is needed to compute statistics on each partition, including the partition size and some algorithm-specific accumulated values;

 ** use the statistics to compute the {{internal state}} of each partition; the {{ewma}} computation can then be initialized with that {{internal state}} and produce the final value for each row (a local sketch of this two-pass idea follows this list);

 ** I looked into two algorithms, {{ewma}} with {{adjust=true}} and with {{adjust=false}}; both seem like they could be implemented in this way;
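
A minimal local sketch of this two-pass idea (pure Python rather than the Scala/RDD code in the PR), assuming {{adjust=false}} and ignoring NaNs; the lists below stand in for Spark partitions that are already globally sorted:
{code:python}
# adjust=false recurrence: y_0 = x_0; y_t = (1 - alpha) * y_{t-1} + alpha * x_t

def partition_stats(xs, alpha):
    """Extra pass per partition: decay factor applied to the incoming state,
    this partition's own contribution, and the state if nothing comes in."""
    decay, contrib, state_if_head = 1.0, 0.0, None
    for x in xs:
        decay *= (1.0 - alpha)
        contrib = (1.0 - alpha) * contrib + alpha * x
        state_if_head = x if state_if_head is None else (1.0 - alpha) * state_if_head + alpha * x
    return decay, contrib, state_if_head

def initial_states(stats):
    """Prefix-combine the per-partition statistics to get the internal state
    each partition should start from."""
    inits, state = [], None
    for decay, contrib, state_if_head in stats:
        inits.append(state)
        state = state_if_head if state is None else decay * state + contrib
    return inits

def partition_ewm(xs, init, alpha):
    """Final pass: each partition computes its rows independently from its init state."""
    out, state = [], init
    for x in xs:
        state = x if state is None else (1.0 - alpha) * state + alpha * x
        out.append(state)
    return out

alpha = 1.0 / 1.5                      # com=0.5
parts = [[0.2], [0.0, 0.6], [0.2]]     # pretend these are the sorted partitions
stats = [partition_stats(p, alpha) for p in parts]
inits = initial_states(stats)
result = [y for p, s in zip(parts, inits) for y in partition_ewm(p, s, alpha)]
# result ~ [0.2, 0.0667, 0.4222, 0.2741], same as the sequential adjust=false recurrence
{code}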

pros:
 * even if no partitionSpec is defined, it can still compute EWMA in a parallel way;

cons:
 * it needs to be implemented on top of the RDD API, so it is a black box to the SQL optimizer;

 * inefficient when the dataset is small.

 

They are quite different, so before I go further, could you please give me some suggestions and feedback? Thanks!

 

[~hyukjin.kwon] [~XinrongM] [~itholic] [~ueshin] 

> impl Series.ewm and DataFrame.ewm
> ---------------------------------
>
>                 Key: SPARK-38785
>                 URL: https://issues.apache.org/jira/browse/SPARK-38785
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: zhengruifeng
>            Priority: Major
>



