Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Till Rohrmann
In all cases (session and per-job cluster modes) except for JobManager
recovery in application mode [1], the main() function runs only once, in
order to generate the JobGraph, which is sent to the cluster and which is
also used for recoveries.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/#application-mode

Cheers,
Till

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Si-li Liu
Thanks for your reply.

The source will poll the state of the T operator periodically. If it finds
the offset is 0, it can fall back to the latest committed offset.
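
[Editor's note: a minimal plain-Java sketch of the static-variable handoff discussed in this thread; all names are invented, and this is not Flink code. It only works when the writer (the T operator) and the reader (the source) run in the same JVM, i.e. the same TM, which is exactly the co-location question of the thread. The periodic polling loop and the offset/timestamp translation are elided.]

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy illustration of the handoff described in the thread. The T operator
// publishes a safe rollback timestamp into a static variable; the source
// reads it on (re)start and falls back to the committed offset while it is 0.
class SafeRollbackClock {
    // 0 means "no safe rollback timestamp published yet".
    static final AtomicLong SAFE_ROLLBACK_TS = new AtomicLong(0L);
}

class SourceStub {
    final long committedOffset;

    SourceStub(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    // Prefer the published rollback timestamp; fall back to the last
    // committed offset while none has been published.
    long resolveStartOffset() {
        long ts = SafeRollbackClock.SAFE_ROLLBACK_TS.get();
        return ts == 0L ? committedOffset : ts;
    }
}
```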

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Si-li Liu
Thanks for your reply!

It seems the state processor API can solve my problem: the state written by
the T operator's checkpoint can be read by the main function when the job
restarts. My question is, when the streaming job restarts for some reason,
will the main function also run again?

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Till Rohrmann
Hi Si-li Liu,

if you want to run T with a parallelism of 1, then your parallelism of A
should be limited by the total number of slots on your TM. Otherwise you
would have some A_i which are not running on a machine with T.

For the approach with the co-location constraint, you can take a look at
Transformation.setCoLocationGroupKey() [1]. Using this API one can define
operators whose subtasks need to run on the same machine (e.g. A_i runs
together with B_i on the same machine, even in the same slot). However,
this is pretty much an internal feature which might change in future
versions.

What I did not fully understand is what should happen if your TM dies.
Wouldn't then the information of T be lost and the sources would start from
offset 0 again? According to your explanation, this should be intolerable
given the business requirements.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426

Cheers,
Till
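
[Editor's note: a toy model, not Flink internals, of the guarantee a co-location constraint provides: subtask i of every operator sharing the same key must land in the same slot. The class and method names below are invented for illustration only.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy scheduler mimicking the *guarantee* of setCoLocationGroupKey():
// subtask i of every operator sharing a co-location key is assigned to the
// same slot. This models the invariant, not Flink's actual scheduling.
class ToyColocationScheduler {
    // (coLocationKey, subtaskIndex) -> slot id
    private final Map<String, Integer> slots = new HashMap<>();
    private int nextSlot = 0;

    int assignSlot(String coLocationKey, int subtaskIndex) {
        return slots.computeIfAbsent(coLocationKey + "#" + subtaskIndex,
                k -> nextSlot++);
    }
}
```

Under this invariant, source subtask A_0 and timestampcalculator subtask T_0 always share a slot, while A_0 and A_1 do not.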

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Arvid Heise
If you would prefer to have T with parallelism 1, one complete alternative
solution would be to leave the timestamp in the state of T and extract the
timestamp from the savepoint/checkpoint upon start of the application using
the state processor API [1]. Unfortunately, it may be a bit hacky when you
do a normal recovery as there is not a single entrypoint (if you start new
you could just extract that timestamp from main()). Of course, you could
also store the information in an external storage but that would also make
the architecture more complicated.

Let's see if anyone has an idea on the co-location topic.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply!

Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator
would have parallelism 1 in the topology and all A_i would start from the
same timestamp, but a minor difference in resume timestamps between the
A_i sources is also acceptable. So multiple T operators are also OK for me
here. But the prerequisite for this topology to work is that I can make
sure T and A always reside in the same TM.

The problem is that both stream A and stream B are very large: 200k ~ 300k
messages per second in each stream, at 1k ~ 2k bytes (after compression)
per message, and I have to keep the whole message in cache. So it's hard to
fit into Flink state.
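
[Editor's note: the expiring in-memory cache described in this thread can be sketched with an insertion-ordered LinkedHashMap whose eviction hook records the timestamp of the most recently evicted entry, which is the "safe rollback timestamp" for restarting source A. A rough sketch; capacities and names are invented, and the value here is just the timestamp rather than the full cached message.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the join-side cache: bounded, insertion-ordered (oldest first),
// remembering the event timestamp of the most recently evicted entry.
class ExpiringJoinCache<K> extends LinkedHashMap<K, Long> {
    private final int maxEntries;
    private long latestExpiredTimestamp = 0L;

    ExpiringJoinCache(int maxEntries) {
        super(16, 0.75f, false); // false = insertion order
        this.maxEntries = maxEntries;
    }

    // Called by put(); returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
        if (size() > maxEntries) {
            latestExpiredTimestamp = Math.max(latestExpiredTimestamp, eldest.getValue());
            return true;
        }
        return false;
    }

    long safeRollbackTimestamp() {
        return latestExpiredTimestamp;
    }
}
```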



Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Arvid Heise
Your topology is definitely interesting and makes sense to me at a high
level. The main question remaining is the parallelism. I'm assuming you run
your pipeline with parallelism p and both source A and timestampcalculator
T run with parallelism p. You want to create a situation where for each A_i
there is a T_i running in the same slot. Am I right?

If so, then as you have noticed, there is currently no way to express that
in Flink at a high level. One more idea before trying to solve it in a
hacky way: how large is B? Could you use a broadcast to avoid the shuffle
on A? I'm thinking of creating a pipeline A->J(side input B)->T, because
then it's easy to produce an operator chain, where everything even runs
within the same thread.
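
[Editor's note: the "moving minimum" mentioned in the quoted messages below can be sketched in plain Java (not Flink code; all names are invented). With p parallel join subtasks, each reports its own latest-expired timestamp, and the safe restart point for the source is the minimum over all subtasks, since rolling back to that point cannot skip data any subtask still needs.]

```java
import java.util.Arrays;

// Sketch of the "moving minimum" safe rollback timestamp: each join subtask
// reports the timestamp of its latest expired cache entry; the safe rollback
// timestamp is the minimum across all subtasks (0 = nothing expired yet).
class MovingMinimum {
    private final long[] perSubtaskTs;

    MovingMinimum(int parallelism) {
        perSubtaskTs = new long[parallelism];
    }

    void report(int subtaskIndex, long latestExpiredTs) {
        perSubtaskTs[subtaskIndex] =
                Math.max(perSubtaskTs[subtaskIndex], latestExpiredTs);
    }

    long safeRollbackTimestamp() {
        return Arrays.stream(perSubtaskTs).min().orElse(0L);
    }
}
```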

On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:

> Thanks for your reply.
>
> I want to join two streams, A and B. Items in stream A come in first and
> I keep them in a memory cache, keyed by the join key; several minutes
> later the matching items in stream B come in and the join is performed.
> The timestamp of the latest expired item in the memory cache is the safe
> rollback timestamp; I can resume source A from that timestamp when I
> restart.
>
> It's not very precise, it may lose some items or send some items twice,
> but it seems useful in my situation. However, if the job restarts and both
> source A and source B resume from the last consumed offset, several
> minutes of join results will be missing, which is unacceptable.
>
> The topo I consider is like
>
> source A -> parser --shuffle--> join -> sink
> source B -> parser ...(parallel)  |--->timestampcalculator
>
> The memory cache lives in the join operator; the join operator will
> broadcast the timestamp of the latest expired cache item to the
> timestampcalculator. The timestampcalculator will then use these
> timestamps to calculate a safe rollback timestamp (a moving minimum), so
> that source A can resume from that timestamp; source B will also restart
> from that timestamp. I will add a Bloom filter to the sink's state to
> avoid duplicate items.
>
> So I want the timestampcalculator operator and source A to be located in
> one TM, so that I can send this timestamp from the timestampcalculator to
> source A via a static variable.
>
> Hope I have made my problem clear despite my poor English; it seems a
> little tricky. But I think it's the only way to join the two streams
> while avoiding storing very large state.
>
>
>
> Arvid Heise  于2020年11月20日周五 下午2:58写道:
>
>> I still haven't fully understood. Do you mean you can't infer the
>> timestamp in source A because it depends on some internal field of source B?
>>
>> How is that actually working in a parallel setting? Which timestamp is
>> used in the different instances of a source?
>>
>> Say, we have task A1 which is the first subtask of source A and task B2
>> as the second subtask of source B. How would you like them to be located?
>> How does that correlate to the third subtask of the join (let's call it J3).
>>
>> Remember that through the shuffling before the join there is no clear
>> correlation between any subtask of A or B to J...
>>
>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:
>>
>>> Thanks for your help!
>>>
>>> Now the timestamps already go with the items in streaming. My streaming
>>> pipeline is like this:
>>>
>>> source -> parser --shuffle--> join -> sink
>>>
>>> Stream A and stream B both go through this pipeline. I keep logs from
>>> stream A in a memory cache (a LinkedHashMap) in the join operator, then all logs
>>> in stream B try to look up the cache and perform the actual join
>>> work.
>>>
>>> I try to use the timestamp of the latest expired item in memory as a
>>> safe rollback timestamp: if I restart the job, the source should use this
>>> timestamp as its start offset. The safe rollback timestamp is calculated in the join
>>> operator, but I want to use it in the source. So the simplest way to pass this
>>> information from the join operator to the source is a static variable, which
>>> requires that the source operator and join operator always be located in the same TM process.
>>>
>>> Arvid Heise  于2020年11月20日周五 上午3:33写道:
>>>
 Hi Si-li,

 couldn't you also add the timestamp as a state to the source? So the
 time would store the timestamp of the last emitted record.
 It's nearly identical to your solution but would fit the recovery model
 of Flink much better.
 If you want to go further back to account for the records that have
 been actually processed in the join, you could also replay the data from
  - .

 On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:

> Thanks, I'll try it.
>
> Matthias Pohl  于2020年11月14日周六 上午12:53写道:
>
>> Hi Si-li,
>> trying to answer your initial question: Theoretically, you could try
>> using the co-location constraints to achieve this. But keep in mind that
>> this might lead to multiple Join operators running in the same JVM reducing
>> the amount of memory each operator can utilize.
>>
>> Best,

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply.

I want to join two streams, A and B. Items in stream A come in first,
and I keep them in a memory cache as (join key, item) pairs; then several
minutes later the items in stream B come in and the join work is
performed. The timestamp of the latest expired item in the memory cache is the
safe rollback timestamp: I can resume source A from that timestamp when I
restart.

It's not very precise; it may lose some items or send the same items twice, but
it seems useful enough in my situation. But if the job restarts and both source A
and source B resume from the last consumed offset, several minutes of join
results will be missing, which is unacceptable.

The topology I'm considering looks like:

source A -> parser --shuffle--> join -> sink
source B -> parser ...(parallel)  |--->timestampcalculator

The memory cache lives inside the join operator, and the join operator
broadcasts the timestamp of the latest expired cache item to the
timestampcalculator. The timestampcalculator then uses these timestamps to
calculate a safe rollback timestamp (a moving minimum) so that source A can
resume from it; source B will also restart from that timestamp. I will add a
bloom filter to the sink's state to avoid duplicate items.

So I want the timestampcalculator operator and source A to be located in
one TM, so that I can send this timestamp from the timestampcalculator to
source A via a static variable.

I hope I've made my problem clear despite my poor English; it seems a little
tricky. But I think it's the only way to join the two streams while avoiding
storing a very large state.
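The moving-minimum calculation can be sketched in plain Java, assuming each parallel join subtask reports the timestamp of its latest expired cache item. `SafeRollbackTracker` and its methods are hypothetical names for illustration, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects, per join subtask, the timestamp of the
// latest expired cache item and exposes the minimum across all subtasks
// as the safe rollback timestamp for the sources.
public class SafeRollbackTracker {
    private final Map<Integer, Long> latestExpiredBySubtask = new HashMap<>();

    // Called whenever join subtask `subtaskIndex` expires a cache item.
    public synchronized void report(int subtaskIndex, long expiredTimestamp) {
        latestExpiredBySubtask.merge(subtaskIndex, expiredTimestamp, Math::max);
    }

    // The moving minimum: no subtask has fully processed anything older
    // than this, so the sources can safely resume from here.
    public synchronized long safeRollbackTimestamp() {
        return latestExpiredBySubtask.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L); // no reports yet -> caller falls back to committed offset
    }

    public static void main(String[] args) {
        SafeRollbackTracker tracker = new SafeRollbackTracker();
        tracker.report(0, 1_000L);
        tracker.report(1, 1_500L);
        tracker.report(0, 2_000L); // subtask 0 advances
        System.out.println(tracker.safeRollbackTimestamp()); // min(2000, 1500)
    }
}
```

A real version would also need to handle subtasks that have not yet expired anything, which is why the 0 fallback (meaning "use the latest committed offset") matters.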



Arvid Heise  于2020年11月20日周五 下午2:58写道:

> I still haven't fully understood. Do you mean you can't infer the
> timestamp in source A because it depends on some internal field of source B?
>
> How is that actually working in a parallel setting? Which timestamp is
> used in the different instances of a source?
>
> Say, we have task A1 which is the first subtask of source A and task B2 as
> the second subtask of source B. How would you like them to be located? How
> does that correlate to the third subtask of the join (let's call it J3).
>
> Remember that through the shuffling before the join there is no clear
> correlation between any subtask of A or B to J...
>
> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:
>
>> Thanks for your help!
>>
>> Now the timestamps already go with the items in streaming. My streaming
>> pipeline is like this:
>>
>> source -> parser --shuffle--> join -> sink
>>
>> Streaming A and streaming B go through this pipeline, I keep logs in
>> streaming A in memory cache (linkedHashmap) in join operator, then all logs
>> in streaming B tries to lookup up the cache and perform the actual join
>> work.
>>
>> I try to use the timestamp of the lastest expire item in memory as a safe
>> rollback timestamp, if I restart job, the source should use this timestamp
>> as start offset. The safe rollback timestamp is calucated in join operator,
>> but I want to use it in source. So the simplest way to pass this
>> information from join operator to source is use static variable, which
>> require source operator and join operator always locate in same TM process.
>>
>> Arvid Heise  于2020年11月20日周五 上午3:33写道:
>>
>>> Hi Si-li,
>>>
>>> couldn't you also add the timestamp as a state to the source? So the
>>> time would store the timestamp of the last emitted record.
>>> It's nearly identical to your solution but would fit the recovery model
>>> of Flink much better.
>>> If you want to go further back to account for the records that have been
>>> actually processed in the join, you could also replay the data from >> timestamp> - .
>>>
>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>>>
 Thanks, I'll try it.

 Matthias Pohl  于2020年11月14日周六 上午12:53写道:

> Hi Si-li,
> trying to answer your initial question: Theoretically, you could try
> using the co-location constraints to achieve this. But keep in mind that
> this might lead to multiple Join operators running in the same JVM reducing
> the amount of memory each operator can utilize.
>
> Best,
> Matthias
>
> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>
>> Thanks for your reply.
>>
>> It's a streaming job. The join operator is doing join work, such as
>> join. The join state is too large so I don't want to keep the state using
>> the mechanism that Flink provided, and also I don't need very precise 
>> join.
>> So I prefer to let the join operator to calculate a backward timestamp as
>> state, if the cluster restarts, the consumer can use 
>> setStartFromTimestamp
>> to start from that timestamp.
>>
>> Now my problem is, consumer can't read the state that join operator
>> written, so I need a way to need small message (64bit long) from 
>> downstream
>> to upstream. Redis may be a solution, but add external  dependency is a
>> secondary option if I can pass this message through memory.
>>
>>
>> 

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
I still haven't fully understood. Do you mean you can't infer the timestamp
in source A because it depends on some internal field of source B?

How does that actually work in a parallel setting? Which timestamp is used
in the different instances of a source?

Say we have task A1, the first subtask of source A, and task B2, the
second subtask of source B. How would you like them to be located? How
does that correlate to the third subtask of the join (let's call it J3)?

Remember that because of the shuffling before the join there is no clear
correlation between any subtask of A or B and J...

On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:

> Thanks for your help!
>
> Now the timestamps already go with the items in streaming. My streaming
> pipeline is like this:
>
> source -> parser --shuffle--> join -> sink
>
> Streaming A and streaming B go through this pipeline, I keep logs in
> streaming A in memory cache (linkedHashmap) in join operator, then all logs
> in streaming B tries to lookup up the cache and perform the actual join
> work.
>
> I try to use the timestamp of the lastest expire item in memory as a safe
> rollback timestamp, if I restart job, the source should use this timestamp
> as start offset. The safe rollback timestamp is calucated in join operator,
> but I want to use it in source. So the simplest way to pass this
> information from join operator to source is use static variable, which
> require source operator and join operator always locate in same TM process.
>
> Arvid Heise  于2020年11月20日周五 上午3:33写道:
>
>> Hi Si-li,
>>
>> couldn't you also add the timestamp as a state to the source? So the time
>> would store the timestamp of the last emitted record.
>> It's nearly identical to your solution but would fit the recovery model
>> of Flink much better.
>> If you want to go further back to account for the records that have been
>> actually processed in the join, you could also replay the data from > timestamp> - .
>>
>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>>
>>> Thanks, I'll try it.
>>>
>>> Matthias Pohl  于2020年11月14日周六 上午12:53写道:
>>>
 Hi Si-li,
 trying to answer your initial question: Theoretically, you could try
 using the co-location constraints to achieve this. But keep in mind that
 this might lead to multiple Join operators running in the same JVM reducing
 the amount of memory each operator can utilize.

 Best,
 Matthias

 On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:

> Thanks for your reply.
>
> It's a streaming job. The join operator is doing join work, such as
> join. The join state is too large so I don't want to keep the state using
> the mechanism that Flink provided, and also I don't need very precise 
> join.
> So I prefer to let the join operator to calculate a backward timestamp as
> state, if the cluster restarts, the consumer can use setStartFromTimestamp
> to start from that timestamp.
>
> Now my problem is, consumer can't read the state that join operator
> written, so I need a way to need small message (64bit long) from 
> downstream
> to upstream. Redis may be a solution, but add external  dependency is a
> secondary option if I can pass this message through memory.
>
>
> Chesnay Schepler  于2020年11月6日周五 上午7:06写道:
>
>> It would be good if you could elaborate a bit more on your use-case.
>> Are you using batch or streaming? What kind of "message" are we
>> talking about? Why are you thinking of using a static variable, instead 
>> of
>> just treating this message as part of the data(set/stream)?
>>
>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>
>> Currently I use Flink 1.9.1. The actual thing I want to do is send
>> some messages from downstream operators to upstream operators, which I
>> consider use static variable.
>>
>> But it makes me have to make sure in one taskmanager process it
>> always has these two operators, can I use CoLocationGroup to solve this
>> problem? Or can anyone give me an example to demostrate the usage
>> of CoLocationGroup ?
>>
>> Thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>>
>
> --
> Best regards
>
> Sili Liu
>

>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
> Best regards
>
> Sili Liu
>


-- 

Arvid Heise | Senior Java Developer


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Si-li Liu
Thanks for your help!

Now the timestamps already travel with the items in the streams. My streaming
pipeline is like this:

source -> parser --shuffle--> join -> sink

Stream A and stream B both go through this pipeline. I keep logs from
stream A in a memory cache (a LinkedHashMap) in the join operator, then all logs
in stream B try to look up the cache and perform the actual join
work.

I try to use the timestamp of the latest expired item in memory as a safe
rollback timestamp: if I restart the job, the source should use this timestamp
as its start offset. The safe rollback timestamp is calculated in the join operator,
but I want to use it in the source. So the simplest way to pass this
information from the join operator to the source is a static variable, which
requires that the source operator and join operator always be located in the same TM process.
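The static-variable handoff can be sketched in plain Java (no Flink APIs; `RollbackSignal` is a hypothetical name). Note this only works while both subtasks run in the same TaskManager JVM, and it bypasses Flink's checkpointing entirely:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-JVM "mailbox": the join/timestampcalculator side publishes
// the safe rollback timestamp, and the source side reads it. Static state is
// shared only within one JVM, hence the co-location requirement.
public class RollbackSignal {
    private static final AtomicLong SAFE_TS = new AtomicLong(0L);

    // Downstream side: keep the largest timestamp seen so far.
    public static void publish(long ts) {
        SAFE_TS.accumulateAndGet(ts, Math::max);
    }

    // Upstream side: 0 means "nothing published yet"; the source should
    // then fall back to its latest committed offset instead.
    public static long read() {
        return SAFE_TS.get();
    }

    public static void main(String[] args) {
        publish(1_700L);
        publish(1_600L); // an older value must not regress the signal
        System.out.println(read());
    }
}
```

`AtomicLong` keeps the read/publish race-free across the two operator threads, which a bare `static long` would not.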

Arvid Heise  于2020年11月20日周五 上午3:33写道:

> Hi Si-li,
>
> couldn't you also add the timestamp as a state to the source? So the time
> would store the timestamp of the last emitted record.
> It's nearly identical to your solution but would fit the recovery model of
> Flink much better.
> If you want to go further back to account for the records that have been
> actually processed in the join, you could also replay the data from  timestamp> - .
>
> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>
>> Thanks, I'll try it.
>>
>> Matthias Pohl  于2020年11月14日周六 上午12:53写道:
>>
>>> Hi Si-li,
>>> trying to answer your initial question: Theoretically, you could try
>>> using the co-location constraints to achieve this. But keep in mind that
>>> this might lead to multiple Join operators running in the same JVM reducing
>>> the amount of memory each operator can utilize.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>>>
 Thanks for your reply.

 It's a streaming job. The join operator is doing join work, such as
 join. The join state is too large so I don't want to keep the state using
 the mechanism that Flink provided, and also I don't need very precise join.
 So I prefer to let the join operator to calculate a backward timestamp as
 state, if the cluster restarts, the consumer can use setStartFromTimestamp
 to start from that timestamp.

 Now my problem is, consumer can't read the state that join operator
 written, so I need a way to need small message (64bit long) from downstream
 to upstream. Redis may be a solution, but add external  dependency is a
 secondary option if I can pass this message through memory.


 Chesnay Schepler  于2020年11月6日周五 上午7:06写道:

> It would be good if you could elaborate a bit more on your use-case.
> Are you using batch or streaming? What kind of "message" are we
> talking about? Why are you thinking of using a static variable, instead of
> just treating this message as part of the data(set/stream)?
>
> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>
> Currently I use Flink 1.9.1. The actual thing I want to do is send
> some messages from downstream operators to upstream operators, which I
> consider use static variable.
>
> But it makes me have to make sure in one taskmanager process it always
> has these two operators, can I use CoLocationGroup to solve this problem?
> Or can anyone give me an example to demostrate the usage of 
> CoLocationGroup
> ?
>
> Thanks!
> --
> Best regards
>
> Sili Liu
>
>
>

 --
 Best regards

 Sili Liu

>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
>


-- 
Best regards

Sili Liu


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
Hi Si-li,

couldn't you also add the timestamp as state to the source? So the source
would store the timestamp of the last emitted record.
It's nearly identical to your solution but would fit the recovery model of
Flink much better.
If you want to go further back to account for the records that have been
actually processed in the join, you could also replay the data from  - .
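A Flink-free sketch of this suggestion: the source itself tracks the last emitted timestamp and, on restart, resumes from it minus an assumed safety margin. `snapshot()` and `restoreStartTimestamp()` are hypothetical stand-ins for Flink's checkpointed-state hooks, not real Flink API:

```java
// Hypothetical source that tracks the event timestamp of the last record it
// emitted; on recovery it resumes from that timestamp, optionally minus a
// safety margin to cover records still in flight toward the join.
public class TimestampedSource {
    private long lastEmittedTimestamp;

    public void emit(long recordTimestamp) {
        // ... hand the record downstream (omitted) ...
        lastEmittedTimestamp = recordTimestamp;
    }

    // Stand-in for snapshotState(): what would go into the checkpoint.
    public long snapshot() {
        return lastEmittedTimestamp;
    }

    // Stand-in for initializeState(): where to resume after a restart.
    public long restoreStartTimestamp(long restored, long safetyMarginMs) {
        return Math.max(0L, restored - safetyMarginMs);
    }

    public static void main(String[] args) {
        TimestampedSource source = new TimestampedSource();
        source.emit(10_000L);
        source.emit(12_000L);
        long checkpointed = source.snapshot();
        System.out.println(source.restoreStartTimestamp(checkpointed, 2_000L));
    }
}
```

The design advantage over the static-variable approach is that this state rides along with Flink's normal checkpoint/restore cycle, with no co-location constraint at all.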

On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:

> Thanks, I'll try it.
>
> Matthias Pohl  于2020年11月14日周六 上午12:53写道:
>
>> Hi Si-li,
>> trying to answer your initial question: Theoretically, you could try
>> using the co-location constraints to achieve this. But keep in mind that
>> this might lead to multiple Join operators running in the same JVM reducing
>> the amount of memory each operator can utilize.
>>
>> Best,
>> Matthias
>>
>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>>
>>> Thanks for your reply.
>>>
>>> It's a streaming job. The join operator is doing join work, such as
>>> join. The join state is too large so I don't want to keep the state using
>>> the mechanism that Flink provided, and also I don't need very precise join.
>>> So I prefer to let the join operator to calculate a backward timestamp as
>>> state, if the cluster restarts, the consumer can use setStartFromTimestamp
>>> to start from that timestamp.
>>>
>>> Now my problem is, consumer can't read the state that join operator
>>> written, so I need a way to need small message (64bit long) from downstream
>>> to upstream. Redis may be a solution, but add external  dependency is a
>>> secondary option if I can pass this message through memory.
>>>
>>>
>>> Chesnay Schepler  于2020年11月6日周五 上午7:06写道:
>>>
 It would be good if you could elaborate a bit more on your use-case.
 Are you using batch or streaming? What kind of "message" are we talking
 about? Why are you thinking of using a static variable, instead of just
 treating this message as part of the data(set/stream)?

 On 11/5/2020 12:55 PM, Si-li Liu wrote:

 Currently I use Flink 1.9.1. The actual thing I want to do is send some
 messages from downstream operators to upstream operators, which I consider
 use static variable.

 But it makes me have to make sure in one taskmanager process it always
 has these two operators, can I use CoLocationGroup to solve this problem?
 Or can anyone give me an example to demostrate the usage of CoLocationGroup
 ?

 Thanks!
 --
 Best regards

 Sili Liu



>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>


-- 

Arvid Heise | Senior Java Developer





Re: Is possible that make two operators always locate in same taskmanager?

2020-11-15 Thread Si-li Liu
Thanks, I'll try it.

Matthias Pohl  于2020年11月14日周六 上午12:53写道:

> Hi Si-li,
> trying to answer your initial question: Theoretically, you could try using
> the co-location constraints to achieve this. But keep in mind that this
> might lead to multiple Join operators running in the same JVM reducing the
> amount of memory each operator can utilize.
>
> Best,
> Matthias
>
> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>
>> Thanks for your reply.
>>
>> It's a streaming job. The join operator is doing join work, such as join.
>> The join state is too large so I don't want to keep the state using the
>> mechanism that Flink provided, and also I don't need very precise join. So
>> I prefer to let the join operator to calculate a backward timestamp as
>> state, if the cluster restarts, the consumer can use setStartFromTimestamp
>> to start from that timestamp.
>>
>> Now my problem is, consumer can't read the state that join operator
>> written, so I need a way to need small message (64bit long) from downstream
>> to upstream. Redis may be a solution, but add external  dependency is a
>> secondary option if I can pass this message through memory.
>>
>>
>> Chesnay Schepler  于2020年11月6日周五 上午7:06写道:
>>
>>> It would be good if you could elaborate a bit more on your use-case.
>>> Are you using batch or streaming? What kind of "message" are we talking
>>> about? Why are you thinking of using a static variable, instead of just
>>> treating this message as part of the data(set/stream)?
>>>
>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>
>>> Currently I use Flink 1.9.1. The actual thing I want to do is send some
>>> messages from downstream operators to upstream operators, which I consider
>>> use static variable.
>>>
>>> But it makes me have to make sure in one taskmanager process it always
>>> has these two operators, can I use CoLocationGroup to solve this problem?
>>> Or can anyone give me an example to demostrate the usage of CoLocationGroup
>>> ?
>>>
>>> Thanks!
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-13 Thread Matthias Pohl
Hi Si-li,
trying to answer your initial question: Theoretically, you could try using
the co-location constraints to achieve this. But keep in mind that this
might lead to multiple Join operators running in the same JVM reducing the
amount of memory each operator can utilize.

Best,
Matthias

On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:

> Thanks for your reply.
>
> It's a streaming job. The join operator is doing join work, such as join.
> The join state is too large so I don't want to keep the state using the
> mechanism that Flink provided, and also I don't need very precise join. So
> I prefer to let the join operator to calculate a backward timestamp as
> state, if the cluster restarts, the consumer can use setStartFromTimestamp
> to start from that timestamp.
>
> Now my problem is, consumer can't read the state that join operator
> written, so I need a way to need small message (64bit long) from downstream
> to upstream. Redis may be a solution, but add external  dependency is a
> secondary option if I can pass this message through memory.
>
>
> Chesnay Schepler  于2020年11月6日周五 上午7:06写道:
>
>> It would be good if you could elaborate a bit more on your use-case.
>> Are you using batch or streaming? What kind of "message" are we talking
>> about? Why are you thinking of using a static variable, instead of just
>> treating this message as part of the data(set/stream)?
>>
>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>
>> Currently I use Flink 1.9.1. The actual thing I want to do is send some
>> messages from downstream operators to upstream operators, which I consider
>> use static variable.
>>
>> But it makes me have to make sure in one taskmanager process it always
>> has these two operators, can I use CoLocationGroup to solve this problem?
>> Or can anyone give me an example to demostrate the usage of CoLocationGroup
>> ?
>>
>> Thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>>
>
> --
> Best regards
>
> Sili Liu
>


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-08 Thread Si-li Liu
Thanks for your reply.

It's a streaming job. The join operator performs the join work. The
join state is too large, so I don't want to keep the state using the
mechanism that Flink provides, and I also don't need a very precise join. So
I prefer to let the join operator calculate a backward timestamp as
state; if the cluster restarts, the consumer can use setStartFromTimestamp
to start from that timestamp.

Now my problem is that the consumer can't read the state that the join operator
has written, so I need a way to send a small message (a 64-bit long) from downstream
to upstream. Redis may be a solution, but adding an external dependency is a
secondary option if I can pass this message through memory.
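The in-memory join cache and its backward timestamp can be sketched with a plain `LinkedHashMap` that remembers the timestamp of the latest evicted entry, which would then be handed to setStartFromTimestamp. The size-based eviction and all class names here are illustrative assumptions; a time-based expiry would work the same way:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical join-side cache: insertion-ordered and bounded, and it
// remembers the timestamp of the most recently evicted (expired) entry,
// which becomes the backward/rollback timestamp for the source.
public class ExpiringJoinCache {
    private long latestExpiredTimestamp = 0L;
    private final int maxEntries;
    private final LinkedHashMap<String, Long> cache; // joinKey -> event timestamp

    public ExpiringJoinCache(int maxEntries) {
        this.maxEntries = maxEntries;
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > ExpiringJoinCache.this.maxEntries) {
                    latestExpiredTimestamp =
                            Math.max(latestExpiredTimestamp, eldest.getValue());
                    return true; // evict the oldest entry
                }
                return false;
            }
        };
    }

    public void put(String joinKey, long eventTimestamp) {
        cache.put(joinKey, eventTimestamp);
    }

    public Long lookup(String joinKey) {
        return cache.get(joinKey); // stream-B side probes the cache here
    }

    public long latestExpiredTimestamp() {
        return latestExpiredTimestamp;
    }

    public static void main(String[] args) {
        ExpiringJoinCache cache = new ExpiringJoinCache(2);
        cache.put("a", 100L);
        cache.put("b", 200L);
        cache.put("c", 300L); // evicts "a" (timestamp 100)
        System.out.println(cache.latestExpiredTimestamp());
    }
}
```

`removeEldestEntry` is the standard JDK hook for bounded LinkedHashMap caches; it fires on every `put`, so the rollback timestamp advances exactly when items expire.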


Chesnay Schepler  于2020年11月6日周五 上午7:06写道:

> It would be good if you could elaborate a bit more on your use-case.
> Are you using batch or streaming? What kind of "message" are we talking
> about? Why are you thinking of using a static variable, instead of just
> treating this message as part of the data(set/stream)?
>
> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>
> Currently I use Flink 1.9.1. The actual thing I want to do is send some
> messages from downstream operators to upstream operators, which I consider
> use static variable.
>
> But it makes me have to make sure in one taskmanager process it always has
> these two operators, can I use CoLocationGroup to solve this problem? Or
> can anyone give me an example to demostrate the usage of CoLocationGroup ?
>
> Thanks!
> --
> Best regards
>
> Sili Liu
>
>
>

-- 
Best regards

Sili Liu


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-05 Thread Chesnay Schepler

It would be good if you could elaborate a bit more on your use-case.
Are you using batch or streaming? What kind of "message" are we talking 
about? Why are you thinking of using a static variable, instead of just 
treating this message as part of the data(set/stream)?


On 11/5/2020 12:55 PM, Si-li Liu wrote:
Currently I use Flink 1.9.1. The actual thing I want to do is send 
some messages from downstream operators to upstream operators, which I 
consider use static variable.


But it makes me have to make sure in one taskmanager process it always 
has these two operators, can I use CoLocationGroup to solve this 
problem? Or can anyone give me an example to demonstrate the usage 
of CoLocationGroup ?


Thanks!
--
Best regards

Sili Liu





Is possible that make two operators always locate in same taskmanager?

2020-11-05 Thread Si-li Liu
Currently I use Flink 1.9.1. The actual thing I want to do is send some
messages from downstream operators to upstream operators, which I am
considering doing with a static variable.

But that forces me to make sure one taskmanager process always has
these two operators. Can I use CoLocationGroup to solve this problem? Or
can anyone give me an example to demonstrate the usage of CoLocationGroup?

Thanks!
-- 
Best regards

Sili Liu