[ https://issues.apache.org/jira/browse/SPARK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell updated SPARK-8638:
-------------------------------------
    Attachment: perf_test3.scala

Another round of benchmarking. This test compares the performance of the 
current master and the pull request with respect to frame type and partition 
size. It uncovered a performance regression in the shrinking frame scenario; 
this has been fixed.

The results are shown below. For small partitions, performance is about the 
same. From a partition size of 32 onward, the growing frame case starts to 
perform better, and from a partition size of 4096 onward, the sliding frame 
case does as well. The shrinking case also performs slightly better (though 
it is still horrible) for the larger partition sizes.
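
The following minimal Scala sketch makes the four frame names concrete by computing a SUM over each frame type, naively re-aggregating every row's frame. The mapping of names to frames is an assumption: Entire Partition = UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, Growing = UNBOUNDED PRECEDING AND CURRENT ROW, Shrinking = CURRENT ROW AND UNBOUNDED FOLLOWING, and Sliding = a bounded PRECEDING/FOLLOWING window. The frames actually used in perf_test3.scala may differ. `FrameTypesSketch`, `Frame`, and `frameSums` are hypothetical names, not Spark APIs.

```scala
// Hypothetical sketch: naive reference evaluation of a SUM over the four
// frame types named in the benchmark. The name-to-frame mapping is assumed.
object FrameTypesSketch {
  // Inclusive row-index bounds [lo, hi] of the frame for row i in a partition of n rows.
  sealed trait Frame { def bounds(i: Int, n: Int): (Int, Int) }

  // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  case object EntirePartition extends Frame {
    def bounds(i: Int, n: Int): (Int, Int) = (0, n - 1)
  }
  // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (the "running" case)
  case object Growing extends Frame {
    def bounds(i: Int, n: Int): (Int, Int) = (0, i)
  }
  // ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
  case object Shrinking extends Frame {
    def bounds(i: Int, n: Int): (Int, Int) = (i, n - 1)
  }
  // ROWS BETWEEN `before` PRECEDING AND `after` FOLLOWING, clipped to the partition
  final case class Sliding(before: Int, after: Int) extends Frame {
    def bounds(i: Int, n: Int): (Int, Int) =
      (math.max(0, i - before), math.min(n - 1, i + after))
  }

  // Re-aggregates every row's frame from scratch: O(frame size) work per row.
  def frameSums(values: IndexedSeq[Long], frame: Frame): IndexedSeq[Long] =
    values.indices.map { i =>
      val (lo, hi) = frame.bounds(i, values.length)
      values.slice(lo, hi + 1).sum
    }

  def main(args: Array[String]): Unit = {
    val partition = Vector(1L, 2L, 3L, 4L)
    for (f <- Seq(EntirePartition, Growing, Shrinking, Sliding(1, 1)))
      println(s"$f: ${frameSums(partition, f).mkString(", ")}")
  }
}
```

For the partition 1, 2, 3, 4, this yields 10, 10, 10, 10 (entire partition), 1, 3, 6, 10 (growing), 10, 9, 7, 4 (shrinking), and 3, 6, 9, 7 (sliding, one row on each side). Evaluated this way, both growing and shrinking frames cost O(N²) additions per partition. A growing frame only ever gains rows, so one running aggregate can replace the per-row work. A shrinking frame loses rows at its front, so a forward-only aggregate cannot simply be reused, which may explain why that case remains slow.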

{noformat}
+-----+----------------+------+----------+-----+
| Size|            Name|Master|SPARK-8638| Diff|
+-----+----------------+------+----------+-----+
|    2|Entire Partition|  6963|      6896|0.990|
|    2|         Growing|  7525|      7371|0.980|
|    2|       Shrinking|  7668|      7769|1.013|
|    2|         Sliding|  7672|      7545|0.983|
|    8|Entire Partition|  6913|      6664|0.964|
|    8|         Growing|  7751|      7702|0.994|
|    8|       Shrinking|  7728|      7697|0.996|
|    8|         Sliding|  7651|      7544|0.986|
|   32|Entire Partition|  6941|      6617|0.953|
|   32|         Growing|  8225|      7359|0.895|
|   32|       Shrinking|  8082|      8030|0.994|
|   32|         Sliding|  7500|      7454|0.994|
|  128|Entire Partition|  6435|      6388|0.993|
|  128|         Growing| 10044|      7073|0.704|
|  128|       Shrinking| 10048|      9753|0.971|
|  128|         Sliding|  7311|      7249|0.992|
|  256|Entire Partition|  6282|      6462|1.029|
|  256|         Growing| 13031|      7039|0.540|
|  256|       Shrinking| 12777|     12040|0.942|
|  256|         Sliding|  7336|      7171|0.978|
| 1024|Entire Partition|  6196|      6171|0.996|
| 1024|         Growing| 31100|      6929|0.223|
| 1024|       Shrinking| 29719|     27215|0.916|
| 1024|         Sliding|  7255|      7012|0.967|
| 4096|Entire Partition|  6048|      5931|0.981|
| 4096|         Growing|122824|      6906|0.056|
| 4096|       Shrinking| 98196|     87880|0.895|
| 4096|         Sliding|  7403|      6961|0.940|
|16192|Entire Partition|  5678|      5713|1.006|
|16192|         Growing|600710|      6563|0.011|
|16192|       Shrinking|383676|    354638|0.924|
|16192|         Sliding| 12375|      7930|0.641|
+-----+----------------+------+----------+-----+

Master and SPARK-8638 are the total processing times in milliseconds; Diff is SPARK-8638 / Master (lower is better).
{noformat}
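
The numbers indicate that the Diff column is the SPARK-8638 time divided by the Master time, so values below 1 mean the pull request is faster. The following small Scala check recomputes Diff for a few rows copied from the table. `DiffColumnCheck` and `diff` are hypothetical names used only for illustration.

```scala
// Hypothetical check: recompute the Diff column from rows copied out of the table.
object DiffColumnCheck {
  // (partition size, frame type, Master ms, SPARK-8638 ms, reported Diff)
  val rows: Seq[(Int, String, Int, Int, Double)] = Seq(
    (2, "Entire Partition", 6963, 6896, 0.990),
    (128, "Growing", 10044, 7073, 0.704),
    (4096, "Sliding", 7403, 6961, 0.940),
    (16192, "Growing", 600710, 6563, 0.011)
  )

  // Ratio of new time to old time; below 1.0 means the pull request is faster.
  def diff(master: Int, pr: Int): Double = pr.toDouble / master

  def main(args: Array[String]): Unit =
    rows.foreach { case (size, name, master, pr, reported) =>
      val speedup = master.toDouble / pr
      println(f"$size%5d $name%-16s diff=${diff(master, pr)}%.3f reported=$reported%.3f speedup=$speedup%.1fx")
    }
}
```

For example, in the 16192-row growing case, Master / SPARK-8638 = 600710 / 6563 ≈ 91.5, which is roughly a 90x speedup.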

> Window Function Performance Improvements
> ----------------------------------------
>
>                 Key: SPARK-8638
>                 URL: https://issues.apache.org/jira/browse/SPARK-8638
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Herman van Hovell
>         Attachments: perf_test.scala, perf_test2.scala, perf_test3.scala
>
>
> Improve the performance of Spark Window Functions in the following cases:
> # Much better performance (10x) in the running case (e.g. BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW). The current implementation in Spark uses a 
> sliding window approach in these cases. This means that an aggregate is 
> maintained for every row, so space usage is N (N being the number of rows). 
> It also means that all these aggregates need to be updated separately, 
> which takes N*(N-1)/2 updates. The running case differs from the sliding case 
> because we are only adding data to an aggregate function (no reset is 
> required), so we only need to maintain one aggregate (as in the UNBOUNDED 
> PRECEDING AND UNBOUNDED FOLLOWING case), update the aggregate for each row, 
> and read the aggregate value after each update. This is what the new 
> implementation does. This approach uses only 1 buffer and requires only N 
> updates. I am currently working on data with window sizes of 500-1000 doing 
> running sums, and this saves a lot of time.
> # Fewer comparisons in the sliding case. The current implementation 
> determines the frame boundaries for every input row. The new implementation 
> makes more use of the fact that the window is sorted: it maintains the 
> boundaries and only moves them when the current row order changes. This is a 
> minor improvement.
> # A single Window node is able to process all types of Frames for the same 
> Partitioning/Ordering. This saves a little time/memory spent buffering and 
> managing partitions. This will be enabled in a follow-up PR.
> # A lot of the staging code is moved from the execution phase to the 
> initialization phase. This is a minor performance improvement, and it also 
> improves the readability of the execution code.
> The attached perf_test.scala file contains a number of queries that can be 
> used to measure the differences between the current and the proposed window 
> function implementations. In the tests, the new implementation outperforms 
> the current implementation by a factor of 7 in the sliding window cases and 
> by a factor of 14 in the running window cases.
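
The two approaches to the running case described in item 1 can be sketched in plain Scala without Spark. This is a minimal illustration that assumes the aggregate is a simple sum. `RunningFrameSketch`, `perRowAggregates`, and `runningAggregate` are hypothetical names, not Spark APIs. The per-row version keeps one buffer per row and adds each input row to every buffer whose frame contains it, so its update count grows quadratically (N(N+1)/2 under the counting used here, which includes each row's own value). The running version keeps a single buffer and performs exactly N updates.

```scala
// Hypothetical sketch of item 1: per-row aggregates vs. one running aggregate
// for ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, using SUM.
object RunningFrameSketch {
  // One aggregate buffer per output row (space N); every input row j is added
  // to the buffer of each row i >= j, giving a quadratic number of updates.
  def perRowAggregates(values: IndexedSeq[Long]): (IndexedSeq[Long], Long) = {
    val n = values.length
    val buffers = Array.fill(n)(0L)
    var updates = 0L
    for (j <- 0 until n; i <- j until n) {
      buffers(i) += values(j)
      updates += 1
    }
    (buffers.toVector, updates)
  }

  // A single buffer: only add data (no reset), and read the value after each update.
  def runningAggregate(values: IndexedSeq[Long]): (IndexedSeq[Long], Long) = {
    var buffer = 0L
    var updates = 0L
    val out = values.map { v =>
      buffer += v
      updates += 1
      buffer
    }
    (out, updates)
  }

  def main(args: Array[String]): Unit = {
    val data = (1L to 1000L).toVector
    val (perRow, perRowUpdates) = perRowAggregates(data)
    val (running, runningUpdates) = runningAggregate(data)
    println(s"same result: ${perRow == running}")                              // same result: true
    println(s"per-row updates: $perRowUpdates, running updates: $runningUpdates") // 500500 vs 1000
  }
}
```

Both versions return identical running sums. The only difference is how much buffer space and how many aggregate updates each one needs.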

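Item 2 (fewer comparisons in the sliding case) can also be sketched outside Spark. The sketch assumes a RANGE-style frame over an ascending sorted integer key, covering all rows whose key lies within `w` of the current row's key, with `w >= 0`. The names `SlidingFrameSketch`, `naiveBounds`, and `incrementalBounds` are hypothetical. For every row, the naive version searches for both boundaries starting from the beginning of the partition. The incremental version keeps the boundaries between rows and only advances them, because sorted keys guarantee they never move backwards. It also skips the search entirely when the ordering key has not changed.

```scala
// Hypothetical sketch of item 2: finding sliding-frame boundaries per row vs.
// maintaining them across rows. Frame for key k = rows with key in [k - w, k + w].
object SlidingFrameSketch {
  // Half-open row-index bounds [lo, hi) of each row's frame.
  type Bounds = IndexedSeq[(Int, Int)]

  // Naive: search both boundaries from the start of the partition for every row.
  def naiveBounds(keys: IndexedSeq[Int], w: Int): (Bounds, Long) = {
    var comparisons = 0L
    val bounds = keys.map { k =>
      var lo = 0
      while (lo < keys.length && { comparisons += 1; keys(lo) < k - w }) lo += 1
      var hi = lo
      while (hi < keys.length && { comparisons += 1; keys(hi) <= k + w }) hi += 1
      (lo, hi)
    }
    (bounds, comparisons)
  }

  // Incremental: keys are sorted, so lo and hi only move forward; they are
  // only advanced when the ordering key differs from the previous row's key.
  def incrementalBounds(keys: IndexedSeq[Int], w: Int): (Bounds, Long) = {
    var comparisons = 0L
    var lo = 0
    var hi = 0
    val out = Vector.newBuilder[(Int, Int)]
    var i = 0
    while (i < keys.length) {
      val k = keys(i)
      if (i == 0 || keys(i - 1) != k) {
        while (lo < keys.length && { comparisons += 1; keys(lo) < k - w }) lo += 1
        while (hi < keys.length && { comparisons += 1; keys(hi) <= k + w }) hi += 1
      }
      out += ((lo, hi))
      i += 1
    }
    (out.result(), comparisons)
  }

  def main(args: Array[String]): Unit = {
    val keys = Vector(1, 2, 2, 3, 7, 8)
    val (nb, nc) = naiveBounds(keys, 1)
    val (ib, ic) = incrementalBounds(keys, 1)
    println(s"same bounds: ${nb == ib}, naive comparisons: $nc, incremental comparisons: $ic")
  }
}
```

On the sample partition in `main`, both versions return the same bounds, (0,3), (0,4), (0,4), (1,4), (4,6), (4,6). The incremental version needs 18 boundary comparisons against 37 for the naive one, and the gap widens as partitions grow.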


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
