OK, thanks. Does that mean version 1.9.2 is the one I need to use?

On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> The Blink planner was introduced in 1.9, and we recommend using it from 1.9 onward.
> After several bug fixes, I think the latest 1.9 release is OK. It is also
> already running in production environments in some places.
>
> Best,
> Jingsong Lee
>
> On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkand...@gmail.com> wrote:
>
>> Thanks Jingsong and Kurt for the additional details.
>>
>> Yes, I'm planning to try out deduplication once I'm done upgrading to
>> version 1.9. Hopefully deduplication is done by only one task and reused
>> everywhere else.
>>
>> One more follow-up question: I see the warning "For production use cases, we
>> recommend the old planner that was present before Flink 1.9 for now."
>> here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>> This is actually the reason I started with version 1.8. Could you
>> please let me know your opinion about this? And do you know whether any
>> production code is running on version 1.9?
>>
>> Thanks,
>> Reva
>>
>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> BTW, you could also deduplicate the user table more efficiently
>>> by using the Top-N feature [1].
>>>
>>> Best,
>>> Kurt
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>
>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Hi RKandoji,
>>>>
>>>> In theory, you don't need to do anything.
>>>> First, the optimizer already handles duplicate nodes while optimizing the plan.
>>>> Second, if the optimized plan still contains duplicate nodes,
>>>> the planner will automatically reuse them.
>>>> The following config options control whether plans are reused. Their
>>>> default value is true, so you don't need to modify them:
>>>> - table.optimizer.reuse-sub-plan-enabled
>>>> - table.optimizer.reuse-source-enabled
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>>>>
>>>>> Thanks Terry and Jingsong,
>>>>>
>>>>> I'm currently on version 1.8, using the Flink planner for stream processing.
>>>>> I'll switch to version 1.9 to try out the Blink planner.
>>>>>
>>>>> Could you please point me to any examples (Java preferred) using
>>>>> SubplanReuser?
>>>>>
>>>>> Thanks,
>>>>> RK
>>>>>
>>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>
>>>>>> Hi RKandoji,
>>>>>>
>>>>>> FYI: Blink planner subplan reuse [1] is available in 1.9.
>>>>>>
>>>>>>         Join                Join
>>>>>>        /    \              /    \
>>>>>>   Filter1  Filter2    Filter1  Filter2
>>>>>>      |        |    =>      \   /
>>>>>>   Project1 Project2      Project1
>>>>>>      |        |              |
>>>>>>    Scan1    Scan2          Scan1
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi RKandoji~
>>>>>>>
>>>>>>> Could you provide more info about your POC environment?
>>>>>>> Stream or batch? Flink planner or Blink planner?
>>>>>>> AFAIK, the Blink planner has done some optimization to deal with such
>>>>>>> duplicate tasks for the same query. You can give the Blink planner a try:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>>
>>>>>>> Best,
>>>>>>> Terry Wang
>>>>>>>
>>>>>>> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I'm doing a POC with Flink to understand whether it's a good fit for my
>>>>>>> use case.
>>>>>>>
>>>>>>> As part of the process, I need to filter out duplicate items, so I created
>>>>>>> the query below to get only the latest records based on timestamp. For
>>>>>>> instance, I have a "Users" table that may contain multiple messages for the
>>>>>>> same "userId", so I wrote the following query to get only the latest
>>>>>>> message for a given "userId":
>>>>>>>
>>>>>>> Table uniqueUsers = tEnv.sqlQuery(
>>>>>>>     "SELECT * FROM Users WHERE (userId, userTimestamp) IN " +
>>>>>>>     "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");
>>>>>>>
>>>>>>> The above query works as expected and returns only the latest users
>>>>>>> based on timestamp.
>>>>>>>
>>>>>>> The issue is that when I use the "uniqueUsers" table multiple times in a
>>>>>>> JOIN operation, I see multiple tasks in the Flink dashboard for the same
>>>>>>> query that creates the "uniqueUsers" table. It creates one such task for
>>>>>>> every place I use the table.
>>>>>>>
>>>>>>> Below is the JOIN query:
>>>>>>>
>>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>>> Table joinOut = tEnv.sqlQuery(
>>>>>>>     "SELECT * FROM Car c " +
>>>>>>>     "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId " +
>>>>>>>     "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId " +
>>>>>>>     "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId " +
>>>>>>>     "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>>>>
>>>>>>> Could someone please help me understand how I can avoid these
>>>>>>> duplicate tasks?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> R Kandoji
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>
>
> --
> Best, Jingsong Lee
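For readers following the thread, here is a minimal plain-Java sketch (no Flink dependency) of what the original question is after. It deduplicates `Users` down to the latest row per `userId` once, then reuses that single result for all four `LEFT JOIN` lookups. That reuse is the effect the Blink planner's subplan reuse aims for. The `User` and `Car` classes, their fields, and the `latestPerUser` / `joinCars` helpers are hypothetical stand-ins for the tables in the question. The sketch only illustrates the query semantics; it is not how the Flink planner implements reuse.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupReuseSketch {

    // Hypothetical stand-in for one row of the "Users" table.
    static final class User {
        final String userId;
        final long userTimestamp;
        final String name;

        User(String userId, long userTimestamp, String name) {
            this.userId = userId;
            this.userTimestamp = userTimestamp;
            this.name = name;
        }
    }

    // Hypothetical stand-in for one row of the "Car" table.
    static final class Car {
        final String userId;
        final String ownerId;
        final String sellerId;
        final String buyerId;

        Car(String userId, String ownerId, String sellerId, String buyerId) {
            this.userId = userId;
            this.ownerId = ownerId;
            this.sellerId = sellerId;
            this.buyerId = buyerId;
        }
    }

    // Keep the row with the largest userTimestamp for each userId.
    // On a timestamp tie this keeps the first row seen; the original
    // IN (... MAX ...) query would return every tied row instead.
    static Map<String, User> latestPerUser(List<User> users) {
        Map<String, User> latest = new HashMap<>();
        for (User u : users) {
            latest.merge(u.userId, u,
                    (old, cur) -> cur.userTimestamp > old.userTimestamp ? cur : old);
        }
        return latest;
    }

    // The four LEFT JOINs: every lookup probes the same deduplicated map,
    // built once, rather than recomputing the deduplication for each join.
    static List<User[]> joinCars(List<Car> cars, Map<String, User> uniqueUsers) {
        List<User[]> rows = new ArrayList<>();
        for (Car c : cars) {
            // A missing key yields null, like the NULL columns of an unmatched LEFT JOIN.
            rows.add(new User[] {
                uniqueUsers.get(c.userId),
                uniqueUsers.get(c.ownerId),
                uniqueUsers.get(c.sellerId),
                uniqueUsers.get(c.buyerId)
            });
        }
        return rows;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User("u1", 10, "alice-v1"),
                new User("u1", 20, "alice-v2"),
                new User("u2", 5, "bob"));
        Map<String, User> uniqueUsers = latestPerUser(users); // computed once
        List<User[]> rows = joinCars(
                Arrays.asList(new Car("u1", "u2", "u1", "u3")), uniqueUsers);
        for (User u : rows.get(0)) {
            System.out.println(u == null ? "null" : u.name);
        }
    }
}
```

Running `main` prints `alice-v2`, `bob`, `alice-v2` and `null` on separate lines. `u1` resolves to its newer row, and the unknown `u3` behaves like an unmatched LEFT JOIN. In Flink SQL, the Top-N form Kurt points to expresses the same deduplication as `ROW_NUMBER() OVER (PARTITION BY userId ORDER BY userTimestamp DESC)` filtered to row number 1. Unlike the `IN (... MAX ...)` query, that form yields exactly one row per `userId` even when timestamps tie.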