Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

2019-04-11 Thread Jins George
Thank you Guowei. That was the trick!

By default, jobs in the Completed Jobs section expire and are removed after 1 hour. I
have increased jobstore.expiration-time and completed jobs are now retained.
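
For reference, a hedged example of the relevant flink-conf.yaml entry (the 24-hour value is only an illustration; the key's default is 3600 seconds):

# flink-conf.yaml: keep completed/failed jobs in the web UI for 24 hours
jobstore.expiration-time: 86400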

Thanks,
Jins

From: Guowei Ma 
Date: Wednesday, April 10, 2019 at 3:29 AM
To: Jins George 
Cc: Timothy Victor , user 
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I am not very sure about this problem. But you could try to increase 
jobstore.expiration-time in config.
Best,
Guowei


Jins George <jins.geo...@aeris.net> wrote on Wednesday, April 10, 2019 at 1:01 PM:
Any input on this UI behavior?

Thanks,
Jins

From: Timothy Victor <vict...@gmail.com>
Date: Monday, April 8, 2019 at 10:47 AM
To: Jins George <jins.geo...@aeris.net>
Cc: user <user@flink.apache.org>
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I face the same issue in Flink 1.7.1.

Would be good to know a solution.

Tim

On Mon, Apr 8, 2019, 12:45 PM Jins George <jins.geo...@aeris.net> wrote:
Hi,

I am facing a weird problem in which jobs disappear from the ‘Completed Jobs’ section of the
Flink 1.7.2 UI. Looking at the JobManager logs, I see that the job failed, was restarted
‘restart-strategy.fixed-delay.attempts’ times, and the JobMaster was stopped.
I was able to see the job in the Completed Jobs section with the status FAILED,
but after some time I don’t see it any more. The JobManager was never
restarted, so I expected the failed or completed jobs to remain in the Completed
Jobs section.

Any idea what might be happening?


JobManager.log:

2019-04-06 18:21:10,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job dwellalert-ubuntu-0403174608-698009a0 (b274377e6a223078d6f40b9c0620ee0d) because the restart strategy prevented it.
2019-04-06 18:21:10,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job dwellalert-ubuntu-0403174608-698009a0(b274377e6a223078d6f40b9c0620ee0d).

Restart Strategy Conf:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s


Thanks
Jins George


Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

2019-04-09 Thread Jins George
Any input on this UI behavior?

Thanks,
Jins

From: Timothy Victor 
Date: Monday, April 8, 2019 at 10:47 AM
To: Jins George 
Cc: user 
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I face the same issue in Flink 1.7.1.

Would be good to know a solution.

Tim

On Mon, Apr 8, 2019, 12:45 PM Jins George <jins.geo...@aeris.net> wrote:
Hi,

I am facing a weird problem in which jobs disappear from the ‘Completed Jobs’ section of the
Flink 1.7.2 UI. Looking at the JobManager logs, I see that the job failed, was restarted
‘restart-strategy.fixed-delay.attempts’ times, and the JobMaster was stopped.
I was able to see the job in the Completed Jobs section with the status FAILED,
but after some time I don’t see it any more. The JobManager was never
restarted, so I expected the failed or completed jobs to remain in the Completed
Jobs section.

Any idea what might be happening?


JobManager.log:

2019-04-06 18:21:10,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job dwellalert-ubuntu-0403174608-698009a0 (b274377e6a223078d6f40b9c0620ee0d) because the restart strategy prevented it.
2019-04-06 18:21:10,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job dwellalert-ubuntu-0403174608-698009a0(b274377e6a223078d6f40b9c0620ee0d).

Restart Strategy Conf:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s


Thanks
Jins George


Flink 1.7.2 UI : Jobs removed from Completed Jobs section

2019-04-08 Thread Jins George
Hi,

I am facing a weird problem in which jobs disappear from the ‘Completed Jobs’ section of the
Flink 1.7.2 UI. Looking at the JobManager logs, I see that the job failed, was restarted
‘restart-strategy.fixed-delay.attempts’ times, and the JobMaster was stopped.
I was able to see the job in the Completed Jobs section with the status FAILED,
but after some time I don’t see it any more. The JobManager was never
restarted, so I expected the failed or completed jobs to remain in the Completed
Jobs section.

Any idea what might be happening?


JobManager.log:

2019-04-06 18:21:10,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job dwellalert-ubuntu-0403174608-698009a0 (b274377e6a223078d6f40b9c0620ee0d) because the restart strategy prevented it.
2019-04-06 18:21:10,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job dwellalert-ubuntu-0403174608-698009a0(b274377e6a223078d6f40b9c0620ee0d).

Restart Strategy Conf:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
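
For vanilla Flink jobs, the same strategy can also be set programmatically; a hedged Java sketch mirroring the values above:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Equivalent of the fixed-delay config above: 10 attempts, 10 seconds apart.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(10)));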


Thanks
Jins George


Re: Flink 1.6 Yarn Session behavior

2019-02-18 Thread Jins George
Thank you Gary. That was helpful.

Thanks,

Jins George

On 2/17/19 10:03 AM, Gary Yao wrote:
Hi Jins George,

Every TM brings additional overhead, e.g., more heartbeat messages. However, a
cluster with 28 TMs would not be considered big as there are users that are
running Flink applications on thousands of cores [1][2].

Best,
Gary

[1] 
https://flink.apache.org/flink-architecture.html#run-applications-at-any-scale
[2] 
https://de.slideshare.net/FlinkForward/flink-forward-sf-2017-stephan-ewen-experiences-running-flink-at-very-large-scale

On Thu, Feb 14, 2019 at 6:59 PM Jins George <jins.geo...@aeris.net> wrote:

Thanks Gary. Understood the behavior.

I am leaning towards running 7 TMs on each machine (8 cores). With 4 nodes, that
will end up as 28 TaskManagers and 1 JobManager. I was wondering if this could
put an additional burden on the JobManager? Is it recommended?

Thanks,

Jins George

On 2/14/19 8:49 AM, Gary Yao wrote:
Hi Jins George,

This has been asked before [1]. The bottom line is that you currently cannot
pre-allocate TMs and distribute your tasks evenly. You might be able to
achieve a better distribution across hosts by configuring fewer slots in your
TMs.

Best,
Gary

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html
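
A hedged illustration of Gary's suggestion: starting the session with fewer slots per TM forces a parallelism-5 job onto more TaskManagers, which YARN can then place on different nodes (the command mirrors the original one with only -s changed; placement is ultimately up to the YARN scheduler):

yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 1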


On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi,

I'm forwarding this question to Gary (CC'ed), who most likely would have an 
answer for your question here.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 8:33 AM Jins George <jins.geo...@aeris.net> wrote:

Hello community,

I am trying to upgrade a Flink YARN session cluster running Beam pipelines
from version 1.2.0 to 1.6.3.

Here is my session start command: yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 7

Because of the dynamic resource allocation, no TaskManager gets created
initially. Once I submit a job with parallelism 5, I see that one
TaskManager gets created and all 5 parallel instances are scheduled on the
same TaskManager (because I have 7 slots). This can create a hot spot, as only
one physical node (out of 4 in my case) is utilized for processing.

I noticed the legacy mode, which would provision all TaskManagers at cluster
creation, but since legacy mode is expected to go away soon, I didn't want to
try that route.

Is there a way I can configure multiple jobs, or parallel instances of the same
job, to spread across all the available YARN nodes while continuing to use the
'new' mode?

Thanks,

Jins George


Re: Flink 1.6 Yarn Session behavior

2019-02-14 Thread Jins George
Thanks Gary. Understood the behavior.

I am leaning towards running 7 TMs on each machine (8 cores). With 4 nodes, that
will end up as 28 TaskManagers and 1 JobManager. I was wondering if this could
put an additional burden on the JobManager? Is it recommended?

Thanks,

Jins George

On 2/14/19 8:49 AM, Gary Yao wrote:
Hi Jins George,

This has been asked before [1]. The bottom line is that you currently cannot
pre-allocate TMs and distribute your tasks evenly. You might be able to
achieve a better distribution across hosts by configuring fewer slots in your
TMs.

Best,
Gary

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html


On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi,

I'm forwarding this question to Gary (CC'ed), who most likely would have an 
answer for your question here.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 8:33 AM Jins George <jins.geo...@aeris.net> wrote:

Hello community,

I am trying to upgrade a Flink YARN session cluster running Beam pipelines
from version 1.2.0 to 1.6.3.

Here is my session start command: yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 7

Because of the dynamic resource allocation, no TaskManager gets created
initially. Once I submit a job with parallelism 5, I see that one
TaskManager gets created and all 5 parallel instances are scheduled on the
same TaskManager (because I have 7 slots). This can create a hot spot, as only
one physical node (out of 4 in my case) is utilized for processing.

I noticed the legacy mode, which would provision all TaskManagers at cluster
creation, but since legacy mode is expected to go away soon, I didn't want to
try that route.

Is there a way I can configure multiple jobs, or parallel instances of the same
job, to spread across all the available YARN nodes while continuing to use the
'new' mode?

Thanks,

Jins George


Flink 1.6 Yarn Session behavior

2019-02-12 Thread Jins George
Hello community,

I am trying to upgrade a Flink YARN session cluster running Beam pipelines
from version 1.2.0 to 1.6.3.

Here is my session start command: yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 7

Because of the dynamic resource allocation, no TaskManager gets created
initially. Once I submit a job with parallelism 5, I see that one
TaskManager gets created and all 5 parallel instances are scheduled on the
same TaskManager (because I have 7 slots). This can create a hot spot, as only
one physical node (out of 4 in my case) is utilized for processing.

I noticed the legacy mode, which would provision all TaskManagers at cluster
creation, but since legacy mode is expected to go away soon, I didn't want to
try that route.

Is there a way I can configure multiple jobs, or parallel instances of the same
job, to spread across all the available YARN nodes while continuing to use the
'new' mode?

Thanks,

Jins George


Re: Flink on YARN || Monitoring REST API Not Working || Please help

2018-01-31 Thread Jins George
8081 is the default port for a standalone cluster.

For a YARN Flink cluster:
Go to the YARN ResourceManager UI and find your session in the list of running
applications. You can get to the Flink UI by clicking the ApplicationMaster link
for the YARN session.
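
The monitoring REST endpoints are served behind the YARN web proxy at the same address, so a hedged example of fetching the jobs list (host, port, and application ID are placeholders):

curl http://<resourcemanager-host>:8088/proxy/<application_id>/jobs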

Regards,
Jins

On Feb 1, 2018, at 8:06 AM, Raja.Aravapalli wrote:

Hi,

I have deployed a Flink cluster on Hadoop YARN and I am able to trigger jobs and
run them.

But I am not able to get the running Flink cluster's Monitoring REST API working!

As listed in the docs at
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html,
I am trying to connect using the URL http://hostname:8081/jobs, where hostname is
the JobManager host.

Couldn't find how to fix this.

Can someone please share your thoughts?


Thanks a lot.


Regards,
Raja.


Re: Back-pressure Status shows OK but records are backed up in kafka

2018-01-08 Thread Jins George
Thank you Ufuk & Shannon. Since my Kafka consumer is the
UnboundedKafkaSource from Beam, I'm not sure whether the records-lag-max metric
is exposed. Let me research further.


Thanks,
Jins George
On 01/08/2018 10:11 AM, Shannon Carey wrote:

Right, backpressure only measures backpressure on the inside of the Flink job,
i.e. between Flink tasks.

Therefore, it’s up to you to monitor whether your Flink job is “keeping up” 
with the source stream. If you’re using Kafka, there’s a metric that the 
consumer library makes available. For example, for one of our jobs, in Graphite 
we have a metric that matches:

aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)

The “$Job” is a variable which allows you to select the job. You can see that I 
have wildcards on other elements of the path, for example the TaskManager id, 
the operator name, the Task index, etc. Your metric is probably rooted 
somewhere else, but the thing you’re looking for is under 
operator.*.*.KafkaConsumer.records-lag-max.
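
If Graphite isn't in the picture, a hedged alternative is Flink's monitoring REST API, which exposes the same per-subtask operator metrics (host and IDs are placeholders; calling the endpoint without the get parameter lists the metric names available on that vertex):

curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics"
curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=<metric-name>"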

Flink manages its offsets itself, rather than acting like a “normal” consumer 
which commits offsets to Kafka. However, in the docs I see that 
“setCommitOffsetsOnCheckpoints()” is enabled by default.  So, theoretically you 
can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor 
or https://github.com/linkedin/Burrow etc. which polls Kafka itself and 
produces metrics about consumer lag. However, for some reason, I don’t see our 
Flink consumer metrics showing up in our lag monitoring tool or in the Kafka 
command-line tools, so I’m not sure what’s going on there. Maybe it’s because 
Flink doesn’t show up as a consumer group? At first I thought that it might be 
because we’re not setting the “group.id” property, but as it turns out we are 
indeed setting it. In any case, we have to use the job’s metrics, and monitor 
that the job is up, rather than monitoring the offset in Kafka itself.

-Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi" <u...@apache.org> wrote:

Hey Jins,

our current back pressure tracking mechanism does not work with Kafka
sources. To gather back pressure indicators we sample the main task
thread of a subtask. For most tasks, this is the thread that emits
records downstream (e.g. if you have a map function) and everything
works as expected. In the case of the Kafka source, though, there is a
separate thread that consumes from Kafka and emits the records.
Therefore we sample the "wrong" thread and don't observe any
indicators for back pressure. :-( Unfortunately, this was not taken
into account when back pressure sampling was implemented.

There is this old issue to track this:
https://issues.apache.org/jira/browse/FLINK-3456

I'm not aware of any other way to track this situation. Maybe others
can chime in here...

– Ufuk
 
On Mon, Jan 8, 2018 at 8:16 AM, Jins George <jins.geo...@aeris.net> wrote:

> I have a Beam Pipeline consuming records from Kafka, doing some
> transformations, and writing to HBase. I faced an issue in which records
> were being written to HBase at a slower rate than the incoming messages
> to Kafka, due to a temporary surge in the incoming traffic.
>
> From the Flink UI, if I check the back pressure status, it shows OK. I
> have one task which has all the operators, including the source.
>
> Any idea why the backpressure indicator would show OK, but messages are
> backed up in Kafka?
>
> Is there any other mechanism/metrics by which I can identify this
> situation?
>
> I'm running Flink 1.2 w/ Beam 2.0.
>
> Thanks,
> Jins George


Back-pressure Status shows OK but records are backed up in kafka

2018-01-07 Thread Jins George
I have a Beam Pipeline consuming records from Kafka, doing some
transformations, and writing to HBase. I faced an issue in which
records were being written to HBase at a slower rate than the incoming
messages to Kafka, due to a temporary surge in the incoming traffic.

From the Flink UI, if I check the back pressure status, it shows OK. I
have one task which has all the operators, including the source.

Any idea why the backpressure indicator would show OK, but messages are
backed up in Kafka?

Is there any other mechanism/metrics by which I can identify this
situation?

I'm running Flink 1.2 w/ Beam 2.0.

Thanks,
Jins George


Re: Issue with Checkpoint restore (Beam pipeline)

2017-11-30 Thread Jins George
Thanks Aljoscha. I have not tried with 1.3. I will try it and check the
behavior.

Regarding setting UIDs on operators from Beam, do you know if that's
something planned for a near-future release?


Thanks,
Jins George

On 11/30/2017 01:48 AM, Aljoscha Krettek wrote:

Hi,

I think you might be running into a problem that is hard to solve with 
Flink 1.2 and Beam. As you noticed, it's a problem that Beam doesn't 
assign UIDs to operators, which is a problem. Flink 1.3 and even more 
Flink 1.4 are a bit more lenient in accepting changes to the graph, so 
you might have better luck when trying it with that. Did you try using 
a newer Beam/Flink version? Flink 1.4 should be out next week and 
shortly after that I'll also update the Beam dependency.


Best,
Aljoscha

On 29. Nov 2017, at 23:52, Jins George <jins.geo...@aeris.net> wrote:


Hi,

I am running a Beam Pipeline on Flink 1.2 and facing an issue in
restoring a job from a checkpoint. If I modify my Beam pipeline to add
a new operator and try to restore from the externalized checkpoint,
I get the error:


java.lang.IllegalStateException: Invalid Invalid number of operator states. Found :56. Expected: 58
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkRestorePreconditions(StreamTask.java:680)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

According to the savepoint guide [1], a newly added operator should be
initialized without any state. Any idea why this error is reported?

Also note, I am not setting an ID on my operators (because the Flink
runner in Beam sets the operator name the user provided at pipeline
creation).


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
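
For reference, in a vanilla Flink job (outside Beam) the usual way to keep state matched across job-graph changes is to pin stateful operators with explicit UIDs; a hedged Java sketch with made-up operator names:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> upper = env
        .socketTextStream("localhost", 9999)  // placeholder source
        .uid("text-source")   // stable ID: savepoint state re-matches this operator
        .map(String::toUpperCase)
        .uid("upper-map");    // operators added later simply start with empty state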



Thanks,
Jins George


Issue with Checkpoint restore (Beam pipeline)

2017-11-29 Thread Jins George

Hi,

I am running a Beam Pipeline on Flink 1.2 and facing an issue in
restoring a job from a checkpoint. If I modify my Beam pipeline to add a
new operator and try to restore from the externalized checkpoint, I get
the error:


java.lang.IllegalStateException: Invalid Invalid number of operator states. Found :56. Expected: 58
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkRestorePreconditions(StreamTask.java:680)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

According to the savepoint guide [1], a newly added operator should be
initialized without any state. Any idea why this error is reported?

Also note, I am not setting an ID on my operators (because the Flink
runner in Beam sets the operator name the user provided at pipeline
creation).


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html



Thanks,
Jins George


Re: System properties when submitting flink job to YARN Session

2017-07-12 Thread Jins George

Hi Aljoscha,

I am still using Beam on Flink. I have one YARN session running multiple
streaming jobs. The application jar contains some environment-specific
runtime properties (like IP addresses, REST API endpoints, etc.). This
adds overhead in my use case, as we have to deploy it to multiple
environments. I was trying to decouple these properties files from the
uber jar and provide them either as a classpath resource or by passing
the path of the file as a system property to the JVM.


So far I have noticed the following options to achieve this:

 * Put all properties in a file and use the --classpath file:// option of the
   flink run command. This needs the URL to be accessible from all nodes,
   something like NFS.
 * Use -D in yarn-session to pass each property (see the sketch below). This
   will require restarting the YARN session if a new property gets added.

An ideal solution for me would be to provide a local classpath to the flink run
command and have it propagated to the other workers automatically :)
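
A hedged sketch of the -D route above (key names and values are made up; env.java.opts is the Flink option that forwards extra JVM arguments to the JobManager and TaskManager processes):

yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 7 \
    -Denv.java.opts="-Dconfig.path=/etc/myapp/app.properties"

Operator code could then read System.getProperty("config.path") in every worker JVM.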


Thanks,
Jins
On 07/12/2017 02:25 AM, Aljoscha Krettek wrote:

Hi,

Yes, setting the property using -D when creating the session should work to
make it available on all workers. I think after that it cannot be changed, since
the JVMs are already running.

If I may ask, what’s your use case for this? Are you still using Beam on Flink 
or are you using vanilla Flink with this?

Best,
Aljoscha


On 11. Jul 2017, at 07:24, Jins George <jins.geo...@aeris.net> wrote:

Thanks Nico. I am able to pass arguments to the main program; that works, but it
is not exactly what I was looking for.

I guess to give all worker JVMs the same system property, I have to set it at
yarn-session creation time using -D (haven't tried it yet).

Thanks,
Jins George

On 07/10/2017 06:56 AM, Nico Kruber wrote:

Hi Jins,
I'm not sure whether you can define a system property, but you can include it
in the program arguments of "flink run [OPTIONS]  "

You may also be able to define system properties, but these are probably only
valid in your main() function executed within the flink run script, not in any
operators run on other JVM nodes. Have you tried that?


Nico

On Saturday, 8 July 2017 18:08:59 CEST Jins George wrote:

Hello,

I want to set the path of a properties file as a system property in my
application (something like -Dkey=value).
Is there a way to set it while submitting a Flink job to a running YARN
session? I am using bin/flink run to submit the job to an already
running YARN session.

Thanks,
Jins George


Re: System properties when submitting flink job to YARN Session

2017-07-10 Thread Jins George
Thanks Nico. I am able to pass arguments to the main program; that
works, but it is not exactly what I was looking for.

I guess to give all worker JVMs the same system property, I have to
set it at yarn-session creation time using -D (haven't tried it yet).


Thanks,
Jins George

On 07/10/2017 06:56 AM, Nico Kruber wrote:

Hi Jins,
I'm not sure whether you can define a system property, but you can include it
in the program arguments of "flink run [OPTIONS]  "

You may also be able to define system properties, but these are probably only
valid in your main() function executed within the flink run script, not in any
operators run on other JVM nodes. Have you tried that?


Nico

On Saturday, 8 July 2017 18:08:59 CEST Jins George wrote:

Hello,

I want to set the path of a properties file as a system property in my
application (something like -Dkey=value).
Is there a way to set it while submitting a Flink job to a running YARN
session? I am using bin/flink run to submit the job to an already
running YARN session.

Thanks,
Jins George


System properties when submitting flink job to YARN Session

2017-07-08 Thread Jins George

Hello,

I want to set the path of a properties file as a system property in my
application (something like -Dkey=value).
Is there a way to set it while submitting a Flink job to a running YARN
session? I am using bin/flink run to submit the job to an already
running YARN session.


Thanks,
Jins George


Submit Flink job programmatically

2017-04-07 Thread Jins George

Hello Community,

I have a need to submit a Flink job to a remote YARN cluster
programmatically. I tried to use YarnClusterDescriptor.deploy(), but I
get the message RMProxy.java:92:main] - Connecting to ResourceManager at
/0.0.0.0:8032.
It is trying to connect to the ResourceManager on the client machine. I
have set YARN_CONF_DIR on the client machine and placed
yarn-site.xml, core-site.xml, etc. there. However, it does not seem to be
picking up these files.


Is this the right way to submit to a remote YARN cluster?
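
In case it helps with debugging, a hedged Java sketch of pointing the Hadoop client explicitly at the remote cluster's configuration, instead of relying on YARN_CONF_DIR being picked up (file paths are placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

YarnConfiguration yarnConf = new YarnConfiguration();
// Load the remote cluster's config files explicitly.
yarnConf.addResource(new Path("/path/to/remote/yarn-site.xml"));
yarnConf.addResource(new Path("/path/to/remote/core-site.xml"));
// yarnConf should now carry the remote yarn.resourcemanager.address
// rather than the 0.0.0.0:8032 default seen in the log.
System.out.println(yarnConf.get(YarnConfiguration.RM_ADDRESS));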


Thanks,
Jins George