Hi,

Thanks a ton for the help with earlier questions, I updated code to version
1.9 and started using Blink Planner (DeDuplication). This is working as
expected!

I have a new question, but thought of asking in the same email chain as
this has more context about my use case etc.

Workflow:
Currently I'm reading from a couple of Kafka topics, DeDuplicating the
input data, performing JOINs and writing the joined data to another Kafka
topic.

Issue:
I set Parallelism to 8 and on analyzing the subtasks found that the data is
not distributed well among 8 parallel tasks for the last Join query. One of
a subtask is taking huge load, whereas others taking pretty low load.

Tried a couple of things below, but no use. Not sure if they are actually
related to the problem as I couldn't yet understand what's the issue here.
1. increasing the number of partitions of output Kafka topic.
2. tried adding keys to output so key partitioning happens at Kafka end.

Below is a snapshot for reference:
[image: image.png]

Below are the config changes I made:

taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
jobmanager.heap.size: 5000m
taskmanager.heap.size: 5000m
state.backend: rocksdb
state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
state.backend.incremental: true

I don't see any errors and job seems to be running smoothly (and slowly). I
need to make it distribute the load well for faster processing, any
pointers on what could be wrong and how to fix it would be very helpful.

Thanks,
RKandoji


On Fri, Jan 3, 2020 at 1:06 PM RKandoji <rkand...@gmail.com> wrote:

> Thanks!
>
> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Yes,
>>
>> 1.9.2 or Coming soon 1.10
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji <rkand...@gmail.com> wrote:
>>
>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>
>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>>> after 1.9.
>>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>>> production environment has also been set up in some places.
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji <rkand...@gmail.com> wrote:
>>>>
>>>>> Thanks Jingsong and Kurt for more details.
>>>>>
>>>>> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
>>>>> version 1.9. Hopefully deduplication is done by only one task and reused
>>>>> everywhere else.
>>>>>
>>>>> One more follow-up question, I see "For production use cases, we
>>>>> recommend the old planner that was present before Flink 1.9 for now." 
>>>>> warning
>>>>> here https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>>>>>
>>>>> This is actually the reason why started with version 1.8, could you
>>>>> please let me know your opinion about this? and do you think there is any
>>>>> production code running on version 1.9
>>>>>
>>>>> Thanks,
>>>>> Reva
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> BTW, you could also have a more efficient version of deduplicating
>>>>>> user table by using the topn feature [1].
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li <jingsongl...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi RKandoji,
>>>>>>>
>>>>>>> In theory, you don't need to do something.
>>>>>>> First, the optimizer will optimize by doing duplicate nodes.
>>>>>>> Second, after SQL optimization, if the optimized plan still has
>>>>>>> duplicate nodes, the planner will automatically reuse them.
>>>>>>> There are config options to control whether we should reuse plan,
>>>>>>> their default value is true. So you don't need modify them.
>>>>>>> - table.optimizer.reuse-sub-plan-enabled
>>>>>>> - table.optimizer.reuse-source-enabled
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji <rkand...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Terry and Jingsong,
>>>>>>>>
>>>>>>>> Currently I'm on 1.8 version using Flink planner for stream
>>>>>>>> proessing, I'll switch to 1.9 version to try out blink planner.
>>>>>>>>
>>>>>>>> Could you please point me to any examples (Java preferred) using
>>>>>>>> SubplanReuser?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> RK
>>>>>>>>
>>>>>>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li <
>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi RKandoji,
>>>>>>>>>
>>>>>>>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>>>>>>>
>>>>>>>>>        Join                      Join
>>>>>>>>>      /      \                  /      \
>>>>>>>>>  Filter1  Filter2          Filter1  Filter2
>>>>>>>>>     |        |        =>       \     /
>>>>>>>>>  Project1 Project2            Project1
>>>>>>>>>     |        |                   |
>>>>>>>>>   Scan1    Scan2               Scan1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong Lee
>>>>>>>>>
>>>>>>>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang <zjuwa...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi RKandoji~
>>>>>>>>>>
>>>>>>>>>> Could you provide more info about your poc environment?
>>>>>>>>>> Stream or batch? Flink planner or blink planner?
>>>>>>>>>> AFAIK, blink planner has done some optimization to deal such
>>>>>>>>>> duplicate task for one same query. You can have a try with blink 
>>>>>>>>>> planner :
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Terry Wang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2019年12月30日 03:07,RKandoji <rkand...@gmail.com> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>> I'm doing a POC with flink to understand if it's a good fit for
>>>>>>>>>> my use case.
>>>>>>>>>>
>>>>>>>>>> As part of the process, I need to filter duplicate items and
>>>>>>>>>> created below query to get only the latest records based on 
>>>>>>>>>> timestamp. For
>>>>>>>>>> instance, I have "Users" table which may contain multiple messages 
>>>>>>>>>> for the
>>>>>>>>>> same "userId". So I wrote below query to get only the latest message 
>>>>>>>>>> for a
>>>>>>>>>> given "userId"
>>>>>>>>>>
>>>>>>>>>> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE
>>>>>>>>>> (userId, userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM 
>>>>>>>>>> Users
>>>>>>>>>> GROUP BY userId)");
>>>>>>>>>>
>>>>>>>>>> The above query works as expected and contains only the latest
>>>>>>>>>> users based on timestamp.
>>>>>>>>>>
>>>>>>>>>> The issue is when I use "uniqueUsers" table multiple times in a
>>>>>>>>>> JOIN operation, I see multiple tasks in the flink dashboard for the 
>>>>>>>>>> same
>>>>>>>>>> query that is creating "uniqueUsers" table. It is simply creating as 
>>>>>>>>>> many
>>>>>>>>>> tasks as many times I'm using the table.
>>>>>>>>>>
>>>>>>>>>> Below is the JOIN query.
>>>>>>>>>> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
>>>>>>>>>> Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
>>>>>>>>>>                                        LEFT JOIN uniqueUsersTbl
>>>>>>>>>> aa ON c.userId = aa.userId
>>>>>>>>>>                                        LEFT JOIN uniqueUsersTbl
>>>>>>>>>> ab ON c.ownerId = ab.userId
>>>>>>>>>>                                        LEFT JOIN uniqueUsersTbl
>>>>>>>>>> ac ON c.sellerId = ac.userId
>>>>>>>>>>                                        LEFT JOIN uniqueUsersTbl
>>>>>>>>>> ad ON c.buyerId = ad.userId");
>>>>>>>>>>
>>>>>>>>>> Could someone please help me understand how I can avoid these
>>>>>>>>>> duplicate tasks?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> R Kandoji
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

Reply via email to