[ https://issues.apache.org/jira/browse/SPARK-38785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516846#comment-17516846 ]
zhengruifeng commented on SPARK-38785 (impl Series.ewm and DataFrame.ewm):
--------------------------------------

h1. Pandas API on Spark: [EWM|https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.ewm.html]

h2. Goal
* only support the function *ewm.mean* ({{ewma}} in older pandas versions), since it seems to be the most frequently used one (for the other functions, var/std/corr, I can't find the exact formulas and descriptions right now);
* support the params: com, span, halflife, alpha;
* do not support chaining with groupby (df.groupby.ewm.mean) for now (to be supported in the future);
* do not support the params: adjust (to be supported in the future), times (deprecated in pandas), ignore_na, axis, method;
* do not support DatetimeIndex;

{code:python}
In [14]: df = pd.DataFrame({'s1': [.2, .0, .6, .2, np.nan, .5, .6],
    ...:                    's2': [.3, .6, .0, .1, 0, 0, 0]})

In [15]: df.ewm(com=0.5).mean()
Out[15]:
         s1        s2
0  0.200000  0.300000
1  0.050000  0.525000
2  0.430769  0.161538
3  0.275000  0.120000
4  0.275000  0.039669
5  0.468198  0.013187
6  0.563142  0.004392

In [16]: df.groupby('s1').ewm(com=0.5).mean()
Out[16]:
          s2
s1
0.0 1   0.60
0.2 0   0.30
    3   0.15
0.5 5   0.00
0.6 2   0.00
    6   0.00

In [17]: df.s1.ewm(com=0.5).mean()
Out[17]:
0    0.200000
1    0.050000
2    0.430769
3    0.275000
4    0.275000
5    0.468198
6    0.563142
Name: s1, dtype: float64
{code}

Unlike {{Rolling}}, {{ewm}} computes over an ever-growing frame, so I am trying to implement incremental methods (a small sketch of the {{ewma}} recurrence is attached at the end of this comment). Right now, I have tried two different methods on the Scala side.

h3. method 1: impl in SQL (https://github.com/apache/spark/pull/36063)
* implement a new expression {{EWM extends AggregateWindowFunction}}, which computes the EWMA in an online fashion;
* expose this expression to PySpark in some way;

pros: benefits from the SQL optimizer.

cons: like other operations in Pandas API on Spark, it will suffer performance degradation because everything is computed in a single partition.

h3. method 2: impl in ML (https://github.com/apache/spark/pull/36062)
* like {{ml.stat.Correlation}}, implement it on the ML side and call it from PySpark;
* compute in a parallel way (much like the way {{zipWithIndex}} is computed):
** perform a global sort by the indices;
** an extra pass is needed to compute statistics on each partition, including the partition size and some algorithm-specific accumulated values;
** use these statistics to compute the {{internal status}} of each partition, so that the {{ewma}} computation on each partition can be initialized with that {{internal status}} and then produce the final value for each row (see the partition-wise sketch at the end of this comment);
** I looked into two algorithms, {{ewma}} with {{adjust=true/false}}, and both seem implementable in this way;

pros:
* even if no partitionSpec is defined, it can still compute the EWMA in a parallel way;

cons:
* it needs to be implemented on top of the RDD API, so it is a black box for the SQL optimizer;
* inefficient when the dataset is small.

The two approaches are quite different, so before I go further, could you please give me some suggestions and feedback? Thanks! [~hyukjin.kwon] [~XinrongM] [~itholic] [~ueshin]
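h3. sketch: the {{ewma}} recurrence

For reference, a minimal NumPy sketch of the {{ewm.mean}} recurrence, assuming the pandas defaults {{adjust=True}} and {{ignore_na=False}}; the helper name {{ewm_mean}} and the plain-Python loop are only for illustration, not the proposed implementation. It keeps a decayed weighted sum and a decayed weight count, so each row only needs the previous state, and it reproduces the {{s1}} column of the example above.

{code:python}
import numpy as np
import pandas as pd

def ewm_mean(values, com=None, span=None, halflife=None, alpha=None):
    # com, span and halflife are just different parameterizations of one smoothing factor
    if com is not None:
        alpha = 1.0 / (1.0 + com)
    elif span is not None:
        alpha = 2.0 / (span + 1.0)
    elif halflife is not None:
        alpha = 1.0 - np.exp(-np.log(2.0) / halflife)

    # adjust=True: keep a decayed weighted sum (num) and a decayed weight count (den);
    # a NaN row adds nothing but still decays the state (ignore_na=False)
    num, den, out = 0.0, 0.0, []
    for x in values:
        num *= (1.0 - alpha)
        den *= (1.0 - alpha)
        if not np.isnan(x):
            num += x
            den += 1.0
        out.append(num / den if den > 0 else np.nan)
    return out

s1 = [.2, .0, .6, .2, np.nan, .5, .6]
print(ewm_mean(s1, com=0.5))
# [0.2, 0.05, 0.4307..., 0.275, 0.275, 0.4681..., 0.5631...]
print(pd.Series(s1).ewm(com=0.5).mean().tolist())  # same values
{code}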
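h3. sketch: partition-wise decomposition (method 2)

And a rough sketch of the parallel scheme of method 2, again in plain Python/NumPy for readability rather than the actual Scala/RDD code in the PR: pass 1 computes per-partition statistics (size plus the locally accumulated numerator/denominator), a small sequential scan turns them into the incoming {{internal status}} of each partition, and pass 2 finishes each partition locally. The function names, the chosen partition boundaries and the exact two-pass shape here are illustrative assumptions, not the PR's implementation.

{code:python}
import numpy as np

ALPHA = 1.0 / (1.0 + 0.5)   # com = 0.5

def local_stats(rows, alpha=ALPHA):
    # pass 1 (parallelizable): start from an empty state and return the
    # partition's statistics: (size, local numerator, local denominator)
    num, den = 0.0, 0.0
    for x in rows:
        num *= (1.0 - alpha)
        den *= (1.0 - alpha)
        if not np.isnan(x):
            num += x
            den += 1.0
    return len(rows), num, den

def ewm_mean_partitioned(partitions, alpha=ALPHA):
    stats = [local_stats(rows, alpha) for rows in partitions]

    # tiny sequential scan over the per-partition statistics: the incoming
    # "internal status" (num, den) of partition i depends only on partitions < i
    init_states, num, den = [], 0.0, 0.0
    for size, local_num, local_den in stats:
        init_states.append((num, den))
        decay = (1.0 - alpha) ** size
        num = num * decay + local_num
        den = den * decay + local_den

    # pass 2 (parallelizable): each partition resumes from its incoming state
    out = []
    for rows, (num, den) in zip(partitions, init_states):
        for x in rows:
            num *= (1.0 - alpha)
            den *= (1.0 - alpha)
            if not np.isnan(x):
                num += x
                den += 1.0
            out.append(num / den if den > 0 else np.nan)
    return out

s1 = [.2, .0, .6, .2, np.nan, .5, .6]
print(ewm_mean_partitioned([s1[:3], s1[3:5], s1[5:]]))
# identical to the single-pass result above, regardless of how the rows are split
{code}

Splitting the same series across different partition boundaries gives the same result as the single-pass version, which is why the global sort plus one extra statistics pass is enough to parallelize the computation.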