BTW, you could also deduplicate the user table more efficiently by using the Top-N feature [1].
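
As a rough sketch of what that could look like for the "Users" table from the original question: the helper below only builds the SQL string, so it runs without Flink on the classpath. The class name TopNDedupQuery, the method latestPerKey and the alias rowNum are made up for illustration; the query shape follows the Top-N pattern described in [1], which (as far as I know) requires the blink planner in 1.9.

```java
// Hypothetical helper, not part of Flink: builds a Top-N (N = 1) query
// that keeps only the newest row per key.
final class TopNDedupQuery {

    /**
     * Returns a query that numbers the rows of each key partition by
     * descending order column and keeps only the first row.
     * The arguments are concatenated as-is, so pass trusted identifiers only.
     */
    public static String latestPerKey(String table, String keyColumn, String orderColumn) {
        return "SELECT * FROM ("
                + "SELECT *, ROW_NUMBER() OVER ("
                + "PARTITION BY " + keyColumn
                + " ORDER BY " + orderColumn + " DESC) AS rowNum"
                + " FROM " + table
                + ") WHERE rowNum = 1";
    }

    public static void main(String[] args) {
        String sql = latestPerKey("Users", "userId", "userTimestamp");
        System.out.println(sql);
        // With an existing TableEnvironment (not created here), the string
        // would be used like the query in the original question:
        //   Table uniqueUsers = tEnv.sqlQuery(sql);
    }
}
```

Note that the outer SELECT * also returns the rowNum column; listing the wanted columns explicitly in the outer query avoids that.
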
Best,
Kurt

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n

On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi RKandoji,
>
> In theory, you don't need to do anything.
> First, the optimizer will try to eliminate duplicate nodes.
> Second, if the optimized plan still has duplicate nodes after SQL
> optimization, the planner will automatically reuse them.
> There are config options that control whether the plan is reused. Their
> default value is true, so you don't need to modify them:
> - table.optimizer.reuse-sub-plan-enabled
> - table.optimizer.reuse-source-enabled
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>
>> Thanks Terry and Jingsong,
>>
>> Currently I'm on version 1.8, using the Flink planner for stream
>> processing. I'll switch to version 1.9 to try out the blink planner.
>>
>> Could you please point me to any examples (Java preferred) using
>> SubplanReuser?
>>
>> Thanks,
>> RK
>>
>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> FYI: blink planner subplan reuse [1] is available in 1.9.
>>>
>>>        Join                      Join
>>>      /      \                  /      \
>>>  Filter1  Filter2          Filter1  Filter2
>>>     |        |        =>       \     /
>>>  Project1 Project2            Project1
>>>     |        |                   |
>>>   Scan1    Scan2               Scan1
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:
>>>
>>>> Hi RKandoji~
>>>>
>>>> Could you provide more info about your POC environment?
>>>> Stream or batch? Flink planner or blink planner?
>>>> AFAIK, the blink planner has some optimizations to deal with such
>>>> duplicate tasks for the same query. You can give the blink planner a try:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>
>>>> Best,
>>>> Terry Wang
>>>>
>>>> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I'm doing a POC with Flink to understand whether it's a good fit for my
>>>> use case.
>>>>
>>>> As part of the process, I need to filter out duplicate items, and I
>>>> created the query below to get only the latest records based on
>>>> timestamp. For instance, I have a "Users" table which may contain
>>>> multiple messages for the same "userId", so I wrote the query below to
>>>> get only the latest message for a given "userId":
>>>>
>>>> Table uniqueUsers = tEnv.sqlQuery(
>>>>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN "
>>>>     + "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>>
>>>> The above query works as expected and contains only the latest users
>>>> based on timestamp.
>>>>
>>>> The issue is that when I use the "uniqueUsers" table multiple times in a
>>>> JOIN operation, I see multiple tasks in the Flink dashboard for the same
>>>> query that creates the "uniqueUsers" table. It simply creates as many
>>>> tasks as the number of times I use the table.
>>>>
>>>> Below is the JOIN query.
>>>>
>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>> Table joinOut = tEnv.sqlQuery(
>>>>     "SELECT * FROM Car c "
>>>>     + "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId "
>>>>     + "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId "
>>>>     + "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId "
>>>>     + "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>
>>>> Could someone please help me understand how I can avoid these duplicate
>>>> tasks?
>>>>
>>>> Thanks,
>>>> R Kandoji
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>
> --
> Best, Jingsong Lee