GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/22482
WIP - [SPARK-10816][SS] Support session window natively

## What changes were proposed in this pull request?

This patch proposes native support for session windows, in the same way Spark already supports time windows. Please refer to the document attached to [SPARK-10816](https://issues.apache.org/jira/browse/SPARK-10816) for more details on the rationale, concepts, limitations, etc.

From the end user's point of view, the only change is the addition of the "session" SQL function. End users can define a query with a session window by replacing the "window" function with the "session" function, and the "window" column with the "session" column. Beyond that, the patch provides the same experience as time windows.

Internally, this patch changes the physical plan of aggregation slightly: if the session function is used in a query, the input rows are sorted by "grouping keys" + "session", and overlapping sessions are merged into one while aggregations are applied. This resembles sort-based aggregation, except that the unit of grouping is grouping keys + session.

To handle late events, multiple session windows may coexist per key while none of them is ready to be evicted yet. This patch handles that case by borrowing the state implementation from the streaming join, which can store multiple values for a given key.

## How was this patch tested?

Many UTs are added to verify session window queries for both batch and streaming.

Please review http://spark.apache.org/contributing.html before opening a pull request.
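The sort-then-merge step and the "multiple sessions per key in state" idea described above can be sketched as follows. This is a minimal, single-process Python illustration under stated assumptions, not the patch's code: `Session`, `merge_sorted`, and `process_batch` are hypothetical names, the aggregate is a simple event count, each event at time t opens a tentative session [t, t + gap), sessions merge only when they strictly overlap, and events older than the watermark are assumed to be dropped. `process_batch` loosely mimics the role the multi-value state manager plays in the patch: several open sessions per key are kept until the watermark passes their end.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class Session:
    """Hypothetical session record: grouping key, [start, end) range, and an aggregate."""
    key: str
    start: int  # inclusive
    end: int    # exclusive
    count: int  # example aggregate: number of events folded into this session


def merge_sorted(sessions: Iterable[Session]) -> List[Session]:
    """Sort by (grouping key, session start), then merge overlapping sessions of the same key."""
    merged: List[Session] = []
    for s in sorted(sessions, key=lambda x: (x.key, x.start)):
        last = merged[-1] if merged else None
        if last is not None and last.key == s.key and s.start < last.end:
            # Overlaps the current session: extend its end and combine the aggregates.
            last.end = max(last.end, s.end)
            last.count += s.count
        else:
            # New key, or no overlap: start a new session (a copy, so inputs are not mutated).
            merged.append(Session(s.key, s.start, s.end, s.count))
    return merged


def process_batch(state: Dict[str, List[Session]], events: Iterable[Tuple[str, int]],
                  gap: int, watermark: int) -> List[Session]:
    """Merge new events with the sessions kept in state; evict sessions the watermark has passed."""
    # Assumption: events older than the watermark are dropped; each remaining event opens [t, t + gap).
    incoming = [Session(k, t, t + gap, 1) for k, t in events if t >= watermark]
    stored = [s for sessions in state.values() for s in sessions]
    state.clear()
    evicted: List[Session] = []
    for s in merge_sorted(stored + incoming):
        if s.end <= watermark:
            # No event at or after the watermark can overlap this session any more, so it is final.
            evicted.append(s)
        else:
            # Several open sessions may remain for the same key until the watermark passes them.
            state.setdefault(s.key, []).append(s)
    return evicted
```

For example, with `gap=10`, events at times 1 and 5 for key "a" merge into one session [1, 15), while an event at 20 opens a second session for the same key. A late event at 12, arriving while the watermark is still 10, bridges both into a single session [1, 30), which is emitted once the watermark reaches 30.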
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-10816

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22482

----
commit a1af74611df7dd5b979fc1a288de96e0b3d415da
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-04T23:10:47Z

    WIP nothing worked, just recording the progress

commit be502485047283e203933a4d78e3b580a0c567df
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-06T04:36:11Z

    WIP not working yet... lots of implementations needed

commit 7c60c0ad922ddacf025ad4762b85d06ab7cb258f
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-06T13:31:08Z

    WIP Finished implementing UpdatingSessionIterator

commit 4e8c260a6e6b73b9bcd347ca242b8e77aedf8d1e
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-07T08:35:32Z

    WIP add verification on precondition "rows in iterator are sorted by key"

commit 39069ded62dc5836b0b0f7c8ec7fb8ce869e5292
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-08T04:36:46Z

    Rename SymmetricHashJoinStateManager to MultiValuesStateManager

    * This will be also used from session window state as well

commit c2716340e008000e1fcc5e4d3fcf9befa419ff77
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-08T04:41:37Z

    Move package of UpdatingSessionIterator

commit df4cffd5fd1ea82be509f1cd97e5fc3a7ef8acb6
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-10T05:52:28Z

    WIP add MergingSortWithMultiValuesStateIterator, now integrating with stateful operators (WIP...)

commit 79e32b918c3db41c7d6c1c1d55276d3f696746d5
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T06:54:37Z

    WIP the first version of working one!
    Still have lots of TODOs and FIXMEs to go

commit fb7aa17488e5753c5460f383e1b0f4bedca6dee8
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T08:13:45Z

    Add more explanations

commit 9f41b9d6e7960031c52603bd1da9aeca747e1dfb
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T08:49:01Z

    Silly bugfix & block session window for batch query as of now

    We can enable it but there're lots of approaches on aggregations in batch side...
    * AggUtils.planAggregateWithoutDistinct
    * AggUtils.planAggregateWithOneDistinct
    * RewriteDistinctAggregates
    * AggregateInPandasExec
    So unless we are sure which things to support, just block them for now...

commit 0a62b1f0c274859061c0f3ab2c63450052985ac7
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T09:28:34Z

    More works: majorly split out updating session to individual physical node

    * we will leverage such node for batch case if we want

commit acb5a0c42641041ca3adae2c9f2293b4dfa837cf
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T09:38:00Z

    Fix a silly bug and also add check for session window against batch query

commit 1b6502c92231b7aaa9d0d6f620a5bcc624b862ec
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T11:30:15Z

    WIP Fixed eviction on update mode

commit fec9a8ae5c1d421322738bd474fcb5508421f51a
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T12:48:07Z

    WIP found root reason of broken UT... fixed it

commit c87e4eebcc53c81328d52e4d4ea270bcede8b26e
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T12:50:31Z

    WIP remove printing "explain" on UTs

commit c0726d7447ce84440e46013d1cc392f1e397f183
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T14:43:32Z

    WIP address session to batch query (+ python) as well...
    not having tests for some aggregations
    * distinct
    * two distincts
    * pandas

commit 69015789ab4f74bccf22fa5908205fa159145417
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-13T14:59:39Z

    WIP add more test on session batch query

commit d0888f99f0321066c086022f191dc3a08994491c
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-14T05:13:32Z

    WIP add UT for sessions with keys overlapped

commit fce4c435794505872d4c924319a4ba672d493183
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-14T05:49:47Z

    WIP refactor a bit

commit e688f4d44c75f575769af60c27f39a267a3c8406
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-14T06:22:45Z

    WIP add more FIXMEs for javadoc, and remove invalid FIXMEs

commit 5e4f08da3da0ec48ae8826a628a0084a18f4cb59
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-17T04:41:56Z

    WIP Repackage & remove unnecessary field

commit 978cc70c9576de574515d60b8e21dce067ecb76e
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-17T06:12:50Z

    WIP addressed UPDATE mode, but doesn't look like performant

commit 4f903b52d7c70b6849ab66da2a31fe0899c37e7a
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-17T06:57:19Z

    WIP remove FIXME since it is not relevant

commit ce56ef201c056ff9caf7df3d48e1dd89c03e0119
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-17T08:50:29Z

    WIP update numOutputRows for Append mode

commit e8c0f383eba641fd4456ddabbecaa5189c5b04d3
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T03:35:18Z

    WIP apply aggregations when merging sessions

commit 56dd2f21f2978685932b021bc68f409efccefb45
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T05:28:54Z

    WIP simplify the code a bit

commit 24cd615a08bc4629c6abdeb4a49aa565e4a2e271
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T05:56:41Z

    WIP address batch distinct query for sessionization

commit b47f99f72d7eb508bce651a41eb37dfa2fc34096
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T06:17:15Z

    WIP remove debug statements for test code

commit f182dd5afaa05cb7a9a874cf7959b273b19bc539
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T06:29:21Z

    WIP remove debug informations

commit df0a430649c77195186f31be34d12200b3419123
Author: Jungtaek Lim <kabhwan@...>
Date: 2018-09-18T20:51:50Z

    WIP port Sessionization example to UT of session window

----

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org