BTW, you could also deduplicate the user table more efficiently by using
the Top-N feature [1].

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
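
As a sketch of what that could look like (reusing the "Users" table and
column names from the thread below; the ROW_NUMBER-over-partition pattern
with rownum = 1 is the deduplication form described in the linked docs):

```sql
-- Keep only the latest row per userId.
-- The blink planner recognizes this pattern and can execute it
-- more efficiently than an IN-subquery with MAX().
SELECT userId, userTimestamp
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY userId
                            ORDER BY userTimestamp DESC) AS rownum
  FROM Users)
WHERE rownum = 1
```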


On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi RKandoji,
>
> In theory, you don't need to do anything.
> First, the optimizer will deduplicate equivalent nodes during optimization.
> Second, after SQL optimization, if the optimized plan still contains
> duplicate nodes, the planner will automatically reuse them.
> There are config options that control whether plans are reused; their
> default value is true, so you don't need to modify them:
> - table.optimizer.reuse-sub-plan-enabled
> - table.optimizer.reuse-source-enabled
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>
>> Thanks Terry and Jingsong,
>>
> >> Currently I'm on version 1.8 using the Flink planner for stream
> >> processing; I'll switch to version 1.9 to try out the blink planner.
>>
>> Could you please point me to any examples (Java preferred) using
>> SubplanReuser?
>>
>> Thanks,
>> RK
>>
>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi RKandoji,
>>>
> >>> FYI: blink-planner subplan reuse [1] is available since 1.9.
>>>
>>>        Join                      Join
>>>      /      \                  /      \
>>>  Filter1  Filter2          Filter1  Filter2
>>>     |        |        =>       \     /
>>>  Project1 Project2            Project1
>>>     |        |                   |
>>>   Scan1    Scan2               Scan1
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com> wrote:
>>>
>>>> Hi RKandoji~
>>>>
>>>> Could you provide more info about your poc environment?
>>>> Stream or batch? Flink planner or blink planner?
> >>>> AFAIK, the blink planner has done some optimization to deal with such
> >>>> duplicate tasks for the same query. You can have a try with the blink
> >>>> planner:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>
>>>> Best,
>>>> Terry Wang
>>>>
>>>>
>>>>
> >>>> On Dec 30, 2019, at 03:07, RKandoji <rkand...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I'm doing a POC with flink to understand if it's a good fit for my use
>>>> case.
>>>>
> >>>> As part of the process, I need to filter out duplicate items, so I
> >>>> created the query below to get only the latest records based on
> >>>> timestamp. For instance, I have a "Users" table which may contain
> >>>> multiple messages for the same "userId", so I wrote the query below to
> >>>> get only the latest message for a given "userId":
>>>>
>>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
>>>> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
>>>> userId)");
>>>>
>>>> The above query works as expected and contains only the latest users
>>>> based on timestamp.
>>>>
> >>>> The issue is that when I use the "uniqueUsers" table multiple times in
> >>>> a JOIN operation, I see multiple tasks in the Flink dashboard for the
> >>>> same query that creates the "uniqueUsers" table. It simply creates as
> >>>> many tasks as the number of times I use the table.
>>>>
>>>> Below is the JOIN query.
>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
> >>>> Table joinOut = tEnv.sqlQuery(
> >>>>     "SELECT * FROM Car c " +
> >>>>     "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId " +
> >>>>     "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId " +
> >>>>     "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId " +
> >>>>     "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>>>>
>>>> Could someone please help me understand how I can avoid these duplicate
>>>> tasks?
>>>>
>>>>
>>>> Thanks,
>>>> R Kandoji
>>>>
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>
