Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Ravi Bhushan Ratnakar
Hi Congxian,

Thank you for your reply. As I shared in my previous mail, in my case the
last successful checkpoint is missing details for some of the shards.
I am not scaling the Kinesis shards up or down; I always run with a fixed
number of shards, so there is no possibility of new shard discovery causing
such a problem.

Thanks,
Ravi



On Fri 22 Nov, 2019, 02:53 Congxian Qiu,  wrote:

> Hi
>
> For idle shards, I think restoring from previously unconsumed data is
> fine, because Flink did not consume any data from them before, but for
> non-idle shards this is problematic. From my experience with other
> connectors, could you check whether the "error" shards are newly split?
> Maybe the newly split shards were not contained in the checkpoint.
>
> Best,
> Congxian
>
>
> On Thu, Oct 17, 2019 at 2:19 AM Steven Nelson wrote:
>
>> In my situation I believe it's because we have idle shards (it's a
>> testing environment). I dug into the connector code and it looks like it
>> only updates the shard state when a record is processed or when the shard
>> hits shard_end. So, for an idle shard it would never get a checkpointed
>> state. I guess this is okay since in production we won't have idle shards,
>> but it might be better to send through an empty record that doesn't get
>> emitted, but it does trigger a state update.
>>
>> -Steve
>>
>>
>> On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Do you know step by step process to reproduce this problem?
>>>
>>> -Ravi
>>>
>>>
>>> On Wed 16 Oct, 2019, 17:40 Steven Nelson, 
>>> wrote:
>>>
 I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.

 About half my shards start over at trim horizon. Why would some shard
 statuses appear to not exist in a savepoint? This seems like a big 
 problem.

 -Steve

 On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
 ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> I am also facing the same problem. I am using Flink 1.9.0 and
> consuming from Kinesis source with retention of 1 day. I am observing that
> when the job is submitted with "latest" initial stream position, the job
> starts well and keeps on processing data from all the shards for a very long
> period of time without any lag. When the job fails, it also recovers well
> from the last successful checkpointed state. But I am also experiencing,
> very rarely, that when the job fails and recovers from the last successful
> checkpointed state, I notice a huge lag (1 day, as per retention) on one of
> the streams. I have not yet found a step-by-step process to reproduce this
> issue.
>
> So far, as per my analysis, I gathered some more information by customizing
> the FlinkKinesisConsumer to add extra log messages. I noticed that the
> number of shard details loaded from checkpoint data during recovery is less
> than the actual number of shards in the stream. I have a fixed number of
> shards in the Kinesis stream.
>
> I added one line of debug log at line 408 to print the size of the variable
> "sequenceNumsToRestore", which is populated with shard details from
> checkpoint data.
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>
> In this consumer class, when the "run" method is called, it does the
> following:
>
>    - it discovers shards from the Kinesis stream and selects all those
>    shards which the subtask should schedule
>    - then, one by one, it iterates over the discovered shards and checks
>    whether each shard's state is available in the recovered state
>    "sequenceNumsToRestore"
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>    - if it is available, then it schedules that shard with the recovered
>    state
>    - if it is not available in the state, then it schedules that shard
>    with "EARLIEST_SEQUENCE_NUMBER"
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
>
> As in my case the number of shard details recovered from the checkpoint
> data is less than the actual number of shards, this results in scheduling
> those shards with the earliest stream position.
> I am suspecting that somehow the checkpoint is missing state for some of
> the shards. But if this were the case, then that checkpoint should have
> failed.
>
> Any further information to resolve this issue would be highly
> appreciated...
>
> Regards,
> Ravi
>
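For readers following Ravi's analysis above, here is a simplified Scala sketch of the restore behaviour he describes. It is not the actual FlinkKinesisConsumer code; the method name and the string constant are placeholders for illustration only.

// Simplified sketch: shards discovered at startup that have no entry in the
// restored state map fall back to the earliest sequence number (TRIM_HORIZON),
// which is exactly the 1-day replay Ravi observes with 1-day retention.
def scheduleShards(
    discoveredShards: Seq[String],
    sequenceNumsToRestore: Map[String, String]): Map[String, String] =
  discoveredShards.map { shard =>
    sequenceNumsToRestore.get(shard) match {
      case Some(restoredSeqNum) => shard -> restoredSeqNum             // resume from checkpointed position
      case None                 => shard -> "EARLIEST_SEQUENCE_NUMBER" // missing from state: replay from earliest
    }
  }.toMap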

Re: Re: how to setup a ha flink cluster on k8s?

2019-11-21 Thread Yang Wang
Hi Rock,

If you correctly set the restart strategy, I think the jobmanager will fail
over and be relaunched, and the job will be recovered. Please share more
jobmanager logs if you want further help.


Best,
Yang
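For reference, a minimal flink-conf.yaml sketch of the restart strategy and ZooKeeper HA settings discussed in this thread; the quorum hosts, paths and retry values below are placeholders rather than recommendations.

# Restart strategy so the job is retried after a failure.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s

# JobManager HA backed by ZooKeeper, with job metadata stored in HDFS (or S3).
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
high-availability.storageDir: hdfs:///flink/ha
high-availability.cluster-id: /my-flink-cluster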

On Wed, Nov 20, 2019 at 2:57 PM Rock wrote:

> Hi Yang Wang,
>
> Thanks for your reply. I may have set up an HA cluster successfully. The
> reason I couldn't set it up before may be some bug around S3 in Flink; after
> changing to HDFS, I can run it successfully.
>
> But after about one day of running, the job-manager crashes and can't
> recover automatically. I must re-apply the job-manager deployment manually
> (and that fixes the problem, my jobs auto-start), which is strange.
>
> Since I changed too much from the yaml in Flink's docs, I really don't know
> where my conf is wrong. But I have added logback to Flink and let it send
> logs to my Elasticsearch cluster, so maybe the logs can tell more.
>
> -- Original Message --
> *From:* "Yang Wang";
> *Sent:* Tue, Nov 19, 2019 at 12:05 PM
> *To:* "vino yang";
> *Cc:* "Rock"; "user@flink.apache.org" <
> user@flink.apache.org>;
> *Subject:* Re: how to setup a ha flink cluster on k8s?
>
> Hi Rock,
>
> If you want to start an HA Flink cluster on k8s, the simplest way is to use
> ZK+HDFS/S3, just like the HA configuration on Yarn. The zookeeper-operator
> could help to start a ZK cluster [1]. Please share more information about
> why it did not work.
>
> If you are using a kubernetes per-job cluster, the job can be recovered
> when the jm pod crashes and is restarted [2]. A savepoint can also be used
> for better recovery.
>
> [1].https://github.com/pravega/zookeeper-operator
> [2].
> https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
>
> On Sat, Nov 16, 2019 at 5:00 PM vino yang wrote:
>
>> Hi Rock,
>>
>> I searched by Google and found a blog[1] talk about how to config JM HA
>> for Flink on k8s. Do not know whether it suitable for you or not. Please
>> feel free to refer to it.
>>
>> Best,
>> Vino
>>
>> [1]:
>> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>>
>> On Sat, Nov 16, 2019 at 11:02 AM Rock wrote:
>>
>>> I'm trying to set up a Flink cluster on k8s for production use. But the
>>> setup here
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html
>>> is not HA: when the job-manager goes down and is rescheduled,
>>>
>>> the metadata for the running job is lost.
>>>
>>>
>>>
>>> I tried to use the HA setup for ZK
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html
>>> on k8s, but can't get it right.
>>>
>>>
>>>
>>> Storing the job's metadata on k8s using a PVC or another external file
>>> system should be very easy. Is there a way to achieve it?
>>>
>>


Re: Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread Yang Wang
Hi Amran,

>> Container released on a *lost* node

If you see such exceptions, it means that the corresponding Yarn NodeManager
has been lost, so all the containers running on that node will be released.
The Flink YarnResourceManager receives the 'lost' message from the Yarn
ResourceManager and will allocate a new taskmanager container instead.

If you want to find the root cause, you need to check the Yarn NodeManager
logs to see why it was lost.



Best,
Yang
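As a concrete way to follow this suggestion, the YARN-side node state and logs can be pulled from the command line; the application id below is a placeholder.

# Lost/unhealthy nodes show up in the node list; the aggregated container logs
# usually explain why the NodeManager went away (OOM killer, disk, network).
yarn node -list -all
yarn logs -applicationId application_1574300000000_0042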

On Fri, Nov 22, 2019 at 10:20 AM vino yang wrote:

> Hi Amran,
>
> Did you monitor or have a look at the memory metrics (e.g. full GC) of
> your TM?
>
> There is a similar thread where a user reported the same issue due to
> full GC; the link is here [1].
>
> Best,
> Vino
>
> [1]:
> http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/%3cfa4068d3-2696-4f29-857d-4a7741c3e...@greghogan.com%3E
>
> On Fri, Nov 22, 2019 at 7:15 AM amran dean wrote:
>
>> Hello,
>> I am frequently seeing this error in my jobmanager logs:
>>
>> 2019-11-18 09:07:08,863 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> kdi_g2 -> (Sink: s_id_metadata, Sink: t_id_metadata) (23/24)
>> (5e10e88814fe4ab0be5f554ec59bd93d) switched from RUNNING to FAILED.
>> java.lang.Exception: Container released on a *lost* node
>> at
>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Yarn nodemanager logs don't show anything out of the ordinary when such
>> exceptions occur, so I am suspecting it is a timeout of some sort due to
>> network problems. (YARN is not killing the container ). It's difficult to
>> diagnose because Flink doesn't give any reason for losing the node. Is it a
>> timeout? OOM?
>>
>>  Is there a corresponding configuration I should tune? What is the
>> recommended course of action?
>>
>


Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Yang Wang
Hi Piper,

Jingsong is right. For both per-job and session clusters, the
YarnResourceManager will allocate taskmanager containers dynamically on
demand.

For a per-job cluster, it allocates taskmanagers based on the job's slot
demand. Excess containers are returned to Yarn immediately. When the job
finishes, the jobmanager and all taskmanagers are released.
For a session cluster, the YarnResourceManager does not have any taskmanagers
at start. Once a job is submitted, it allocates the taskmanagers. When the
job finishes, the taskmanagers become idle and are released after the
timeout. The jobmanager is long-running unless the session is stopped
manually.

I'm just curious why you want to control the number of taskmanagers, since
they are always allocated on demand.


Best,
Yang
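The idle-release behaviour described above is governed by the resourcemanager.taskmanager-timeout option mentioned later in this thread; a minimal flink-conf.yaml sketch (the value is an example in milliseconds, not a recommendation):

# Release idle TaskManager containers back to YARN after 60 seconds.
resourcemanager.taskmanager-timeout: 60000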

On Fri, Nov 22, 2019 at 11:02 AM Piper Piper wrote:

> Thank you, I will check it out.
>
> On Thu, Nov 21, 2019, 9:21 PM Jingsong Li  wrote:
>
>> Hi Piper,
>>
>> AFAIK, there are no such flexible operations. You can get some
>> information from metrics, but you cannot control them.
>> Maybe you should modify some source code in flink-yarn.
>>
>> Best,
>> Jingsong Lee
>>
>>
>> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper  wrote:
>>
>>> Hi Jingsong,
>>>
>>> Thank you for your reply!
>>>
>>> >Is this what you want? Piper.
>>>
>>> Yes. This is exactly what I want.
>>>
>>> Is there any way for me to specify to the Flink RM how many resources to
>>> ask YARN's RM for, and to have Flink's RM ask for resources proactively
>>> before it runs out?
>>> Similarly, is there any way I can force the JM to release a TM back to
>>> YARN before the timeout?
>>>
>>> Or will I need to modify the source code of Flink for this?
>>>
>>> Thank you,
>>>
>>> Piper
>>>
>>> On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:
>>>
 Hi Jingsong,

 Thanks for the explanation about the mechanism of the new Flink session
 cluster mode.

 Because I mostly use job cluster mode, so did not have a good knowledge
 of the new Flink session cluster mode.

 Best,
 Vino

 On Thu, Nov 21, 2019 at 2:46 PM Jingsong Li wrote:

> Hi Piper and Vino:
>
> In the current Flink version, the resources of a Flink session cluster
> are unrestricted, which means that if the requested resources exceed the
> resources owned by the current session, it will apply to Yarn's RM for
> new resources.
> And if a TaskManager is idle for too long, the JM will release it to Yarn.
> This behavior is controlled by resourcemanager.taskmanager-timeout. You
> can set a suitable value for it to enjoy the benefits of process reuse and
> dynamic resources.
>
> From this point of view, I think session mode is a good choice.
> Is this what you want? Piper.
>
> Best,
> Jingsong Lee
>
>
>
> On Thu, Nov 21, 2019 at 2:25 PM vino yang 
> wrote:
>
>> Hi Piper,
>>
>> The understanding of two deploy modes For Flink on Yarn is right.
>>
>> AFAIK, The single job (job cluster) mode is more popular than Session
>> mode.
>>
>> Because job cluster mode, Flink let YARN manage resources as far as
>> possible. And this mode can keep isolation from other jobs.
>>
>> IMO, we do not need to combine their advantages. Let YARN do the
>> things that it is good at. What do you think?
>>
>> Best,
>> Vino
>>
>>
>> On Thu, Nov 21, 2019 at 11:55 AM Piper Piper wrote:
>>
>>> Hi Vino,
>>>
>>> I want to implement Resource Elasticity. In doing so, I have read
>>> that Flink with YARN has two modes: Job and Session.
>>>
>>> In Job mode, Flink’s Resource Manager requests YARN for containers
>>> with TMs, and then gives the containers back to YARN upon job 
>>> completion.
>>>
>>> In Session mode, Flink already has the TMs that are persistent.
>>>
>>> I want to combine the advantages of Job and Session mode, i.e. Flink
>>> will have persistent TMs/containers and request YARN for more
>>> TMs/containers when needed (or release TMs/containers back to YARN).
>>>
>>> Thank you,
>>>
>>> Piper
>>>
>>> On Wed, Nov 20, 2019 at 9:39 PM vino yang 
>>> wrote:
>>>
 Hi Piper,

 Can you share more reason and details of your requirements.

 Best,
 Vino

 On Thu, Nov 21, 2019 at 5:48 AM Piper Piper wrote:

> Hi,
>
> How can I make Flink's Resource Manager request YARN to spin up
> new (or destroy/reclaim existing) TaskManagers in YARN containers?
>
> Preferably at runtime (i.e. dynamically).
>
> Thank you
>
> Piper
>

>
> --
> Best, Jingsong Lee
>

>>
>> --
>> Best, Jingsong Lee
>>
>


Apache Flink - Uid and name for Flink sources and sinks

2019-11-21 Thread M Singh
Hi Folks:
I am assigning uid and name to all stateful processors in our application and
wanted to find out the following:

1. Should we assign uid and name to the sources and sinks too?
2. What are the pros and cons of adding uid to sources and sinks?
3. The sinks have uid and hashUid - which is the preferred attribute to use
   for allowing job restarts?
4. If sink and source uids are not provided in the application, can they still
   maintain state across job restarts from checkpoints?
5. Can the sinks and sources without uid restart from savepoints?
6. The data streams have an attribute id - how is this generated, and can it
   be used for creating a uid for the sink?
Thanks for your help.
Mans
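For context, a minimal Scala sketch of assigning uid and name to a source, an intermediate operator and a sink; the socket source, print sink and identifiers are placeholders for whatever connectors the job actually uses.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.socketTextStream("localhost", 9999)   // placeholder source
  .uid("my-source")                       // stable id matched against checkpoint/savepoint state
  .name("My Source")                      // label shown in the web UI and metrics
  .map(_.toUpperCase)
  .uid("upper-mapper")
  .name("Upper Mapper")
  .print()                                // placeholder sink
  .uid("my-sink")
  .name("My Sink")

env.execute("uid-name-example")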

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Thank you, I will check it out.

On Thu, Nov 21, 2019, 9:21 PM Jingsong Li  wrote:

> Hi Piper,
>
> AFAIK, there are no such flexible operations. You can get some
> information from metrics, but you cannot control them.
> Maybe you should modify some source code in flink-yarn.
>
> Best,
> Jingsong Lee
>
>
> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper  wrote:
>
>> Hi Jingsong,
>>
>> Thank you for your reply!
>>
>> >Is this what you want? Piper.
>>
>> Yes. This is exactly what I want.
>>
>> Is there any way for me to specify to the Flink RM how many resources to
>> ask YARN's RM for, and to have Flink's RM ask for resources proactively
>> before it runs out?
>> Similarly, is there any way I can force the JM to release a TM back to YARN
>> before the timeout?
>>
>> Or will I need to modify the source code of Flink for this?
>>
>> Thank you,
>>
>> Piper
>>
>> On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:
>>
>>> Hi Jingsong,
>>>
>>> Thanks for the explanation about the mechanism of the new Flink session
>>> cluster mode.
>>>
>>> Because I mostly use job cluster mode, so did not have a good knowledge
>>> of the new Flink session cluster mode.
>>>
>>> Best,
>>> Vino
>>>
>>> On Thu, Nov 21, 2019 at 2:46 PM Jingsong Li wrote:
>>>
 Hi Piper and Vino:

 Current Flink version, the resources of Flink Session cluster
 are unrestricted, which means if the requested resources exceed the
 resources owned by the current session, it will apply to the RM of yarn for
 new resources.
 And if TaskManager is idle for too long, JM will release it to yarn.
 This behavior is controlled by resourcemanager.taskmanager-timeout . You
 can set a suitable value for it to enjoy the benefits of reuse process and
 dynamic resources.

 From this point of view, I think session mode is a good choice.
 Is this what you want? Piper.

 Best,
 Jingsong Lee



 On Thu, Nov 21, 2019 at 2:25 PM vino yang 
 wrote:

> Hi Piper,
>
> The understanding of two deploy modes For Flink on Yarn is right.
>
> AFAIK, The single job (job cluster) mode is more popular than Session
> mode.
>
> Because job cluster mode, Flink let YARN manage resources as far as
> possible. And this mode can keep isolation from other jobs.
>
> IMO, we do not need to combine their advantages. Let YARN do the
> things that it is good at. What do you think?
>
> Best,
> Vino
>
>
> On Thu, Nov 21, 2019 at 11:55 AM Piper Piper wrote:
>
>> Hi Vino,
>>
>> I want to implement Resource Elasticity. In doing so, I have read
>> that Flink with YARN has two modes: Job and Session.
>>
>> In Job mode, Flink’s Resource Manager requests YARN for containers
>> with TMs, and then gives the containers back to YARN upon job completion.
>>
>> In Session mode, Flink already has the TMs that are persistent.
>>
>> I want to combine the advantages of Job and Session mode, i.e. Flink
>> will have persistent TMs/containers and request YARN for more
>> TMs/containers when needed (or release TMs/containers back to YARN).
>>
>> Thank you,
>>
>> Piper
>>
>> On Wed, Nov 20, 2019 at 9:39 PM vino yang 
>> wrote:
>>
>>> Hi Piper,
>>>
>>> Can you share more reason and details of your requirements.
>>>
>>> Best,
>>> Vino
>>>
>>> On Thu, Nov 21, 2019 at 5:48 AM Piper Piper wrote:
>>>
 Hi,

 How can I make Flink's Resource Manager request YARN to spin up new
 (or destroy/reclaim existing) TaskManagers in YARN containers?

 Preferably at runtime (i.e. dynamically).

 Thank you

 Piper

>>>

 --
 Best, Jingsong Lee

>>>
>
> --
> Best, Jingsong Lee
>


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Zhijiang
The hint about mmap usage below is really helpful for locating this problem. I
forgot about this big change for batch jobs in release-1.9.
The blocking type option can be set to `file`, as Piotr suggested, to behave
similarly to before. I think it can solve your problem.
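For reference, the option Piotr points to below is set in flink-conf.yaml; a minimal sketch:

# Hidden option from FLINK-12070: use plain files instead of memory-mapped
# files for bounded blocking subpartitions, closer to the pre-1.9 behaviour.
taskmanager.network.bounded-blocking-subpartition-type: file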


--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 23:37
To:Piotr Nowojski 
Cc:Zhijiang ; user@flink.apache.org 

Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Thanks, Piotr. We’ll rerun our apps today with this and get back to you. 

// ah

From: Piotr Nowojski  On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] 
Cc: Zhijiang ; user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi,
I would suspect this:
https://issues.apache.org/jira/browse/FLINK-12070
To be the source of the problems.
There seems to be a hidden configuration option that avoids using memory mapped 
files:
taskmanager.network.bounded-blocking-subpartition-type: file
Could you test if helps?
Piotrek



On 21 Nov 2019, at 15:22, Hailu, Andreas  wrote:
Hi Zhijiang,

I looked into the container logs for the failure, and didn’t see any specific 
OutOfMemory errors before it was killed. I ran the application using the same 
config this morning on 1.6.4, and it went through successfully. I took a 
snapshot of the memory usage from the dashboard and can send it to you if you 
like for reference.

What stands out to me as suspicious is that on 1.9.1, the application is using 
nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its 
runtime and succeeds. The JVM heap memory itself never exceeds its capacity, 
peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes 
around mapped memory.

// ah

From: Zhijiang  
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi Andreas,

You are running a batch job, so there should be no native memory used by the 
RocksDB state backend. Then I guess either heap memory or direct memory is 
over-used. The managed heap memory is mainly used by batch operators, and 
direct memory is used by the network shuffle. Can you further check whether 
there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before the 
container was killed? If the used memory exceeds the JVM configuration, it 
should throw that error. Then we can further narrow down the scope. I cannot 
remember the memory-related changes for managed memory or the network stack, 
especially as they really span several releases.

Best,
Zhijiang

--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org 
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering] 
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at 
org.apache.

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Jingsong Li
Hi Piper,

AFAIK, there are no such flexible operations. You can get some information
from metrics, but you cannot control them.
Maybe you should modify some source code in flink-yarn.

Best,
Jingsong Lee


On Thu, Nov 21, 2019 at 8:17 PM Piper Piper  wrote:

> Hi Jingsong,
>
> Thank you for your reply!
>
> >Is this what you want? Piper.
>
> Yes. This is exactly what I want.
>
> Is there any way for me to specify to the Flink RM how many resources to
> ask YARN's RM for, and to have Flink's RM ask for resources proactively
> before it runs out?
> Similarly, is there any way I can force the JM to release a TM back to YARN
> before the timeout?
>
> Or will I need to modify the source code of Flink for this?
>
> Thank you,
>
> Piper
>
> On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:
>
>> Hi Jingsong,
>>
>> Thanks for the explanation about the mechanism of the new Flink session
>> cluster mode.
>>
>> Because I mostly use job cluster mode, I did not have good knowledge
>> of the new Flink session cluster mode.
>>
>> Best,
>> Vino
>>
>> On Thu, Nov 21, 2019 at 2:46 PM Jingsong Li wrote:
>>
>>> Hi Piper and Vino:
>>>
>>> In the current Flink version, the resources of a Flink session cluster
>>> are unrestricted, which means that if the requested resources exceed the
>>> resources owned by the current session, it will apply to Yarn's RM for
>>> new resources.
>>> And if a TaskManager is idle for too long, the JM will release it to Yarn.
>>> This behavior is controlled by resourcemanager.taskmanager-timeout. You
>>> can set a suitable value for it to enjoy the benefits of process reuse and
>>> dynamic resources.
>>>
>>> From this point of view, I think session mode is a good choice.
>>> Is this what you want? Piper.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>>
>>>
>>> On Thu, Nov 21, 2019 at 2:25 PM vino yang  wrote:
>>>
 Hi Piper,

 The understanding of the two deploy modes for Flink on Yarn is right.

 AFAIK, the single job (job cluster) mode is more popular than Session
 mode.

 Because in job cluster mode, Flink lets YARN manage resources as far as
 possible, and this mode keeps the job isolated from other jobs.

 IMO, we do not need to combine their advantages. Let YARN do the things
 that it is good at. What do you think?

 Best,
 Vino


 On Thu, Nov 21, 2019 at 11:55 AM Piper Piper wrote:

> Hi Vino,
>
> I want to implement Resource Elasticity. In doing so, I have read that
> Flink with YARN has two modes: Job and Session.
>
> In Job mode, Flink’s Resource Manager requests YARN for containers
> with TMs, and then gives the containers back to YARN upon job completion.
>
> In Session mode, Flink already has the TMs that are persistent.
>
> I want to combine the advantages of Job and Session mode, i.e. Flink
> will have persistent TMs/containers and request YARN for more
> TMs/containers when needed (or release TMs/containers back to YARN).
>
> Thank you,
>
> Piper
>
> On Wed, Nov 20, 2019 at 9:39 PM vino yang 
> wrote:
>
>> Hi Piper,
>>
>> Can you share more reason and details of your requirements.
>>
>> Best,
>> Vino
>>
>> On Thu, Nov 21, 2019 at 5:48 AM Piper Piper wrote:
>>
>>> Hi,
>>>
>>> How can I make Flink's Resource Manager request YARN to spin up new
>>> (or destroy/reclaim existing) TaskManagers in YARN containers?
>>>
>>> Preferably at runtime (i.e. dynamically).
>>>
>>> Thank you
>>>
>>> Piper
>>>
>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

-- 
Best, Jingsong Lee


Re: Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread vino yang
Hi Amran,

Did you monitor or have a look at the memory metrics (e.g. full GC) of your
TM?

There is a similar thread where a user reported the same issue due to
full GC; the link is here [1].

Best,
Vino

[1]:
http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/%3cfa4068d3-2696-4f29-857d-4a7741c3e...@greghogan.com%3E

On Fri, Nov 22, 2019 at 7:15 AM amran dean wrote:

> Hello,
> I am frequently seeing this error in my jobmanager logs:
>
> 2019-11-18 09:07:08,863 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> kdi_g2 -> (Sink: s_id_metadata, Sink: t_id_metadata) (23/24)
> (5e10e88814fe4ab0be5f554ec59bd93d) switched from RUNNING to FAILED.
> java.lang.Exception: Container released on a *lost* node
> at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Yarn nodemanager logs don't show anything out of the ordinary when such
> exceptions occur, so I am suspecting it is a timeout of some sort due to
> network problems. (YARN is not killing the container ). It's difficult to
> diagnose because Flink doesn't give any reason for losing the node. Is it a
> timeout? OOM?
>
>  Is there a corresponding configuration I should tune? What is the
> recommended course of action?
>


Re: Savepoints and checkpoints

2019-11-21 Thread Congxian Qiu
Hi

First, a checkpoint in Flink is a distributed snapshot of the job.
As Yun said, the Kafka consumer snapshots the topic name and partition into
the checkpoint, so when restoring from the last checkpoint it does not know
about the new topic name.
Within the checkpoint, you can think of a checkpoint as a collection of
key-value pairs, where the key is the operator id and the value is the
snapshot of that operator. The operator id is generated automatically if you
do not set it, and you can disable the automatic generation by calling
`ExecutionConfig#disableAutoGeneratedUIDs` [1]; job submission will then fail
if any operator does not have a custom unique ID.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
Best,
Congxian
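A minimal Scala sketch of the uid handling described above; the toy pipeline and the operator ids are placeholders.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Fail job submission if any operator is missing an explicit uid,
// instead of silently falling back to auto-generated ids.
env.getConfig.disableAutoGeneratedUIDs()

env.fromElements(1, 2, 3)   // placeholder source
  .uid("numbers-source")
  .map(_ * 2)
  .uid("doubler")
  .print()
  .uid("stdout-sink")

env.execute("explicit-uids")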


On Fri, Nov 22, 2019 at 2:20 AM Yun Tang wrote:

> Hi Min
>
>
>
> Since kafka consumer would store KafkaTopicPartition [1] within
> checkpoint, you cannot load previous state if you changed the kafka topic
> name.
>
>
>
> If you assigned an operator id to the previous stateful operator and then
> split it into two operators, but one new operator still keeps the previous
> operator id, Flink will try to assign the previous state to that new
> operator. Otherwise, the previous state will not match any operator, and you
> need to consider allowing non-restored state if you choose to resume from
> the previous checkpoint/savepoint [3].
>
>
>
> [1]
> https://github.com/apache/flink/blob/b290230662fa1aa38909aed40ac85eaf843e1d1c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L902
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
>
>
>
> Best
>
> Yun Tang
>
>
>
>
>
> *From: *"min@ubs.com" 
> *Date: *Thursday, November 21, 2019 at 5:19 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Savepoints and checkpoints
>
>
>
> Hi,
>
>
>
> Are Flink savepoints and checkpoints still valid after some data entity
> changes, e.g. Kafka topic name changes? I expect the answer is "No"?
>
> Similarly, are Flink savepoints and checkpoints still valid after some job
> graph changes, e.g. one stateful operator splits into two? I expect the
> answer is "No"?
>
>
>
> Regards,
>
>
>
> Min
>
>
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread vino yang
Hi Lei,

It would be better to use Flink's RESTful API to fetch the information of
the running jobs[1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-1

Best,
Vino
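A minimal Scala sketch of querying that endpoint; the JobManager host/port are placeholders, and the response shape shown in the comment is only indicative.

import scala.io.Source

// GET /jobs returns the ids and statuses of the jobs known to the JobManager,
// e.g. {"jobs":[{"id":"1357d21be640b6a3b8a86a063f4bba8a","status":"RUNNING"}]}
val jobsJson = Source.fromURL("http://jobmanager-host:8081/jobs").mkString
println(jobsJson)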

On Fri, Nov 22, 2019 at 4:14 AM Lei Nie wrote:

> I looked at the code, and
> StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is
> generating a random ID unrelated to the actual ID used.
>
> Is there any way to fetch the real ID at runtime?
> Use case: fetch most recent checkpoint from stable storage for
> automated restarts. Most recent checkpoint has form
> ".../checkpoints/flink_app_id/chk-123"
>
> On Thu, Nov 21, 2019 at 11:28 AM Lei Nie  wrote:
> >
> > This does not get the correct id:
> > StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
> > eea5abc21dd8743a4090f4a3a660f9e8
> > Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a
> >
> >
> >
> > On Thu, Nov 7, 2019 at 6:56 PM vino yang  wrote:
> > >
> > > Hi Lei Nie,
> > >
> > > You can use
> `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
> job id.
> > >
> > > Best,
> > > Vino
> > >
> > > On Fri, Nov 8, 2019 at 8:38 AM Lei Nie wrote:
> > >>
> > >> Hello,
> > >> I am currently executing streaming jobs via
> StreamExecutionEnvironment. Is it possible to retrieve the Flink job
> ID/YARN ID within the context of a job? I'd like to be able to
> automatically register the job such that monitoring jobs can run (REST api
> requires for example job id).
> > >>
> > >> Thanks
>


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Congxian Qiu
Hi

For idle shards, I think restoring from previously unconsumed data is fine,
because Flink did not consume any data from them before, but for non-idle
shards this is problematic. From my experience with other connectors, could
you check whether the "error" shards are newly split? Maybe the newly split
shards were not contained in the checkpoint.

Best,
Congxian


On Thu, Oct 17, 2019 at 2:19 AM Steven Nelson wrote:

> In my situation I believe it's because we have idle shards (it's a testing
> environment). I dug into the connector code and it looks like it only
> updates the shard state when a record is processed or when the shard hits
> shard_end. So, for an idle shard it would never get a checkpointed state. I
> guess this is okay since in production we won't have idle shards, but it
> might be better to send through an empty record that doesn't get emitted,
> but it does trigger a state update.
>
> -Steve
>
>
> On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Do you know step by step process to reproduce this problem?
>>
>> -Ravi
>>
>>
>> On Wed 16 Oct, 2019, 17:40 Steven Nelson, 
>> wrote:
>>
>>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>>
>>> About half my shards start over at trim horizon. Why would some shard
>>> statuses appear to not exist in a savepoint? This seems like a big problem.
>>>
>>> -Steve
>>>
>>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
 Hi,

 I am also facing the same problem. I am using Flink 1.9.0 and consuming
 from Kinesis source with retention of 1 day. I am observing that when the
 job is submitted with "latest" initial stream position, the job starts well
 and keeps on processing data from all the shards for a very long period of
 time without any lag. When the job fails, it also recovers well with the
 last successful checkpointed state. But I am also experiencing, very rarely,
 that when the job fails and recovers from the last successful checkpointed
 state, I notice a huge lag (1 day, as per retention) on one of the streams.
 I have not yet found a step-by-step process to reproduce this issue.

 So far, as per my analysis, I gathered some more information by customizing
 the FlinkKinesisConsumer to add extra log messages. I noticed that the number
 of shard details loaded from checkpoint data during recovery is less than the
 actual number of shards in the stream. I have a fixed number of shards in the
 Kinesis stream.

 I added one line of debug log at line 408 to print the size of the variable
 "sequenceNumsToRestore", which is populated with shard details from
 checkpoint data.

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408

 In this consumer class, when the "run" method is called, it does the
 following:

    - it discovers shards from the Kinesis stream and selects all those
    shards which the subtask should schedule
    - then, one by one, it iterates over the discovered shards and checks
    whether each shard's state is available in the recovered state
    "sequenceNumsToRestore"

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
    - if it is available, then it schedules that shard with the recovered
    state
    - if it is not available in the state, then it schedules that shard
    with "EARLIEST_SEQUENCE_NUMBER"

 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308

 As in my case the number of shard details recovered from the checkpoint data
 is less than the actual number of shards, this results in scheduling those
 shards with the earliest stream position.
 I am suspecting that somehow the checkpoint is missing state for some of the
 shards. But if this were the case, then that checkpoint should have failed.

 Any further information to resolve this issue would be highly
 appreciated...

 Regards,
 Ravi

 On Wed, Oct 16, 2019 at 5:57 AM Yun Tang  wrote:

> Hi Steven
>
> If you restored the savepoint/checkpoint successfully, I think this might be
> due to the shard not having been discovered in the previous run; therefore it
> would be consumed from the beginning. Please refer to the implementation
> here: [1]
>
> [1]
> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.

Flink on YARN: frequent "container released on a *lost* node"

2019-11-21 Thread amran dean
Hello,
I am frequently seeing this error in my jobmanager logs:

2019-11-18 09:07:08,863 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
kdi_g2 -> (Sink: s_id_metadata, Sink: t_id_metadata) (23/24)
(5e10e88814fe4ab0be5f554ec59bd93d) switched from RUNNING to FAILED.
java.lang.Exception: Container released on a *lost* node
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Yarn nodemanager logs don't show anything out of the ordinary when such
exceptions occur, so I am suspecting it is a timeout of some sort due to
network problems. (YARN is not killing the container ). It's difficult to
diagnose because Flink doesn't give any reason for losing the node. Is it a
timeout? OOM?

 Is there a corresponding configuration I should tune? What is the
recommended course of action?


Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Thank you, Kelly!

On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith  wrote:

> Hi Piper,
>
>
>
> The repro is pretty simple:
>
>- Submit a job with parallelism set higher than YARN has resources to
>support
>
>
>
> What this ends up looking like in the Flink UI is this:
>
>
>
> The Job is in a “RUNNING” state, but all of the tasks are in the
> “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits
> by default will increase by 1, but none of the tasks actually get scheduled
> on any TM.
>
>
>
>
>
> What I’m looking for is a way to detect when I am in this state using
> Flink metrics (ideally the count of tasks in each state for better
> observability).
>
>
>
> Does that make sense?
>
>
>
> Thanks,
>
> Kelly
>
>
>
> *From: *Piper Piper 
> *Date: *Thursday, November 21, 2019 at 12:59 PM
> *To: *Kelly Smith 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Metrics for Task States
>
>
>
> Hello Kelly,
>
>
>
> I thought that Flink scheduler only starts a job if all requested
> containers/TMs are available and allotted to that job.
>
>
>
> How can I reproduce your issue on Flink with YARN?
>
>
>
> Thank you,
>
>
>
> Piper
>
>
>
>
>
> On Thu, Nov 21, 2019, 1:48 PM Kelly Smith  wrote:
>
> I’ve been running Flink in production on EMR (YARN) for some time and have
> found the metrics system to be quite useful, but there is one specific case
> where I’m missing a signal for this scenario:
>
>
>
>- When a job has been submitted, but YARN does not have enough
>resources to provide
>
>
>
> Observed:
>
>- Job is in RUNNING state
>- All of the tasks for the job are in the (I believe) DEPLOYING state
>
>
>
> Is there a way to access these as metrics for monitoring the number of
> tasks in each state for a given job (image below)? The metric I’m currently
> using is the number of running jobs, but it misses this “unhealthy”
> scenario. I realize that I could use application-level metrics (record
> counts, etc) as a proxy for this, but I’m working on providing a streaming
> platform and need all of my monitoring to be application agnostic.
>
> [screenshot: task state counts in the Flink UI]
>
>
>
> I can’t find anything on it in the documentation.
>
>
>
> Thanks,
>
> Kelly
>
>


Re: Streaming data to Segment

2019-11-21 Thread Li Peng
Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov  wrote:

> Hi Li,
>
> You're in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
> close functions that allow you to initialize and dispose of resources
> properly.
>
> On Thu, 21 Nov 2019, 5:23 Li Peng,  wrote:
>
>> Hey folks, I'm interested in streaming some data to Segment
>> , using their existing
>> java library. This is a pretty high-throughput stream, so I wanted each
>> parallel operator to have its own instance of the Segment client. From what
>> I could tell, defining a custom SinkFunction should satisfy this, as each
>> parallel operator gets its own SinkFunction object automatically. So my
>> code looks like this:
>>
>> class SegmentSink() extends SinkFunction[Data] {
>>
>>   @transient
>>   val segmentClient: Analytics = Analytics.builder("key").build()
>>
>>   override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = 
>> {
>> segmentClient.enqueue(...)
>>   }
>> }
>>
>> Can anyone verify if this is the right pattern for me to use? Is there
>> any risk of the SinkFunction getting repeatedly serialized/deserialized
>> which results in new segment clients getting created each time?
>>
>> Thanks,
>> Li
>>
>
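For reference, a sketch of the RichSinkFunction approach Yuval suggests, building on Li's snippet above. `Data` is the element type from that snippet, and the flush()/shutdown() calls assume the Segment Java client exposes them; check the client's documentation before relying on this.

import com.segment.analytics.Analytics
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class SegmentSink extends RichSinkFunction[Data] {

  @transient private var segmentClient: Analytics = _

  override def open(parameters: Configuration): Unit = {
    // Created once per parallel sink instance, after deserialization.
    segmentClient = Analytics.builder("key").build()
  }

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    segmentClient.enqueue(???) // build the Segment message from `value`, as in the original snippet
  }

  override def close(): Unit = {
    if (segmentClient != null) {
      segmentClient.flush()    // assumed client API; verify against the Segment docs
      segmentClient.shutdown()
    }
  }
}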


Re: Metrics for Task States

2019-11-21 Thread Kelly Smith
Hi Piper,

The repro is pretty simple:

  *   Submit a job with parallelism set higher than YARN has resources to 
support

What this ends up looking like in the Flink UI is this:
[screenshot: the job shows RUNNING while all tasks remain in the SCHEDULED state]

The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” 
state. The `jobmanager.numRunningJobs` metric that Flink emits by default will 
increase by 1, but none of the tasks actually get scheduled on any TM.

[screenshot omitted]

What I’m looking for is a way to detect when I am in this state using Flink 
metrics (ideally the count of tasks in each state for better observability).

Does that make sense?

Thanks,
Kelly

From: Piper Piper 
Date: Thursday, November 21, 2019 at 12:59 PM
To: Kelly Smith 
Cc: "user@flink.apache.org" 
Subject: Re: Metrics for Task States

Hello Kelly,

I thought that Flink scheduler only starts a job if all requested 
containers/TMs are available and allotted to that job.

How can I reproduce your issue on Flink with YARN?

Thank you,

Piper


On Thu, Nov 21, 2019, 1:48 PM Kelly Smith 
mailto:kell...@zillowgroup.com>> wrote:
I’ve been running Flink in production on EMR (YARN) for some time and have 
found the metrics system to be quite useful, but there is one specific case 
where I’m missing a signal for this scenario:


  *   When a job has been submitted, but YARN does not have enough resources to 
provide

Observed:

  *   Job is in RUNNING state
  *   All of the tasks for the job are in the (I believe) DEPLOYING state

Is there a way to access these as metrics for monitoring the number of tasks in 
each state for a given job (image below)? The metric I’m currently using is the 
number of running jobs, but it misses this “unhealthy” scenario. I realize that 
I could use application-level metrics (record counts, etc) as a proxy for this, 
but I’m working on providing a streaming platform and need all of my monitoring 
to be application agnostic.
[screenshot: task state counts in the Flink UI]

I can’t find anything on it in the documentation.

Thanks,
Kelly


Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Hello Kelly,

I thought that Flink scheduler only starts a job if all requested
containers/TMs are available and allotted to that job.

How can I reproduce your issue on Flink with YARN?

Thank you,

Piper


On Thu, Nov 21, 2019, 1:48 PM Kelly Smith  wrote:

> I’ve been running Flink in production on EMR (YARN) for some time and have
> found the metrics system to be quite useful, but there is one specific case
> where I’m missing a signal for this scenario:
>
>
>
>- When a job has been submitted, but YARN does not have enough
>resources to provide
>
>
>
> Observed:
>
>- Job is in RUNNING state
>- All of the tasks for the job are in the (I believe) DEPLOYING state
>
>
>
> Is there a way to access these as metrics for monitoring the number of
> tasks in each state for a given job (image below)? The metric I’m currently
> using is the number of running jobs, but it misses this “unhealthy”
> scenario. I realize that I could use application-level metrics (record
> counts, etc) as a proxy for this, but I’m working on providing a streaming
> platform and need all of my monitoring to be application agnostic.
>
>
>
> I can’t find anything on it in the documentation.
>
>
>
> Thanks,
>
> Kelly
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread Lei Nie
I looked at the code, and
StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is
generating a random ID unrelated to the actual ID used.

Is there any way to fetch the real ID at runtime?
Use case: fetch most recent checkpoint from stable storage for
automated restarts. Most recent checkpoint has form
".../checkpoints/flink_app_id/chk-123"

On Thu, Nov 21, 2019 at 11:28 AM Lei Nie  wrote:
>
> This does not get the correct id:
> StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
> eea5abc21dd8743a4090f4a3a660f9e8
> Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a
>
>
>
> On Thu, Nov 7, 2019 at 6:56 PM vino yang  wrote:
> >
> > Hi Lei Nie,
> >
> > You can use 
> > `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the 
> > job id.
> >
> > Best,
> > Vino
> >
> > On Fri, Nov 8, 2019 at 8:38 AM Lei Nie wrote:
> >>
> >> Hello,
> >> I am currently executing streaming jobs via StreamExecutionEnvironment. Is 
> >> it possible to retrieve the Flink job ID/YARN ID within the context of a 
> >> job? I'd like to be able to automatically register the job such that 
> >> monitoring jobs can run (REST api requires for example job id).
> >>
> >> Thanks


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-21 Thread Lei Nie
This does not get the correct id:
StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
eea5abc21dd8743a4090f4a3a660f9e8
Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a



On Thu, Nov 7, 2019 at 6:56 PM vino yang  wrote:
>
> Hi Lei Nie,
>
> You can use `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` 
> to get the job id.
>
> Best,
> Vino
>
> On Fri, Nov 8, 2019 at 8:38 AM Lei Nie wrote:
>>
>> Hello,
>> I am currently executing streaming jobs via StreamExecutionEnvironment. Is 
>> it possible to retrieve the Flink job ID/YARN ID within the context of a 
>> job? I'd like to be able to automatically register the job such that 
>> monitoring jobs can run (REST api requires for example job id).
>>
>> Thanks


Metrics for Task States

2019-11-21 Thread Kelly Smith
I’ve been running Flink in production on EMR (YARN) for some time and have 
found the metrics system to be quite useful, but there is one specific case 
where I’m missing a signal for this scenario:


  *   When a job has been submitted, but YARN does not have enough resources to 
provide

Observed:

  *   Job is in RUNNING state
  *   All of the tasks for the job are in the (I believe) DEPLOYING state

Is there a way to access these as metrics for monitoring the number of tasks in 
each state for a given job (image below)? The metric I’m currently using is the 
number of running jobs, but it misses this “unhealthy” scenario. I realize that 
I could use application-level metrics (record counts, etc) as a proxy for this, 
but I’m working on providing a streaming platform and need all of my monitoring 
to be application agnostic.
[screenshot: task state counts in the Flink UI]

I can’t find anything on it in the documentation.

Thanks,
Kelly


Re: Savepoints and checkpoints

2019-11-21 Thread Yun Tang
Hi Min

Since kafka consumer would store KafkaTopicPartition [1] within checkpoint, you 
cannot load previous state if you changed the kafka topic name.

If you assigned an operator id to the previous stateful operator and then split 
it into two operators, but one new operator still keeps the previous operator 
id, Flink will try to assign the previous state to that new operator. Otherwise, 
the previous state will not match any operator and you need to consider allowing 
non-restored state if you choose to resume from the previous checkpoint/savepoint [3].

[1] 
https://github.com/apache/flink/blob/b290230662fa1aa38909aed40ac85eaf843e1d1c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L902
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job

Best
Yun Tang
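As a concrete illustration of [2] and [3], resuming from a savepoint (or retained checkpoint) path while dropping state that no longer matches any operator is done on the command line; the path and jar name below are placeholders.

flink run -s hdfs:///flink/savepoints/savepoint-abc123 --allowNonRestoredState my-job.jar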


From: "min@ubs.com" 
Date: Thursday, November 21, 2019 at 5:19 PM
To: "user@flink.apache.org" 
Subject: Savepoints and checkpoints

Hi,

Are Flink savepoints and checkpoints still valid after some data entity changes, 
e.g. Kafka topic name changes? I expect the answer is "No"?
Similarly, are Flink savepoints and checkpoints still valid after some job 
graph changes, e.g. one stateful operator splits into two? I expect the answer 
is "No"?

Regards,

Min



Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
 Thanks so much Congxian for your pointers - I will try to go through them.  
Mans
On Thursday, November 21, 2019, 07:26:49 AM EST, Congxian Qiu 
 wrote:  
 
 Hi,
1. I think this issue [1] can help to understand the directory layout.
2. The chk-6 directory or the metafilePath; for more information, you can refer to [2][3].
3. Every checkpoint contains a meta file recording such information; maybe you can refer to [4] and debug it for more information.
4. Currently, you need to go through the restore logic to see the files' contents.

[1] https://issues.apache.org/jira/browse/FLINK-8531
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1153
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[4] https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java

Best,
Congxian

On Thu, Nov 21, 2019 at 7:44 PM M Singh wrote:

 Hi Congxian:
For my application I see many uuids under the chk-6 directory (I posted one in
the sample above). I am trying to understand, if I restart the application with
this checkpoint (which I believe I can, just like a savepoint - I am using
chk-6 as an example below):
1. I believe each chk- directory is a complete checkpoint state. Is that correct?
2. How do I point it to the checkpoint (chk-6) when I submit the job - do I point it to the jobid or the chk-6 directory? I am presuming the latter (i.e., pointing to the chk-6 directory) but just want to confirm.
3. Secondly, how does the application map the files under chk-6 to restore the state of each of the stateful operators?
4. Is there any API by which I can examine the contents of the files under the chk-6 directory?
Thanks for your help.
Mans
On Wednesday, November 20, 2019, 09:13:39 PM EST, Congxian Qiu 
 wrote:  
 
  Hi,
Currently, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) is
generated by UUID.randomUUID(), so there is no easy way to map this to the uid
assigned in the application. In other words, the last part (uuid
a4d87cda-2afd-47d4-8d3f-b0658466fb2d) belongs to one checkpoint, while the uid
assigned in the application belongs to one operator; they are different.
Best,
Congxian

M Singh  于2019年11月21日周四 上午6:18写道:

Hi Arvid:

Thanks for your clarification.

I am supplying a uid for the stateful operators and find the following
directory structure in the checkpoint directory:

f4e78cb47f9dc12859558be7d15f39d0/chk-6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d

The first part, f4e78cb47f9dc12859558be7d15f39d0, is the job_id. Is there a way
to map the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) to the uid
assigned in the application?

Thanks
On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise 
 wrote:  
 
Hi Mans,

just to follow up: there are no limitations for name or uuid.

The uuid will in fact be hashed internally while the StreamGraph is being
generated, so all characters are allowed. The name is only for debugging
purposes and the web UI. If you use very special characters, you may see some
oddities in logs/web UI, but nothing should break.
Spaces or parentheses should work in any case.

Best,
Arvid
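
A tiny sketch of the point above; the map function, source, and names here are
all illustrative, not from the thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NamingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
                .map(String::toUpperCase)
                .uid("normalize-input")                  // hashed internally, keys the operator's state
                .name("Normalize input (to UPPER case)") // shown as-is in the web UI and metrics
                .print();

        env.execute("naming sketch");
    }
}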

On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:

Thanks Jiayi for your response. I am thinking along the same lines.
Regarding using the same name and uuid, I believe the checkpoint state for an
operator will be easy to identify if the uuid is the same as the name. But I am
not sure whether having a very long name and uuid, or characters like
parentheses, etc., might cause any issues, so I just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!

Firstly, let's see how an operator's name and uid are used. AFAIK, the
operator's name is used in the WebUI and metrics reporting, and the uid is used
to mark the uniqueness of an operator, which is useful when you're using state[1].

> Are there any restrictions on the length of the name and uuid attributes?

It's pretty much the same as defining any string value, so there are no special
restrictions on this.

> Are there any restrictions on the characters used for name and uuid (blank spaces, etc) ?

I'm not a hundred percent sure about this, but I ran a testing program and it
works fine.

> Can the name and uuid be the same ?

Yes. But uuids across operators cannot be the same.

For me, I usually set the name and uuid for almost every operator, which gives
me a better experience in monitoring and scaling.

Hope this helps.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

Best,

Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what are the best practices 
for setting name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid 

RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Hailu, Andreas
Thanks, Piotr. We’ll rerun our apps today with this and get back to you.

// ah

From: Piotr Nowojski  On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] 
Cc: Zhijiang ; user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi,

I would suspect this:
https://issues.apache.org/jira/browse/FLINK-12070
To be the source of the problems.

There seems to be a hidden configuration option that avoids using memory mapped 
files:

taskmanager.network.bounded-blocking-subpartition-type: file

Could you test if it helps?

Piotrek


On 21 Nov 2019, at 15:22, Hailu, Andreas 
mailto:andreas.ha...@gs.com>> wrote:

Hi Zhijiang,

I looked into the container logs for the failure, and didn’t see any specific 
OutOfMemory errors before it was killed. I ran the application using the same 
config this morning on 1.6.4, and it went through successfully. I took a 
snapshot of the memory usage from the dashboard and can send it to you if you 
like for reference.

What stands out to me as suspicious is that on 1.9.1, the application is using 
nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its 
runtime and succeeds. The JVM heap memory itself never exceeds its capacity, 
peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes 
around mapped memory.

// ah

From: Zhijiang mailto:wangzhijiang...@aliyun.com>>
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi Andreas,

You are running a batch job, so there should be no native memory used by the
RocksDB state backend. Then I guess it is either heap memory or direct memory
that is over-used. The managed heap memory is mainly used by batch operators,
and direct memory is used by the network shuffle. Can you further check whether
there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before the
container was killed? If the used memory exceeds the JVM configuration, it
should throw that error. Then we can further narrow down the scope. I cannot
recall the changes around managed memory or the network stack off-hand,
especially since they really span several releases.

Best,
Zhijiang

--
From:Hailu, Andreas mailto:andreas.ha...@gs.com>>
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering]
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
mailto:user@flink.apache.org>>
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 
'd73996-213.dc.gs.com/10.47.226.218:46003'.
 This indicates that the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 
'

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Piotr Nowojski
Hi,

I would suspect this:
https://issues.apache.org/jira/browse/FLINK-12070 

To be the source of the problems.

There seems to be a hidden configuration option that avoids using memory mapped 
files:

taskmanager.network.bounded-blocking-subpartition-type: file

Could you test if it helps?

Piotrek

> On 21 Nov 2019, at 15:22, Hailu, Andreas  wrote:
> 
> Hi Zhijiang,
>  
> I looked into the container logs for the failure, and didn’t see any specific 
> OutOfMemory errors before it was killed. I ran the application using the same 
> config this morning on 1.6.4, and it went through successfully. I took a 
> snapshot of the memory usage from the dashboard and can send it to you if you 
> like for reference.
>  
> What stands out to me as suspicious is that on 1.9.1, the application is 
> using nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 
> throughout its runtime and succeeds. The JVM heap memory itself never exceeds 
> its capacity, peaking at 6.65GB, so it sounds like the problem lies somewhere 
> in the changes around mapped memory.
>  
> // ah
>  
>  <>From: Zhijiang  
> Sent: Wednesday, November 20, 2019 11:32 PM
> To: Hailu, Andreas [Engineering] ; 
> user@flink.apache.org
> Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Hi Andreas,
>  
> You are running a batch job, so there should be no native memory used by
> the RocksDB state backend. Then I guess it is either heap memory or direct
> memory that is over-used. The managed heap memory is mainly used by batch
> operators, and direct memory is used by the network shuffle. Can you further
> check whether there are any logs indicating HeapOutOfMemory or
> DirectOutOfMemory before the container was killed? If the used memory exceeds
> the JVM configuration, it should throw that error. Then we can further narrow
> down the scope. I cannot recall the changes around managed memory or the
> network stack off-hand, especially since they really span several releases.
>  
> Best,
> Zhijiang
>  
> --
> From:Hailu, Andreas mailto:andreas.ha...@gs.com>>
> Send Time:2019 Nov. 21 (Thu.) 01:03
> To:user@flink.apache.org  >
> Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Going through the release notes today - we tried fiddling with the 
> taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
> success. It still leads to the container running beyond physical memory 
> limits.
>  
> // ah
>  
> From: Hailu, Andreas [Engineering] 
> Sent: Tuesday, November 19, 2019 6:01 PM
> To: 'user@flink.apache.org'  >
> Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
>  
> Hi,
>  
> We’re in the middle of testing the upgrade of our data processing flows from 
> Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
> 1.6.4 now fail on 1.9.1 with the same application resources and input data 
> size. It seems that there have been some changes around how the data is 
> sorted prior to being fed to the CoGroup operator - this is the error that we 
> encounter:
>  
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 15 more
> Caused by: java.lang.Exception: The data preparation for task 'CoGroup 
> (Dataset | Merge | NONE)' , caused an error: Error obtaining the sorted 
> input: Thread 'SortMerger Reading Thread' terminated due to an exception: 
> Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. 
> This indicates that the remote task manager was lost.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> ... 1 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Lost 
> connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This 
> indicates that the remote task manager was lost.
> at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
> at 
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)
>  
> I drilled further down into the YARN app logs, and I found that the container 
> was running out of physical memory:
>  
> 2019-11-19 12:49:23,068 INFO  o

RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Hailu, Andreas
Hi Zhijiang,

I looked into the container logs for the failure, and didn’t see any specific 
OutOfMemory errors before it was killed. I ran the application using the same 
config this morning on 1.6.4, and it went through successfully. I took a 
snapshot of the memory usage from the dashboard and can send it to you if you 
like for reference.

What stands out to me as suspicious is that on 1.9.1, the application is using 
nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its 
runtime and succeeds. The JVM heap memory itself never exceeds its capacity, 
peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes 
around mapped memory.

// ah

From: Zhijiang 
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi Andreas,

You are running a batch job, so there should be no native memory used by the
RocksDB state backend. Then I guess it is either heap memory or direct memory
that is over-used. The managed heap memory is mainly used by batch operators,
and direct memory is used by the network shuffle. Can you further check whether
there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before the
container was killed? If the used memory exceeds the JVM configuration, it
should throw that error. Then we can further narrow down the scope. I cannot
recall the changes around managed memory or the network stack off-hand,
especially since they really span several releases.

Best,
Zhijiang

--
From:Hailu, Andreas mailto:andreas.ha...@gs.com>>
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org mailto:user@flink.apache.org>>
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering]
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
mailto:user@flink.apache.org>>
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset 
| Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Lost connection to 
task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that 
the remote task manager was lost.
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
at 
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)

I drilled further down into the YARN app logs, and I found that the container 
was running out of physical memory:

2019-11-19 12:49:23,068 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e42_1574076744505_9444_01_04 because: Container 
[pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running 
beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory 
used; 13.9 GB of 25.2 GB virtual memory used. Killing container.

This is what leads to my suspicion, as this resource configuration worked just 
fine on 1.6.4.

I’m working on getting heap dumps of these applications to try and get a better 
understanding of what’s causing the blowup in physical memory 

Re: Completed job wasn't saved to archive

2019-11-21 Thread Chesnay Schepler
If the archiving fails there should be some log message, like "Failed to 
archive job" or "Could not archive completed job...".
If nothing of the sort is logged, my first instinct would be that the 
operation is being slowed down, _a lot_.


Where are you archiving them to? Could it be the write operation is 
being throttled heavily?
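
As background for this thread, a sketch of the flink-conf.yaml entries that
control where completed jobs are archived and where the history server reads
them from (the paths are illustrative, not taken from this setup):

    jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs
    historyserver.archive.fs.dir: hdfs:///flink/completed-jobs
    historyserver.archive.fs.refresh-interval: 10000

If that directory sits on slow or throttled storage, the archiving step can
take correspondingly long.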


On 21/11/2019 13:48, Pavel Potseluev wrote:

Hi Vino,
Usually Flink archives jobs correctly and the problem is rarely 
reproduced. So I think it isn't a problem with configuration.

Job Manager log when job 5ec264a20bb5005cdbd8e23a5e59f136 was canceled:

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:13.294
[Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
Triggering checkpoint 1872 @ 1574092333218 for job
5ec264a20bb5005cdbd8e23a5e59f136.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:37.260
[flink-akka.actor.default-dispatcher-30] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
Completed checkpoint 1872 for job 5ec264a20bb5005cdbd8e23a5e59f136
(568048140 bytes in 23541 ms).

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:13.314
[Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
Triggering checkpoint 1873 @ 1574092393218 for job
5ec264a20bb5005cdbd8e23a5e59f136.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
bureau-user-offers-statistics-AUTORU-USERS_AUTORU
(5ec264a20bb5005cdbd8e23a5e59f136) switched from state RUNNING to
CANCELLING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
Custom File Source (1/1) (934d89cf3d7999b40225dd8009b5493c)
switched from RUNNING to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
kafka-source-moderation-update-journal-autoru -> Filter -> Flat
Map (1/2) (47656a3c4fc70e19622acca31267e41f) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
kafka-source-moderation-update-journal-autoru -> Filter -> Flat
Map (2/2) (be3c4562e65d3d6bdfda4f1632017c6c) switched from RUNNING
to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  -
user-offers-statistics-init-from-file -> Map (1/2)
(4a45ed43b05e4d444e190a44b33514ac) switched from RUNNING to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  -
user-offers-statistics-init-from-file -> Map (2/2)
(bb3be311c5e53abaedb06b4d0148c23f) switched from RUNNING to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Keyed
Reduce -> Map -> Sink: user-offers-statistics-autoru (1/2)
(cfb291033df3f19c9745a6f2fd25e037) switched from RUNNING to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
[flink-akka.actor.default-dispatcher-40] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Keyed
Reduce -> Map -> Sink: user-offers-statistics-autoru (2/2)
(9ce7cd66199513fa97ac44d7617f0c83) switched from RUNNING to CANCELING.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.299
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
Custom File Source (1/1) (934d89cf3d7999b40225dd8009b5493c)
switched from CANCELING to CANCELED.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.300
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
kafka-source-moderation-update-journal-autoru -> Filter -> Flat
Map (1/2) (47656a3c4fc70e19622acca31267e41f) switched from
CANCELING to CANCELED.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.300
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
kafka-source-moderation-update-journal-autoru -> Filter -> Flat
Map (2/2) (be3c4562e65d3d6bdfda4f1632017c6c) switched from
CANCELING to CANCELED.

771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.344
[flink-akka.

Re: Completed job wasn't saved to archive

2019-11-21 Thread Pavel Potseluev
Hi Vino,
Usually Flink archives jobs correctly and the problem is rarely reproduced. So I
think it isn't a problem with configuration.
The Job Manager log from when job 5ec264a20bb5005cdbd8e23a5e59f136 was canceled
is quoted in the reply above.

Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread Congxian Qiu
Hi
1. I think this issue [1] can help to understand the directory layout
2. Point it to the chk-6 directory or to the metadata file path; for more
information, you can refer to [2][3]
3. Every checkpoint contains such a meta file; maybe you can refer to [4]
and debug it for more information
4. Currently, you need to go through the restore logic to see the files'
contents

[1] https://issues.apache.org/jira/browse/FLINK-8531
[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1153
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[4]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
Best,
Congxian
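
As a rough sketch for point 4: since Flink 1.9 the State Processor API
(flink-state-processor-api) can load a retained checkpoint the same way as a
savepoint and read operator state back as a DataSet. The uid "my-uid", the state
name "my-state", the path, and the String type below are placeholders for
whatever the job actually registered; pass the same kind of state backend the
job used:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectCheckpoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A retained checkpoint directory can be loaded like a savepoint.
        ExistingSavepoint checkpoint = Savepoint.load(
                env,
                "hdfs:///checkpoints/f4e78cb47f9dc12859558be7d15f39d0/chk-6",
                new MemoryStateBackend());

        // Read back the (list) operator state registered under uid "my-uid"
        // with state name "my-state"; adjust the type to match the job's descriptor.
        DataSet<String> state = checkpoint.readListState("my-uid", "my-state", Types.STRING);
        state.print();
    }
}

Resuming the job itself from chk-6 works like a savepoint restore, e.g.
bin/flink run -s <.../chk-6> (or the chk-6/_metadata file), as described in [3].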


M Singh  于2019年11月21日周四 下午7:44写道:

> Hi Congxian:
>
> For my application i see many uuids under the chk-6 directory ( I posted
> one in the sample above).   I am trying to understand that if I restart the
> application with this checkpoint (which I believe I can just like a
> savepoint - I am using chk-6 as an example below)
> 1. I believe each chk-<n> directory is a complete checkpoint state. Is that
> correct?
> 2. How do I point the job to the checkpoint (chk-6) when I submit it - do
> I point it to the job id or to the chk-6 directory? I am presuming the latter
> (i.e., pointing to the chk-6 directory) but just want to confirm.
> 3. Secondly, how does the application map the files under chk-6 to
> restore the state of each of the stateful operators?
> 4. Is there any API by which I can examine the contents of the files under
> the chk-6 directory ?
>
> Thanks for your help.
>
> Mans
>
> On Wednesday, November 20, 2019, 09:13:39 PM EST, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>
> Hi
>
> Currently, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) is
> generated by UUID.randomUUID(), so there is not a easy way to map this to
> the assigned in the application.
> In another word, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) 
> belongs
> to one checkpoint, and the assigned in the application belongs to one
> operator, they are different.
>
> Best,
> Congxian
>
>
> M Singh  于2019年11月21日周四 上午6:18写道:
>
> Hi Arvid:
>
> Thanks for your clarification.
>
> I am giving supplying uid for the stateful operators and find the
> following directory structure on in the chkpoint directory:
>
> f4e78cb47f9dc12859558be7d15f39d0/*chk*
> -6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d
> The first part f4e78cb47f9dc12859558be7d15f39d0 is the job_id
> Is there a way to map the last part (uuid
> a4d87cda-2afd-47d4-8d3f-b0658466fb2d) -  to the uid assigned in the
> application ?
>
> Thanks
>
> On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise <
> ar...@ververica.com> wrote:
>
>
> Hi Mans,
>
> just to follow up. There are no limitations for name or uuid.
>
> The uuid will be in fact hashed internally while the StreamGraph is being
> generated, so all characters are allowed.
> The name is only for debugging purposes and web ui. If you use very
> special characters, you may see oddities in logs/web ui, but nothing should
> break.
> Spaces or parentheses should work in any case.
>
> Best,
>
> Arvid
>
> On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:
>
> Thanks Jiayi for your response. I am thinking on the same lines.
>
> Regarding using the same name and uuid, I believe the checkpoint state for
> an operator will be easy to identify if the uuid is the same as name.  But
> I am not sure if having a very long name and uuid or a character like
> parenthesis, etc might cause any issues, so just wanted to check.
>
> Mans
>
> On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao <
> bupt_...@163.com> wrote:
>
>
> Hi Mans!
>
>
> Firstly let’s see how operator’s name and uid is used. AFAIK, operator’s
> name is used in WebUI and metrics reporting, and uid is used to mark the
> uniqueness of operator which is useful when you’re using state[1].
>
>
> > Are there any restrictions on the length of the name and uuid attributes?
>
> It’s pretty much the same as you define a string value, so there is no
> special restrictions on this.
>
>
> > Are there any restrictions on the characters used for name and uuid
> (blank spaces, etc) ?
>
> I’m not a hundred percent sure about this but I run a testing program and
> it works fine.
>
>
> > Can the name and uuid be the same ?
>
> Yes. But uuids accross operators cannot be same.
>
>
> For me I usually set name and uuid for almost every operator, which gives
> me better experience in monitoring and scaling.
>
>
> Hope this helps.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
>
>
>
> Best,
>
> Jiayi Liao
>
>
> At 2019-11-16 18:35:38, "M Singh"  wrote:
>
> Hi:
>
> I am working on a project and wanted to find out what are the best
> practices for setting name and uuid for operator

Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Hi Jingsong,

Thank you for your reply!

>Is this what you want? Piper.

Yes. This is exactly what I want.

Is there any way for me to specify to Flink's RM how many resources to ask
YARN's RM for, and can we make Flink's RM ask for resources proactively
before it runs out?
Similarly, is there any way I can force the JM to release a TM back to YARN
before the timeout?

Or will I need to modify the source code of Flink for this?

Thank you,

Piper
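
As a concrete sketch of the session-mode knob Jingsong describes in the quoted
reply below (the value is illustrative, in milliseconds):

    resourcemanager.taskmanager-timeout: 60000

With a larger value, idle TaskManagers are kept around longer for reuse; with a
smaller one they are handed back to YARN sooner. Requesting extra containers
beyond the session's current size happens automatically when a job needs more
slots, rather than through a user-facing API.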

On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:

> Hi Jingsong,
>
> Thanks for the explanation about the mechanism of the new Flink session
> cluster mode.
>
> Because I mostly use job cluster mode, so did not have a good knowledge of
> the new Flink session cluster mode.
>
> Best,
> Vino
>
> Jingsong Li  于2019年11月21日周四 下午2:46写道:
>
>> Hi Piper and Vino:
>>
>> In the current Flink version, the resources of a Flink session cluster
>> are unrestricted, which means that if the requested resources exceed the
>> resources owned by the current session, it will apply to YARN's RM for
>> new resources.
>> And if a TaskManager is idle for too long, the JM will release it back to
>> YARN. This behavior is controlled by resourcemanager.taskmanager-timeout.
>> You can set a suitable value for it to enjoy the benefits of process reuse
>> and dynamic resources.
>>
>> From this point of view, I think session mode is a good choice.
>> Is this what you want? Piper.
>>
>> Best,
>> Jingsong Lee
>>
>>
>>
>> On Thu, Nov 21, 2019 at 2:25 PM vino yang  wrote:
>>
>>> Hi Piper,
>>>
>>> Your understanding of the two deploy modes for Flink on YARN is right.
>>>
>>> AFAIK, the single job (job cluster) mode is more popular than session
>>> mode.
>>>
>>> Because in job cluster mode, Flink lets YARN manage resources as far as
>>> possible. And this mode keeps the job isolated from other jobs.
>>>
>>> IMO, we do not need to combine their advantages. Let YARN do the things
>>> that it is good at. What do you think?
>>>
>>> Best,
>>> Vino
>>>
>>>
>>> Piper Piper  于2019年11月21日周四 上午11:55写道:
>>>
 Hi Vino,

 I want to implement Resource Elasticity. In doing so, I have read that
 Flink with YARN has two modes: Job and Session.

 In Job mode, Flink’s Resource Manager requests YARN for containers with
 TMs, and then gives the containers back to YARN upon job completion.

 In Session mode, Flink already has the TMs that are persistent.

 I want to combine the advantages of Job and Session mode, i.e. Flink
 will have persistent TMs/containers and request YARN for more
 TMs/containers when needed (or release TMs/containers back to YARN).

 Thank you,

 Piper

 On Wed, Nov 20, 2019 at 9:39 PM vino yang 
 wrote:

> Hi Piper,
>
> Can you share more reason and details of your requirements.
>
> Best,
> Vino
>
> Piper Piper  于2019年11月21日周四 上午5:48写道:
>
>> Hi,
>>
>> How can I make Flink's Resource Manager request YARN to spin up new
>> (or destroy/reclaim existing) TaskManagers in YARN containers?
>>
>> Preferably at runtime (i.e. dynamically).
>>
>> Thank you
>>
>> Piper
>>
>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: Cron style for checkpoint

2019-11-21 Thread Congxian Qiu
Hi

thanks for your explanation. What you want is to disable the periodic
checkpoint during certain time windows, while at other times the periodic
checkpoint runs as normal. Currently, Flink does not support this; as you've
created an issue for it, we can track it on the issue side. For now, if you
really need this, you can change the logic in
`CheckpointCoordinator#triggerCheckpoint`.

Best,
Congxian
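
For reference, a minimal sketch of the interval/timeout/pause knobs that do
exist today (values and the socket source are purely illustrative); a
cron-style schedule would still need the change Congxian describes:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints every 10 minutes, exactly-once mode.
        env.enableCheckpointing(10 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        // Fail a checkpoint that has not completed within 5 minutes, and leave
        // at least 2 minutes between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000L);

        env.socketTextStream("localhost", 9999).print();
        env.execute("checkpoint config sketch");
    }
}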


shuwen zhou  于2019年11月21日周四 下午4:57写道:

> Hi Yun and Congxian,
> I would actually like checkpoints to avoid being triggered at certain
> times. Checkpointing would still remain a system mechanism; it would just
> not be triggered during a certain range of time.
> Waiting for the checkpoint to time out still wastes CPU and disk IO
> resources, since it was triggered in the first place. I would like to
> avoid it being triggered at all.
> I suppose using a cron style would not break the checkpoint's system
> mechanism.
> A savepoint, on the other hand, is not an incremental update; triggering a
> savepoint every 10 minutes will waste a lot of disk, and another script is
> required to remove outdated savepoints. I suppose savepoints are meant for
> the upgrade/restart scenario.
> A cron-style checkpoint time config would provide a lot of flexibility. Thanks.
>
>
> On Thu, 21 Nov 2019 at 16:28, Yun Tang  wrote:
>
>> Hi Shuwen
>>
>>
>>
>> Conceptually, checkpoints in Flink behave more like a system mechanism
>> to achieve fault tolerance and are transparent to users. Savepoints, on
>> the other hand, behave more like a user-controlled operation; can a
>> savepoint not satisfy your demands for a crontab?
>>
>>
>>
>> Best
>>
>> Yun Tang
>>
>>
>>
>> *From: *Congxian Qiu 
>> *Date: *Thursday, November 21, 2019 at 2:27 PM
>> *To: *shuwen zhou 
>> *Cc: *Jiayi Liao , dev , user <
>> user@flink.apache.org>
>> *Subject: *Re: Cron style for checkpoint
>>
>>
>>
>> Hi
>>
>>
>>
>> Currently, Flink does not support such a feature. From what you describe,
>> would setting an appropriate timeout for checkpoints solve your problem?
>>
>>
>> Best,
>>
>> Congxian
>>
>>
>>
>>
>>
>> shuwen zhou  于2019年11月21日周四 下午12:06写道:
>>
>> Hi Jiayi,
>>
>> It would be great if Flink could have a user defined interface for user
>> to implement to control checkpoint behavior, at least for time related
>> behavior.
>>
>> I brought up a wish on JIRA [1], perhaps it described clearly enough.
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14884
>>
>>
>>
>>
>>
>> On Thu, 21 Nov 2019 at 11:40, Jiayi Liao  wrote:
>>
>> Hi Shuwen,
>>
>>
>>
>> As far as I know, Flink can only support checkpoint with a fixed
>> interval.
>>
>>
>>
>> However, I think a flexible mechanism for triggering checkpoints is worth
>> working on, at least from my perspective. And it may not only be a cron
>> style. In our business scenario, the data traffic usually reaches the peak
>> of the day after 20:00, when we want to increase the checkpoint interval;
>> otherwise it'll introduce more disk and network IO.
>>
>>
>>
>> Just want to share something about this :)
>>
>>
>>
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>
>>
>>
>> At 2019-11-21 10:20:47, "shuwen zhou"  wrote:
>>
>> >Hi Community,
>>
>> >I would like to know if there is a existing function to support cron style
>>
>> >checkpoint?
>>
>> >The case is, our data traffic is huge on HH:30 for each hour. We don't wont
>>
>> >checkpoint to fall in that range of time. A cron like 15,45 * * * * to set
>>
>> >for checkpoint would be nice. If a checkpoint is already in progress when
>>
>> >minutes is 15 or 45, there would be a config value to trigger a new
>>
>> >checkpoint or pass.
>>
>> >
>>
>> >--
>>
>> >Best Wishes,
>>
>> >Shuwen Zhou 
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best Wishes,
>>
>> Shuwen Zhou 
>>
>>
>>
>>
>
> --
> Best Wishes,
> Shuwen Zhou 
>
>


Re: Re:Apache Flink - Operator name and uuid best practices

2019-11-21 Thread M Singh
Hi Congxian:

For my application I see many uuids under the chk-6 directory (I posted one in
the sample above). I am trying to understand, if I restart the application with
this checkpoint (which I believe I can, just like a savepoint - I am using chk-6
as an example below):
1. I believe each chk-<n> directory is a complete checkpoint state. Is that correct?
2. How do I point the job to the checkpoint (chk-6) when I submit it - do I point
it to the job id or to the chk-6 directory? I am presuming the latter (i.e.,
pointing to the chk-6 directory) but just want to confirm.
3. Secondly, how does the application map the files under chk-6 to restore the
state of each stateful operator?
4. Is there any API by which I can examine the contents of the files under the
chk-6 directory?

Thanks for your help.
Mans
On Wednesday, November 20, 2019, 09:13:39 PM EST, Congxian Qiu 
 wrote:  
 
Hi

Currently, the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) is
generated by UUID.randomUUID(), so there is no easy way to map it to the uid
assigned in the application. In other words, the last part (uuid
a4d87cda-2afd-47d4-8d3f-b0658466fb2d) belongs to one checkpoint, while the uid
assigned in the application belongs to one operator; they are different things.

Best,
Congxian

M Singh  于2019年11月21日周四 上午6:18写道:

Hi Arvid:

Thanks for your clarification.

I am supplying a uid for the stateful operators and find the following
directory structure in the checkpoint directory:

f4e78cb47f9dc12859558be7d15f39d0/chk-6/a4d87cda-2afd-47d4-8d3f-b0658466fb2d

The first part, f4e78cb47f9dc12859558be7d15f39d0, is the job_id. Is there a way
to map the last part (uuid a4d87cda-2afd-47d4-8d3f-b0658466fb2d) to the uid
assigned in the application?

Thanks
On Wednesday, November 20, 2019, 07:52:49 AM EST, Arvid Heise 
 wrote:  
 
Hi Mans,

just to follow up: there are no limitations for name or uuid.

The uuid will in fact be hashed internally while the StreamGraph is being
generated, so all characters are allowed. The name is only for debugging
purposes and the web UI. If you use very special characters, you may see some
oddities in logs/web UI, but nothing should break.
Spaces or parentheses should work in any case.

Best,
Arvid

On Sat, Nov 16, 2019 at 6:40 PM M Singh  wrote:

Thanks Jiayi for your response. I am thinking along the same lines.
Regarding using the same name and uuid, I believe the checkpoint state for an
operator will be easy to identify if the uuid is the same as the name. But I am
not sure whether having a very long name and uuid, or characters like
parentheses, etc., might cause any issues, so I just wanted to check.
Mans
On Saturday, November 16, 2019, 11:19:08 AM EST, Jiayi Liao 
 wrote:  
 
 
Hi Mans!

Firstly, let's see how an operator's name and uid are used. AFAIK, the
operator's name is used in the WebUI and metrics reporting, and the uid is used
to mark the uniqueness of an operator, which is useful when you're using state[1].

> Are there any restrictions on the length of the name and uuid attributes?

It's pretty much the same as defining any string value, so there are no special
restrictions on this.

> Are there any restrictions on the characters used for name and uuid (blank spaces, etc) ?

I'm not a hundred percent sure about this, but I ran a testing program and it
works fine.

> Can the name and uuid be the same ?

Yes. But uuids across operators cannot be the same.

For me, I usually set the name and uuid for almost every operator, which gives
me a better experience in monitoring and scaling.

Hope this helps.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

Best,

Jiayi Liao


At 2019-11-16 18:35:38, "M Singh"  wrote:
 
Hi:
I am working on a project and wanted to find out what the best practices are
for setting the name and uuid for operators:
1. Are there any restrictions on the length of the name and uuid attributes?
2. Are there any restrictions on the characters used for name and uuid (blank spaces, etc)?
3. Can the name and uuid be the same?
Please let me know if there is any other advice.
Thanks
Mans



 
  
  
  

回复:Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
hi arujit,

The blink planner with flink-1.9 supports this query.

The reason for the error is that neither planner currently supports complex
expressions like INTERVAL '7' HOUR + INTERVAL '30' MINUTE at the point where
the window is transformed into a LogicalWindowAggregate node.

Why does the blink planner support this query, then?
The optimization order of the two planners is different: the Flink planner
(a.k.a. the old planner) transforms the window into a LogicalWindowAggregate
node first, and only then simplifies constant expressions (like INTERVAL '7'
HOUR + INTERVAL '30' MINUTE, which can be simplified to 2700:INTERVAL HOUR TO
MINUTE). The blink planner's approach is just the opposite: it simplifies
expressions first, and then transforms the window.

So, you could try the blink planner.

thanks, 
godfrey
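
A minimal sketch of switching to the blink planner on Flink 1.9 and running the
compound-interval query; the table name `data_stream` and its rowtime attribute
are assumed to be registered elsewhere, exactly as in the original question:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BlinkCompoundIntervalSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // `data_stream` must be registered beforehand with a rowtime attribute.
        Table result = tEnv.sqlQuery(
                "SELECT count(1) AS event_count, " +
                "       TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS window_timestamp " +
                "FROM data_stream " +
                "GROUP BY TUMBLE(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)");

        tEnv.toRetractStream(result, Row.class).print();
        env.execute("blink compound interval sketch");
    }
}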




--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 17:31
收件人:贺小令(晓令) 
主 题:Re: Compound Time interval in SQL queries

Hi, godfrey,

We are using flink-1.6.2. But when working with flink-1.9 I am still getting 
this error.

Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: 
org.apache.flink.table.api.TableException: Only constant window intervals with 
millisecond resolution are supported.

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:53 PM 贺小令(晓令)  wrote:

hi arujit,
Which Flink version are you using?


thanks, 
godfrey


--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 17:21
收件人:贺小令(晓令) ; user 
主 题:Re: Compound Time interval in SQL queries

Hi, godfrey,

Thanks for your reply. But now I am getting this error :

Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: 
org.apache.flink.table.api.TableException: Only constant window descriptors are 
supported.
at 
com.gojek.daggers.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:30)
Caused by: org.apache.flink.table.api.TableException: Only constant window 
descriptors are supported.
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:57)
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:72)
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:88)
at 
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

Any reason why this may be happening.

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:37 PM 贺小令(晓令)  wrote:
please try  this approach: interval + interval

like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS 
window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)

thanks, 
godfrey

--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 16:23
收件人:user 
主 题:Compound Time interval in SQL queries

Hi all,

Is there a way to define a compound time interval(that can consist of both HOUR 
and MINUTE) in windows in a Flink SQL query.

For example, we want to do something like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime,
 INTERVAL '7' HOUR
 AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We can not even convert this to Minutes as we are getting this error :
Interval field value  exceeds precision of MINUTE(2) field

We were going through Calcite documentation and could not find any workaround 
on this. 

Thanks and regards,
arujit
   

回复:Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
hi arujit,
Which Flink version are you using?


thanks, 
godfrey



--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 17:21
收件人:贺小令(晓令) ; user 
主 题:Re: Compound Time interval in SQL queries

Hi, godfrey,

Thanks for your reply. But now I am getting this error :

Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: 
org.apache.flink.table.api.TableException: Only constant window descriptors are 
supported.
at 
com.gojek.daggers.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:30)
Caused by: org.apache.flink.table.api.TableException: Only constant window 
descriptors are supported.
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:57)
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:72)
at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:88)
at 
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

Any reason why this may be happening.

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:37 PM 贺小令(晓令)  wrote:

please try  this approach: interval + interval

like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS 
window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)

thanks, 
godfrey

--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 16:23
收件人:user 
主 题:Compound Time interval in SQL queries

Hi all,

Is there a way to define a compound time interval(that can consist of both HOUR 
and MINUTE) in windows in a Flink SQL query.

For example, we want to do something like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime,
 INTERVAL '7' HOUR
 AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We can not even convert this to Minutes as we are getting this error :
Interval field value  exceeds precision of MINUTE(2) field

We were going through Calcite documentation and could not find any workaround 
on this. 

Thanks and regards,
arujit
  

Savepoints and checkpoints

2019-11-21 Thread min.tan
Hi,

Are Flink savepoints and checkpoints still valid after some data entity changes, 
e.g. a Kafka topic name change? I expect the answer is "No"?
Similarly, are Flink savepoints and checkpoints still valid after some job 
graph changes, e.g. one stateful operator splitting into two? I expect the answer 
is "No"?

Regards,

Min



Re: Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi, godfrey,

Thanks for your reply. But now I am getting this error :












*Exception in thread "main"
org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.table.api.TableException: Only constant window descriptors
are supported.at
com.gojek.daggers.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:30)Caused
by: org.apache.flink.table.api.TableException: Only constant window
descriptors are supported.at
org.apache.flink.table.api.TableException$.apply(exceptions.scala:57)
  at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:72)
  at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:88)
  at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
  at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
  at
org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
  at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
  at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
  at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)*

Any reason why this may be happening.

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:37 PM 贺小令(晓令) 
wrote:

> please try  this approach: interval + interval
>
> like this:
> SELECT count(1) AS event_count ,
> TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS
> window_timestamp
> FROM `data_stream`
> GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)
>
> thanks,
> godfrey
>
> --
> 发件人:Arujit Pradhan 
> 发送时间:2019年11月21日(星期四) 16:23
> 收件人:user 
> 主 题:Compound Time interval in SQL queries
>
> Hi all,
>
> Is there a way to define a compound time interval(that can consist of both
> HOUR and MINUTE) in windows in a Flink SQL query.
>
> For example, we want to do something like this:
> SELECT count(1) AS event_count ,
> TUMBLE_END(rowtime,
> INTERVAL '7' HOUR
> AND '30' MINUTE) AS window_timestamp
> FROM `data_stream`
> GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )
>
> We can not even convert this to Minutes as we are getting this error :
>  *Interval field value  exceeds precision of MINUTE(2) field*
>
> We were going through Calcite documentation and could not find any
> workaround on this.
>
> Thanks and regards,
> arujit
>
>


回复:Compound Time interval in SQL queries

2019-11-21 Thread 贺小令(晓令)
please try  this approach: interval + interval

like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS 
window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)

thanks, 
godfrey


--
发件人:Arujit Pradhan 
发送时间:2019年11月21日(星期四) 16:23
收件人:user 
主 题:Compound Time interval in SQL queries

Hi all,

Is there a way to define a compound time interval(that can consist of both HOUR 
and MINUTE) in windows in a Flink SQL query.

For example, we want to do something like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime,
 INTERVAL '7' HOUR
 AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We can not even convert this to Minutes as we are getting this error :
Interval field value  exceeds precision of MINUTE(2) field

We were going through Calcite documentation and could not find any workaround 
on this. 

Thanks and regards,
arujit
 

Re: Cron style for checkpoint

2019-11-21 Thread shuwen zhou
Hi Yun and Congxian,
I would actually like checkpoints to avoid being triggered at certain times.
Checkpointing would still remain a system mechanism; it would just not be
triggered during a certain range of time.
Waiting for the checkpoint to time out still wastes CPU and disk IO resources,
since it was triggered in the first place. I would like to avoid it being
triggered at all.
I suppose using a cron style would not break the checkpoint's system mechanism.
A savepoint, on the other hand, is not an incremental update; triggering a
savepoint every 10 minutes will waste a lot of disk, and another script is
required to remove outdated savepoints. I suppose savepoints are meant for the
upgrade/restart scenario.
A cron-style checkpoint time config would provide a lot of flexibility. Thanks.


On Thu, 21 Nov 2019 at 16:28, Yun Tang  wrote:

> Hi Shuwen
>
>
>
> Conceptually, checkpoints in Flink behave more like a system mechanism to
> achieve fault tolerance and are transparent to users. Savepoints, on the
> other hand, behave more like a user-controlled operation; can a savepoint
> not satisfy your demands for a crontab?
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Congxian Qiu 
> *Date: *Thursday, November 21, 2019 at 2:27 PM
> *To: *shuwen zhou 
> *Cc: *Jiayi Liao , dev , user <
> user@flink.apache.org>
> *Subject: *Re: Cron style for checkpoint
>
>
>
> Hi
>
>
>
> Currently, Flink does not support such a feature. From what you describe,
> would setting an appropriate timeout for checkpoints solve your problem?
>
>
> Best,
>
> Congxian
>
>
>
>
>
> shuwen zhou  于2019年11月21日周四 下午12:06写道:
>
> Hi Jiayi,
>
> It would be great if Flink could have a user-defined interface for users to
> implement to control checkpoint behavior, at least for time-related
> behavior.
>
> I brought up a wish on JIRA [1]; perhaps it describes the idea clearly enough.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14884
>
>
>
>
>
> On Thu, 21 Nov 2019 at 11:40, Jiayi Liao  wrote:
>
> Hi Shuwen,
>
>
>
> As far as I know, Flink can only support checkpoints with a fixed interval.
>
>
>
> However, I think a flexible mechanism for triggering checkpoints is worth
> working on, at least from my perspective. And it may not only be a cron
> style. In our business scenario, the data traffic usually reaches the peak
> of the day after 20:00, during which we want to increase the checkpoint
> interval, since otherwise it'll introduce more disk and network IO.
>
>
>
> Just want to share something about this :)
>
>
>
>
>
> Best,
>
> Jiayi Liao
>
>
>
>
> At 2019-11-21 10:20:47, "shuwen zhou"  wrote:
>
> >Hi Community,
>
> >I would like to know if there is an existing function to support cron-style
>
> >checkpoint?
>
> >The case is, our data traffic is huge at HH:30 for each hour. We don't want
>
> >checkpoints to fall in that range of time. A cron expression like 15,45 * * * * to set
>
> >for checkpoint would be nice. If a checkpoint is already in progress when
>
> >minutes is 15 or 45, there would be a config value to trigger a new
>
> >checkpoint or pass.
>
> >
>
> >--
>
> >Best Wishes,
>
> >Shuwen Zhou 
>
>
>
>
>
>
>
>
>
> --
>
> Best Wishes,
>
> Shuwen Zhou 
>
>
>
>

-- 
Best Wishes,
Shuwen Zhou 


Re: StreamingFileSink duplicate data

2019-11-21 Thread Paul Lam
Hi,

StreamingFileSink does not remove committed files, so if you restore state from 
a non-latest checkpoint, you may need to perform a manual cleanup.

WRT the part id issue, StreamingFileSink tracks the global max part number and 
uses this value + 1 as the new id upon restoring. In this way, we avoid file 
name conflicts with the previous execution (see [1]).

[1] 
https://github.com/apache/flink/blob/93dfdd05a84f933473c7b22437e12c03239f9462/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L276
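
As a minimal sketch of the kind of setup this applies to (the path, encoder, and
input stream below are placeholders), a row-format StreamingFileSink that rolls
part files on checkpoints looks roughly like this:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class FileSinkSketch {

    // `events` stands in for whatever stream is being written out.
    public static void attachSink(DataStream<String> events) {
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                // Part files are finalized on each successful checkpoint, so the
                // part counter advances over the lifetime of the job.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        events.addSink(sink);
    }
}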

Best,
Paul Lam

> On Nov 21, 2019, at 10:01, Lei Nie  wrote:
> 
> Hello,
> I would like clarification on the StreamingFileSink, thank you.
> 
> From my testing, it seems that resuming a job from a checkpoint does not also 
> restore the rolling part counter.
> 
> E.g., the job may have stopped with the last file:
> part-6-71
> 
> But when resuming from the most recent checkpoint:
> part-6-89
> (There is an unexplained gap.)
> 
> This is a problem if I am having an issue with my job and need to roll back 
> more than one checkpoint. After rolling back to, e.g., the 4th last checkpoint, 
> the data will be written into different part file names, causing duplication.
> -
> For example, checkpoints:
> chk-17, chk-18, chk-19, chk-20
> 
> Original data:
> part-1-5, part-1-6, part-1-7
> 
> Rolling back to chk-17 writes part-1-18, but with the same data as 
> part-1-5! This is a duplicate.
> --
> Am I correct? How to avoid this?



Re: Cron style for checkpoint

2019-11-21 Thread Yun Tang
Hi Shuwen

Conceptually, checkpoints in Flink behave more like a system mechanism to 
achieve fault tolerance and are transparent to users. Savepoints, on the other 
hand, behave more like a user-controlled operation. Could savepoints not 
satisfy your demand for a crontab?
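
If periodic, cron-driven snapshots are the goal, one rough sketch is to let an
external scheduler call the REST endpoint that triggers savepoints. The
JobManager address, job id, and target directory below are placeholders, and
error handling plus polling of the returned trigger id are omitted:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSavepointFromCron {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your JobManager and running job.
        String jobManagerAddress = "http://localhost:8081";
        String jobId = "your-job-id";
        String requestBody =
                "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": false}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManagerAddress + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                .build();

        // The response carries a trigger id; completion can be checked under
        // /jobs/<job-id>/savepoints/<trigger-id>.
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}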

Best
Yun Tang

From: Congxian Qiu 
Date: Thursday, November 21, 2019 at 2:27 PM
To: shuwen zhou 
Cc: Jiayi Liao , dev , user 

Subject: Re: Cron style for checkpoint

Hi

Currently, Flink does not support such a feature. From what you describe, would 
setting an appropriate timeout for checkpoints solve your problem?

Best,
Congxian


shuwen zhou  wrote on Thursday, November 21, 2019, at 12:06 PM:
Hi Jiayi,
It would be great if Flink could have a user-defined interface for users to 
implement to control checkpoint behavior, at least for time-related behavior.
I brought up a wish on JIRA [1]; perhaps it describes the idea clearly enough.

[1] https://issues.apache.org/jira/browse/FLINK-14884


On Thu, 21 Nov 2019 at 11:40, Jiayi Liao  wrote:

Hi Shuwen,



As far as I know, Flink can only support checkpoints with a fixed interval.



However, I think a flexible mechanism for triggering checkpoints is worth 
working on, at least from my perspective. And it may not only be a cron style. 
In our business scenario, the data traffic usually reaches the peak of the day 
after 20:00, during which we want to increase the checkpoint interval, since 
otherwise it'll introduce more disk and network IO.



Just want to share something about this :)





Best,

Jiayi Liao


At 2019-11-21 10:20:47, "shuwen zhou"  wrote:

>Hi Community,

>I would like to know if there is an existing function to support cron-style

>checkpoint?

>The case is, our data traffic is huge at HH:30 for each hour. We don't want

>checkpoints to fall in that range of time. A cron expression like 15,45 * * * * to set

>for checkpoint would be nice. If a checkpoint is already in progress when

>minutes is 15 or 45, there would be a config value to trigger a new

>checkpoint or pass.

>

>--

>Best Wishes,

>Shuwen Zhou 







--
Best Wishes,
Shuwen Zhou



Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi all,

Is there a way to define a compound time interval (one that can consist of both
HOUR and MINUTE) in windows in a Flink SQL query?

For example, we want to do something like this:
SELECT count(1) AS event_count ,
TUMBLE_END(rowtime,
INTERVAL '7' HOUR
AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We cannot even convert this to minutes, as we are getting this error:
 *Interval field value  exceeds precision of MINUTE(2) field*

We were going through Calcite documentation and could not find any
workaround on this.

Thanks and regards,
arujit