Re: Dynamic rescaling in Flink

2020-06-14 Thread Xintong Song
Hi Prasanna,

IIUC, your screenshot shows the scaling feature of an EMR cluster, not
Flink.

Let me try to better understand your question. Which kind of rescaling do
you need?
- If you deploy a long-running streaming job and want it to dynamically
rescale based on the real-time incoming data stream: Flink does not support
this at the moment.
- If you have various jobs, and the number of jobs to be executed changes
over time, this can be supported in either of the following ways.
  - You can submit your workloads as Flink Single Jobs[1]. In this way, you
can simply rescale your EMR cluster, and Flink does not need to be aware of
that.
  - You can deploy a Flink Session[2], and submit your jobs to this
session. In this way, Flink will automatically request new workers from and
release idle workers to Yarn.

AFAIK, AWS EMR provides out-of-the-box Flink integration only for the single
job mode; the session mode is not supported. But I haven't checked this for
quite a while, so it may have changed.
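For reference, the two modes map to the CLI roughly as follows (a sketch based on the linked docs; paths, parallelism, and jar names are illustrative):

```sh
# Per-job mode: each submission brings up and tears down its own YARN application
./bin/flink run -m yarn-cluster -p 8 ./my-streaming-job.jar

# Session mode: start a long-running detached session first,
# then submit jobs into it; idle workers are released back to YARN
./bin/yarn-session.sh -d
./bin/flink run ./my-streaming-job.jar
```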

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#flink-yarn-session
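As an aside, the stop-then-restore cycle described in the quoted reply below maps roughly to the following CLI steps in the community version (job id and paths are illustrative, and this uses a savepoint rather than a checkpoint):

```sh
# Take a savepoint and stop the job
./bin/flink stop -p hdfs:///flink/savepoints <job-id>

# Resubmit at the new parallelism, restoring from the savepoint
./bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> -p 16 ./my-streaming-job.jar
```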

On Mon, Jun 15, 2020 at 11:55 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Thanks Xintong and Yu Yang for the replies,
>
> I see AWS provides deploying Flink on EMR out of the box. They also have
> an option to scale the EMR cluster based on the load.
>
> Is this not equal to dynamic rescaling?
>
> [image: Screen Shot 2020-06-15 at 9.23.24 AM.png]
>
>
>
> https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-scaling.html
>
> Let me know your thoughts on the same.
>
> Prasanna.
>
>
> On Wed, Jun 10, 2020 at 7:33 AM Xintong Song 
> wrote:
>
>> Hi Prasanna,
>>
>> Flink does not support dynamic rescaling at the moment.
>>
>> AFAIK, some companies in China (Alibaba, 360, etc.) already have solutions
>> for dynamically scaling Flink jobs, but none of them are available in the
>> community version yet. These solutions rely on an external system to
>> monitor the workload and rescale the job accordingly. Rescaling requires a
>> full stop of the data processing, then a rescale, then recovery from the
>> most recent checkpoint.
>>
>> The Flink community is also preparing a declarative resource management
>> approach, which should allow the job to dynamically adapt to the available
>> resources (e.g., add/remove pods on Kubernetes). AFAIK, this is still in
>> the design discussion.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 10, 2020 at 2:44 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Does Flink support dynamic scaling, i.e., adding/removing nodes based
>>> upon the incoming load?
>>>
>>> Our use case is such that we get peak loads for 4 hours, then medium
>>> loads for 8 hours, and then light to no load for the remaining 2 hours.
>>>
>>> The peak load would be at least 5 times the medium load.
>>>
>>> Has anyone used Flink in this type of scenario? We are looking at Flink
>>> for its low-latency performance.
>>>
>>> Earlier I worked with Spark+YARN, which provides a feature to dynamically
>>> add/remove executors.
>>>
>>> I wanted to know the same about Flink.
>>>
>>> Thanks,
>>> Prasanna
>>>
>>


Re: Dynamic rescaling in Flink

2020-06-14 Thread Prasanna kumar
Thanks Xintong and Yu Yang for the replies,

I see AWS provides deploying Flink on EMR out of the box. They also have
an option to scale the EMR cluster based on the load.

Is this not equal to dynamic rescaling?

[image: Screen Shot 2020-06-15 at 9.23.24 AM.png]


https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-scaling.html

Let me know your thoughts on the same.

Prasanna.


On Wed, Jun 10, 2020 at 7:33 AM Xintong Song  wrote:

> Hi Prasanna,
>
> Flink does not support dynamic rescaling at the moment.
>
> AFAIK, some companies in China (Alibaba, 360, etc.) already have solutions
> for dynamically scaling Flink jobs, but none of them are available in the
> community version yet. These solutions rely on an external system to
> monitor the workload and rescale the job accordingly. Rescaling requires a
> full stop of the data processing, then a rescale, then recovery from the
> most recent checkpoint.
>
> The Flink community is also preparing a declarative resource management
> approach, which should allow the job to dynamically adapt to the available
> resources (e.g., add/remove pods on Kubernetes). AFAIK, this is still in
> the design discussion.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 10, 2020 at 2:44 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi all,
>>
>> Does Flink support dynamic scaling, i.e., adding/removing nodes based
>> upon the incoming load?
>>
>> Our use case is such that we get peak loads for 4 hours, then medium
>> loads for 8 hours, and then light to no load for the remaining 2 hours.
>>
>> The peak load would be at least 5 times the medium load.
>>
>> Has anyone used Flink in this type of scenario? We are looking at Flink
>> for its low-latency performance.
>>
>> Earlier I worked with Spark+YARN, which provides a feature to dynamically
>> add/remove executors.
>>
>> I wanted to know the same about Flink.
>>
>> Thanks,
>> Prasanna
>>
>


Re: The Flink job recovered with wrong checkpoint state.

2020-06-14 Thread Yun Tang
Hi Thomas

The answer is yes. Without high availability, once the job manager goes down, 
even if it is relaunched via YARN, the job graph and the last checkpoint 
will not be recovered.
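If you enable HA, a minimal ZooKeeper sketch for flink-conf.yaml would look roughly like this (hostnames, paths, and the attempt count are illustrative):

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
# allow YARN to restart the application master more than once
yarn.application-attempts: 4
```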

Best
Yun Tang

From: Thomas Huang 
Sent: Sunday, June 14, 2020 22:58
To: Flink 
Subject: The Flink job recovered with wrong checkpoint state.

Hi Flink Community,

Currently, I'm using yarn-cluster mode to submit Flink jobs on YARN. I 
haven't set the high-availability configuration (ZooKeeper), but I have set a 
restart strategy:

 env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3000))

i.e., 10 restart attempts with a 3-second (3,000 ms) delay per failure.

Today, while the infra team was rolling-restarting the YARN platform, the job 
manager restarted, but the job did not recover from the latest checkpoint; all 
task managers started from the default job configuration, which was not expected.

Does this mean I have to set up the high-availability configuration for 
yarn-cluster mode, or is this a bug?

Best Wish.




How do I backfill time series data?

2020-06-14 Thread Marco Villalobos
Hello Flink community. I need help. Thus far, Flink has proven very useful
to me.

I am using it for stream processing of time-series data.

For the scope of this mailing list, let's say the time-series has the
fields: name: String, value: double, and timestamp: Instant.

I named the time series: timeSeriesDataStream.

My first task was to average the time series by name within a 15 minute
tumbling event time window.

I was able to solve this with a ProcessWindowFunction (I had to use this
approach because the watermark is not keyed), named the resultant stream
aggregateTimeSeriesDataStream, and then sank the values.

My next task is to backfill the per-name averages in subsequent windows. This
means that if a time series does not appear in a subsequent window, then the
previous average value will be used in that window.

How do I do this?

I started by applying a Map function on the aggregateTimeSeriesDataStream
to shift the timestamp back 15 minutes, and named the resultant stream
backfilledDataStream.

Now, I am stuck. I suspect that I should either

1) timeSeriesDataStream.coGroup(backfilledDataStream) and add a
CoGroupWindowFunction to process the backfill, or
2) use "iterate" to somehow jury-rig a backfill.

I really don't know.  That's why I am asking this group for advice.

What's the common solution for this problem? I am quite sure that this is a
very common use-case.
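The carry-forward rule itself is easy to state outside Flink; here is a hypothetical plain-Java sketch of just that rule (names and types are illustrative, and this is not the Flink operator itself):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: given the per-name averages of the current window and
// of the previous window, reuse the previous average for any name that is
// missing from the current window (the "backfill").
public class Backfill {
    public static Map<String, Double> backfill(Map<String, Double> current,
                                               Map<String, Double> previous) {
        Map<String, Double> out = new HashMap<>(current);
        // a name present in the previous window but absent now keeps its old average
        previous.forEach(out::putIfAbsent);
        return out;
    }
}
```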


Re: Shared state between two process functions

2020-06-14 Thread Yun Gao
Hi Jaswin,
   Currently, state belongs to a single operator, so it is not possible
to share state between different operators. Could you also share the
original problem you want to solve by sharing state?

Best,
 Yun



 --Original Mail --
Sender:Jaswin Shah 
Send Date:Sun Jun 14 18:57:54 2020
Recipients:user@flink.apache.org 
Subject:Shared state between two process functions

Hi,

Is it possible to create shared state (MapState) between two different 
KeyedProcessFunctions? If it's possible, how can we do that in Flink?

Thanks,
Jaswin

The Flink job recovered with wrong checkpoint state.

2020-06-14 Thread Thomas Huang
Hi Flink Community,

Currently, I'm using yarn-cluster mode to submit Flink jobs on YARN. I 
haven't set the high-availability configuration (ZooKeeper), but I have set a 
restart strategy:

 env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3000))

i.e., 10 restart attempts with a 3-second (3,000 ms) delay per failure.

Today, while the infra team was rolling-restarting the YARN platform, the job 
manager restarted, but the job did not recover from the latest checkpoint; all 
task managers started from the default job configuration, which was not expected.

Does this mean I have to set up the high-availability configuration for 
yarn-cluster mode, or is this a bug?

Best Wish.




Shared state between two process functions

2020-06-14 Thread Jaswin Shah
Hi,

Is it possible to create shared state (MapState) between two different 
KeyedProcessFunctions? If it's possible, how can we do that in Flink?

Thanks,
Jaswin


Re: Incremental state

2020-06-14 Thread Congxian Qiu
Hi

Can the process function[1] meet your needs here? You can do the TTL logic
using timers in process functions.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
Best,
Congxian
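
As a self-contained illustration of the timer idea (plain Java, not the Flink API; the real version would register timers via ctx.timerService() in a process function):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of timer-driven expiry: instead of periodically scanning
// all state, record a fire-time per element; when time advances past a
// fire-time (as an event-time timer firing would), delete exactly those keys.
public class TimerExpiry {
    final Map<String, String> state = new HashMap<>();
    final TreeMap<Long, List<String>> timers = new TreeMap<>(); // fire-time -> keys

    public void put(String key, String value, long now, long ttlMs) {
        state.put(key, value);
        timers.computeIfAbsent(now + ttlMs, t -> new ArrayList<>()).add(key);
    }

    // analogous to onTimer(): called as the clock/watermark advances
    public void advanceTo(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            for (String k : timers.pollFirstEntry().getValue()) {
                state.remove(k);
            }
        }
    }
}
```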


Timo Walther wrote on Wed, Jun 10, 2020 at 9:36 PM:

> Hi Annemarie,
>
> if TTL is what you are looking for and queryable state is what limits
> you, it might make sense to come up with a custom implementation of
> queryable state. TTL might be more difficult to implement. As far as I
> know, queryable state is more of an experimental feature without any
> consistency guarantees. A Function could offer this functionality using
> some socket/web-service library, or you could offer insights through a side
> output into a sink such as Elasticsearch.
>
> Otherwise, it might be useful to "batch" the cleanups. In Flink's SQL
> engine, a user can define a minimum and maximum retention time. So
> timers are always set based on the maximum retention time but during
> cleanup the elements that fall into the minimum retention time are also
> cleaned up on the way (see [1]). This could be a performance improvement.
>
> If the clean up happens based on event-time, it is also possible to use
> timers more efficiently and only set one timer per watermark [2].
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#timer-coalescing
>
>
> On 09.06.20 16:29, Annemarie Burger wrote:
> > Hi,
> >
> > What I'm trying to do is the following: I want to incrementally add and
> > delete elements to a state. If the element expires/goes out of the
> window,
> > it needs to be removed from the state. I basically want the
> functionality of
> > TTL, without using it, since I'm also using Queryable State and these two
> > features can't be combined. Of course I can give a "valid until" time to
> > each element when I'm adding it to the state using a ProcessFunction, and
> > periodically iterate over the state to remove expired elements, but I was
> > wondering if there is a more efficient way, for example using a timer
> > that is given the element as a parameter, so that when the timer fires, x
> > seconds after it was set, it can just look up the element directly
> > and remove it. But how would I implement this?
> >
> > Thanks!
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>


Re: MapState in flink

2020-06-14 Thread Congxian Qiu
Hi

Could you please share why you need `MapState<..., ListState<...>>` instead
of `MapState<..., List<...>>`?

Best,
Congxian


Oytun Tez wrote on Sun, Jun 14, 2020 at 3:39 AM:

> Correct me @everyone if I'm wrong, but you cannot keep State inside State
> in that way. So change your signature to: MapState<..., List<...>>
>
> The underlying mechanism wouldn't make sense in this state-inside-state
> shape.
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>
>
> On Sat, Jun 13, 2020 at 3:32 PM Jaswin Shah 
> wrote:
>
>> I need some representation like this:
>>
>>
>> --
>> *From:* Jaswin Shah 
>> *Sent:* 14 June 2020 01:01
>> *To:* user@flink.apache.org 
>> *Subject:* MapState in flink
>>
>> Hi,
>> Can anyone please help me with how I can create a MapState of ListState in
>> Flink? Does Flink support this, and if so, how do I declare the
>> descriptor for this state data structure?
>> If it is not supported, how can I create a similar data structure for
>> state in Flink?
>>
>> Thanks,
>> Jaswin
>>
>
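
A descriptor sketch along the lines Oytun suggests, holding a plain List as the map value (type parameters and the state name are illustrative; requires the Flink runtime):

```java
// Sketch: MapState with List values instead of nested ListState
MapStateDescriptor<String, List<Long>> descriptor =
    new MapStateDescriptor<>(
        "perKeyValues",
        Types.STRING,
        Types.LIST(Types.LONG));
MapState<String, List<Long>> state = getRuntimeContext().getMapState(descriptor);
```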