Re: Duplicate tasks for the same query

2020-01-07 Thread RKandoji
Hi Kurt,

Thanks for the additional info.

RK


Re: Duplicate tasks for the same query

2020-01-05 Thread Kurt Young
Another common skew case we've seen is NULL handling, where the value of
the join key is NULL. We will shuffle all the NULL values into one task,
even though the join condition can never hold for a NULL key by
definition.
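
If that's what's happening here, one illustrative workaround (just a
sketch, assuming it is acceptable to drop NULL keys from the right side of
your LEFT JOINs, where they can never match anyway) is to filter them out
before the join:

// Sketch: a NULL userId can never satisfy "c.userId = aa.userId", so
// dropping such rows from the join's right side avoids shuffling them all
// into one sub-task without changing the join result.
Table nonNullUsers = tEnv.sqlQuery(
    "SELECT * FROM uniqueUsersTbl WHERE userId IS NOT NULL");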

For DeDuplication, I just want to make sure this behavior meets your
requirement, because for some other usages users are only interested in
the earliest record: the later updates for the same key are purely
redundant, e.g. caused by an upstream failure reprocessing the same data.
In that case, each key will have at most one record and you won't face any
join-key skew issue.
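
For reference, a sketch of that earliest-record variant (assuming the
"Users" table and columns from earlier in this thread, and that
userTimestamp can serve as the time attribute):

// Sketch: keep only the first record per userId. The earliest row for a
// key never changes once seen, so the output is append-only and no
// retractions are sent downstream.
Table earliestUsers = tEnv.sqlQuery(
    "SELECT userId, userTimestamp FROM ("
        + " SELECT *, ROW_NUMBER() OVER ("
        + "   PARTITION BY userId ORDER BY userTimestamp ASC) AS rowNum"
        + "  FROM Users)"
        + " WHERE rowNum = 1");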

Best,
Kurt



Re: Duplicate tasks for the same query

2020-01-05 Thread RKandoji
Hi Kurt,

I understand what you mean: some userIds may appear more frequently than
others, but that distribution doesn't look proportionate to the skew I'm
seeing. Can you think of any other possible reasons, or anything I can try
out to investigate this further?

For DeDuplication, I query for the latest record. Sorry, I didn't follow
the above sentence. Do you mean that for each update to the user table,
the record(s) that were updated will be sent via a retract stream? I think
that's expected, as I need to process the latest records, as long as only
the record(s) that have been updated are sent.

Thanks,
RKandoji


Re: Duplicate tasks for the same query

2020-01-03 Thread Kurt Young
Hi RKandoji,

It looks like you have a data skew issue with your input data: some, or
maybe only one, "userId" appears more frequently than the others. For the
join operator to work correctly, Flink will apply a "shuffle by join key"
before the operator, so the same "userId" always goes to the same sub-task
to perform the join. In this case, I'm afraid there is not much you can do
for now.
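
One illustrative way to double-check the distribution (a sketch that
reuses tEnv and the Users table from your earlier messages):

// Sketch: count rows per join key. If one userId (or NULL) dominates, the
// sub-task that key hashes to will be the hotspot you observed.
Table keyHistogram = tEnv.sqlQuery(
    "SELECT userId, COUNT(*) AS cnt FROM Users GROUP BY userId");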

BTW, for the DeDuplicate, do you keep the latest record or the earliest?
If you keep the latest version, Flink will trigger a retraction and then
send the latest record again every time your user table changes.
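
You can observe those retractions directly by converting the deduplicated
table into a retract stream (a sketch; uniqueUsers and tEnv are from your
first message, Row is org.apache.flink.types.Row and Tuple2 is
org.apache.flink.api.java.tuple.Tuple2):

// Sketch: each change to a key arrives as (false, oldRow) retracting the
// previous version, followed by (true, newRow) carrying the latest one.
DataStream<Tuple2<Boolean, Row>> changes =
    tEnv.toRetractStream(uniqueUsers, Row.class);
changes.print();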

Best,
Kurt



Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Hi,

Thanks a ton for the help with the earlier questions. I updated the code
to version 1.9 and started using the Blink Planner (DeDuplication). This
is working as expected!

I have a new question, but I thought of asking it in the same email chain
as it has more context about my use case.

Workflow:
Currently I'm reading from a couple of Kafka topics, DeDuplicating the
input data, performing JOINs and writing the joined data to another Kafka
topic.

Issue:
I set parallelism to 8, and on analyzing the subtasks I found that the
data is not distributed well among the 8 parallel tasks for the last join
query. One subtask is taking a huge load, whereas the others are taking a
pretty low load.

I tried a couple of things below, but with no luck. I'm not sure whether
they are actually related to the problem, as I couldn't yet understand
what the issue here is.
1. Increasing the number of partitions of the output Kafka topic.
2. Adding keys to the output so key partitioning happens on the Kafka end.

Below is a snapshot for reference:
[image: image.png]

Below are the config changes I made:

taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
jobmanager.heap.size: 5000m
taskmanager.heap.size: 5000m
state.backend: rocksdb
state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
state.backend.incremental: true

I don't see any errors, and the job seems to be running smoothly (but
slowly). I need it to distribute the load well for faster processing; any
pointers on what could be wrong and how to fix it would be very helpful.

Thanks,
RKandoji



Re: Duplicate tasks for the same query

2020-01-03 Thread RKandoji
Thanks!


Re: Duplicate tasks for the same query

2020-01-02 Thread Jingsong Li
Yes,

1.9.2, or the upcoming 1.10.

Best,
Jingsong Lee


Re: Duplicate tasks for the same query

2020-01-02 Thread RKandoji
Ok thanks, does it mean version 1.9.2 is what I need to use?


Re: Duplicate tasks for the same query

2020-01-01 Thread Jingsong Li
The blink planner was introduced in 1.9, and we recommend using it from
1.9 onwards. After some bug fixes, I think the latest 1.9 release is OK;
it has also been deployed to production environments in some places.

Best,
Jingsong Lee


Re: Duplicate tasks for the same query

2019-12-31 Thread RKandoji
Thanks Jingsong and Kurt for more details.

Yes, I'm planning to try out DeDuplication when I'm done upgrading to
version 1.9. Hopefully deduplication is done by only one task and reused
everywhere else.

One more follow-up question: I see the warning "For production use cases,
we recommend the old planner that was present before Flink 1.9 for now."
here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
This is actually the reason why I started with version 1.8. Could you
please let me know your opinion about this? And do you think there is any
production code running on version 1.9?

Thanks,
Reva






Re: Duplicate tasks for the same query

2019-12-30 Thread Kurt Young
BTW, you could also have a more efficient version of deduplicating the
user table by using the Top-N feature [1].
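
A sketch of how that could replace the IN / GROUP BY query from the start
of this thread (column names taken from your example; list every column
you need in the outer SELECT, but leave rowNum out so the planner can
optimize the state away):

// Sketch of the Top-N deduplication pattern from [1]: partitioning by the
// key and keeping rowNum = 1 with ORDER BY ... DESC retains exactly the
// latest row per userId.
Table uniqueUsers = tEnv.sqlQuery(
    "SELECT userId, userTimestamp FROM ("
        + " SELECT *, ROW_NUMBER() OVER ("
        + "   PARTITION BY userId ORDER BY userTimestamp DESC) AS rowNum"
        + "  FROM Users)"
        + " WHERE rowNum = 1");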

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n




Re: Duplicate tasks for the same query

2019-12-30 Thread Jingsong Li
Hi RKandoji,

In theory, you don't need to do anything.
First, the optimizer will deduplicate identical nodes while optimizing.
Second, if the optimized plan still has duplicate nodes after SQL
optimization, the planner will automatically reuse them.
There are config options that control whether plans should be reused;
their default value is true, so you don't need to modify them:
- table.optimizer.reuse-sub-plan-enabled
- table.optimizer.reuse-source-enabled
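
If you ever do need to toggle them, a sketch (blink planner; in 1.9 the
TableConfig exposes the underlying Configuration):

// Sketch: both options already default to true; shown only for
// completeness.
tEnv.getConfig().getConfiguration()
    .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
tEnv.getConfig().getConfiguration()
    .setBoolean("table.optimizer.reuse-source-enabled", true);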

Best,
Jingsong Lee



Re: Duplicate tasks for the same query

2019-12-30 Thread RKandoji
Thanks Terry and Jingsong,

Currently I'm on version 1.8, using the Flink planner for stream
processing; I'll switch to version 1.9 to try out the blink planner.

Could you please point me to any examples (Java preferred) using
SubplanReuser?

Thanks,
RK



Re: Duplicate tasks for the same query

2019-12-29 Thread Jingsong Li
Hi RKandoji,

FYI: blink-planner subplan reusing [1] is available in 1.9.

       Join                       Join
      /    \                     /    \
 Filter1  Filter2    =>     Filter1  Filter2
    |        |                   \    /
 Project1  Project2             Project1
    |        |                      |
  Scan1    Scan2                  Scan1


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
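
A quick way to verify the reuse on your own plan (a sketch; joinOut is the
joined Table from your first message) is to print the optimized plan and
check that the deduplication subtree appears only once:

// Sketch: the blink planner annotates reused subplans in the printed
// plan, so all four joins should point at one deduplication/scan subtree.
System.out.println(tEnv.explain(joinOut));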

Best,
Jingsong Lee



Re: Duplicate tasks for the same query

2019-12-29 Thread Terry Wang
Hi RKandoji~

Could you provide more info about your poc environment? 
Stream or batch? Flink planner or blink planner?
AFAIK, the blink planner has done some optimization to deal with such
duplicate tasks for the same query. You can have a try with the blink
planner:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
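
For reference, creating a blink-planner TableEnvironment on 1.9 looks
roughly like this (a sketch following the page above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch: a streaming TableEnvironment backed by the blink planner (1.9+).
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);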
 


Best,
Terry Wang






Fwd: Duplicate tasks for the same query

2019-12-29 Thread RKandoji
Hi Team,

I'm doing a POC with Flink to understand if it's a good fit for my use
case.

As part of the process, I need to filter duplicate items, and I created
the query below to get only the latest records based on timestamp. For
instance, I have a "Users" table which may contain multiple messages for
the same "userId", so I wrote the query below to get only the latest
message for a given "userId":

Table uniqueUsers = tEnv.sqlQuery(
    "SELECT * FROM Users WHERE (userId, userTimestamp) IN "
        + "(SELECT userId, MAX(userTimestamp) FROM Users GROUP BY userId)");

The above query works as expected and contains only the latest users based
on timestamp.

The issue is that when I use the "uniqueUsers" table multiple times in a
JOIN operation, I see multiple tasks in the Flink dashboard for the same
query that creates the "uniqueUsers" table. It simply creates as many
tasks as the number of times I use the table.

Below is the JOIN query.
tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
Table joinOut = tEnv.sqlQuery(
    "SELECT * FROM Car c "
        + "LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId "
        + "LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId "
        + "LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId "
        + "LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");

Could someone please help me understand how I can avoid these duplicate
tasks?


Thanks,
R Kandoji