Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section
Thank you Guowei. That was the trick! By default, jobs in the Completed Jobs section expire and are removed after 1 hour. I have increased jobstore.expiration-time and completed jobs are now retained.

Thanks,
Jins

From: Guowei Ma
Date: Wednesday, April 10, 2019 at 3:29 AM
To: Jins George
Cc: Timothy Victor, user
Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section

I am not very sure about this problem, but you could try to increase jobstore.expiration-time in the config.

Best,
Guowei

Jins George <jins.geo...@aeris.net> wrote on Wednesday, April 10, 2019 at 1:01 PM:

> Any input on this UI behavior?
>
> Thanks,
> Jins
>
> From: Timothy Victor <vict...@gmail.com>
> Date: Monday, April 8, 2019 at 10:47 AM
> To: Jins George <jins.geo...@aeris.net>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink 1.7.2 UI : Jobs removed from Completed Jobs section
>
> I face the same issue in Flink 1.7.1. Would be good to know a solution.
>
> Tim
>
> On Mon, Apr 8, 2019, 12:45 PM Jins George <jins.geo...@aeris.net> wrote:
>
> Hi,
>
> I am facing a weird problem in which jobs disappear from the 'Completed Jobs' section of the Flink 1.7.2 UI. Looking at the JobManager logs, I see that the job failed, was restarted 'restart-strategy.fixed-delay.attempts' times, and the JobMaster was stopped. I was able to see the job in the Completed Jobs section with status FAILED, but after some time I no longer see it. The JobManager was never restarted, so I expected the failed or completed jobs to stay in the Completed Jobs section. Any idea what might be happening?
>
> JobManager.log:
>
> 2019-04-06 18:21:10,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job dwellalert-ubuntu-0403174608-698009a0 (b274377e6a223078d6f40b9c0620ee0d) because the restart strategy prevented it.
> 2019-04-06 18:21:10,662 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job dwellalert-ubuntu-0403174608-698009a0 (b274377e6a223078d6f40b9c0620ee0d).
>
> Restart Strategy Conf:
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> restart-strategy.fixed-delay.delay: 10 s
>
> Thanks,
> Jins George
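For reference, the fix described above boils down to a single entry in flink-conf.yaml. A minimal sketch — the value is in seconds, 3600 (1 hour) is the default, and 86400 here is just an example retention of one day:

```yaml
# flink-conf.yaml
# How long the JobManager keeps finished jobs for the Completed Jobs
# section of the UI, in seconds. Default: 3600 (1 hour).
jobstore.expiration-time: 86400
```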
Re: Flink 1.6 Yarn Session behavior
Thank you Gary. That was helpful.

Thanks,
Jins George

On 2/17/19 10:03 AM, Gary Yao wrote:

> Hi Jins George,
>
> Every TM brings additional overhead, e.g., more heartbeat messages. However, a cluster with 28 TMs would not be considered big, as there are users running Flink applications on thousands of cores [1][2].
>
> Best,
> Gary
>
> [1] https://flink.apache.org/flink-architecture.html#run-applications-at-any-scale
> [2] https://de.slideshare.net/FlinkForward/flink-forward-sf-2017-stephan-ewen-experiences-running-flink-at-very-large-scale

On Thu, Feb 14, 2019 at 6:59 PM Jins George <jins.geo...@aeris.net> wrote:

> Thanks Gary. Understood the behavior. I am leaning towards running 7 TMs on each (8-core) machine. I have 4 nodes, which would end up as 28 TaskManagers and 1 JobManager. I was wondering if this could put an additional burden on the JobManager. Is it recommended?
>
> Thanks,
> Jins George
>
> On 2/14/19 8:49 AM, Gary Yao wrote:
>
> Hi Jins George,
>
> This has been asked before [1]. The bottom line is that you currently cannot pre-allocate TMs and distribute your tasks evenly. You might be able to achieve a better distribution across hosts by configuring fewer slots in your TMs.
>
> Best,
> Gary
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html
>
> On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi,
>
> I'm forwarding this question to Gary (CC'ed), who most likely would have an answer for your question here.
>
> Cheers,
> Gordon
>
> On Wed, Feb 13, 2019 at 8:33 AM Jins George <jins.geo...@aeris.net> wrote:
>
> Hello community,
>
> I am trying to upgrade a Flink YARN session cluster running BEAM pipelines from version 1.2.0 to 1.6.3. Here is my session start command:
>
> yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 7
>
> Because of dynamic resource allocation, no TaskManager gets created initially. Now, once I submit a job with parallelism 5, I see that one TaskManager gets created and all 5 parallel instances are scheduled on the same TaskManager (because I have 7 slots). This can create a hot spot, as only one physical node (out of 4 in my case) is utilized for processing. I noticed the legacy mode, which would provision all TaskManagers at cluster creation, but since legacy mode is expected to go away soon, I didn't want to try that route. Is there a way I can configure multiple jobs, or parallel instances of the same job, to spread across all the available YARN nodes and continue using the 'new' mode?
>
> Thanks,
> Jins George
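One hedged workaround along the lines Gary suggests (fewer slots per TM): the numbers below are illustrative only, and in the 'new' mode -n is treated as a hint rather than a fixed TM count. With fewer slots per TaskManager, a parallelism-5 job forces several TMs to be started, which usually lands them on different nodes — though placement is ultimately up to YARN:

```shell
# Illustrative sketch: 2 slots per TM instead of 7, so a job with
# parallelism 5 needs ceil(5/2) = 3 TaskManagers instead of 1.
yarn-session.sh -d -n 4 -jm 1024 -tm 3072 -s 2
```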
Re: Flink on YARN || Monitoring REST API Not Working || Please help
8081 is the default port for a standalone cluster. For a Flink cluster on YARN, go to the Running Applications list in the YARN ResourceManager UI; you can reach the Flink UI by clicking the ApplicationMaster link for the YARN session.

Regards,
Jins

On Feb 1, 2018, at 8:06 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

> Hi,
>
> I have deployed a Flink cluster on Hadoop YARN, and I am able to trigger and run jobs. But I am not able to get the running Flink cluster's Monitoring REST API to work, as listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html
>
> I am trying to connect using the URL http://hostname:8081/jobs, where hostname is the JobManager host. I couldn't find out how to fix this. Can someone please share your thoughts? Thanks a lot.
>
> Regards,
> Raja.
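A sketch of how to reach the REST API on YARN from the command line — the host name and application ID below are placeholders, and the port assumes the default ResourceManager web port. The YARN web proxy forwards REST calls to whichever host the JobManager landed on:

```shell
# Find the application ID of the Flink session.
yarn application -list

# Query the Flink REST API through the YARN ResourceManager's web proxy
# (8088 is the default RM web port; adjust host/port for your cluster).
curl http://<rm-host>:8088/proxy/<application-id>/jobs
```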
Re: Back-pressure Status shows OK but records are backed up in kafka
Thank you Ufuk & Shannon. Since my Kafka consumer is an UnboundedKafkaSource from BEAM, I'm not sure whether the records-lag-max metric is exposed. Let me research further.

Thanks,
Jins George

On 01/08/2018 10:11 AM, Shannon Carey wrote:

> Right, backpressure only measures backpressure on the inside of the Flink job, i.e. between Flink tasks. Therefore, it's up to you to monitor whether your Flink job is "keeping up" with the source stream. If you're using Kafka, there's a metric that the consumer library makes available. For example, for one of our jobs, in Graphite we have a metric that matches:
>
> aliasByNode(stats.gauges.myapp.prod.us-east-1.myapp.*.taskmanager.*.job.com.example.$Job.operator.*.*.KafkaConsumer.records-lag-max, 18, 19)
>
> The "$Job" is a variable which allows you to select the job. You can see that I have wildcards on other elements of the path, for example the TaskManager id, the operator name, the Task index, etc. Your metric is probably rooted somewhere else, but the thing you're looking for is under operator.*.*.KafkaConsumer.records-lag-max.
>
> Flink manages its offsets itself, rather than acting like a "normal" consumer which commits offsets to Kafka. However, in the docs I see that "setCommitOffsetsOnCheckpoints()" is enabled by default. So, theoretically you can use any sort of tool similar to https://github.com/srotya/kafka-lag-monitor or https://github.com/linkedin/Burrow etc. which polls Kafka itself and produces metrics about consumer lag. However, for some reason, I don't see our Flink consumer metrics showing up in our lag monitoring tool or in the Kafka command-line tools, so I'm not sure what's going on there. Maybe it's because Flink doesn't show up as a consumer group? At first I thought that it might be because we're not setting the "group.id" property, but as it turns out we are indeed setting it. In any case, we have to use the job's metrics, and monitor that the job is up, rather than monitoring the offset in Kafka itself.
>
> -Shannon

On 1/8/18, 1:52 AM, "Ufuk Celebi" wrote:

> Hey Jins,
>
> our current back pressure tracking mechanism does not work with Kafka sources. To gather back pressure indicators we sample the main task thread of a subtask. For most tasks, this is the thread that emits records downstream (e.g. if you have a map function) and everything works as expected. In the case of the Kafka source, though, there is a separate thread that consumes from Kafka and emits the records. Therefore we sample the "wrong" thread and don't observe any indicators for back pressure. :-( Unfortunately, this was not taken into account when back pressure sampling was implemented. There is this old issue to track it: https://issues.apache.org/jira/browse/FLINK-3456
>
> I'm not aware of any other way to track this situation. Maybe others can chime in here...
>
> – Ufuk
>
> On Mon, Jan 8, 2018 at 8:16 AM, Jins George wrote:
>
> > I have a Beam pipeline consuming records from Kafka, doing some transformations, and writing to HBase. I faced an issue in which records were being written to HBase at a slower rate than the incoming messages to Kafka, due to a temporary surge in the incoming traffic.
> >
> > From the Flink UI, if I check the back pressure status, it shows OK. I have one task which has all the operators, including the source.
> >
> > Any idea why the backpressure indicator would show OK, but messages are backed up in Kafka?
> >
> > Is there any other mechanism/metric by which I can identify this situation?
> >
> > I'm running Flink 1.2 w/ Beam 2.0.
> >
> > Thanks,
> > Jins George
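Where a metrics dashboard like Graphite isn't available, the same records-lag-max metric can also be read from Flink's monitoring REST API. A minimal Python sketch of the extraction step — the sample response below is hypothetical, and the flat `[{"id": ..., "value": ...}]` shape should be verified against the /metrics endpoints of your Flink version:

```python
import json

def max_consumer_lag(metrics_json):
    """Extract the largest records-lag-max value from a Flink
    /jobs/<id>/vertices/<id>/metrics style JSON response."""
    lags = [
        float(m["value"])
        for m in json.loads(metrics_json)
        if m["id"].endswith("records-lag-max") and m["value"] not in ("", "NaN")
    ]
    return max(lags) if lags else None

# Hypothetical sample of what the endpoint might return for a Kafka
# source vertex (one records-lag-max entry per parallel subtask).
sample = json.dumps([
    {"id": "0.KafkaConsumer.records-lag-max", "value": "1250.0"},
    {"id": "1.KafkaConsumer.records-lag-max", "value": "87.0"},
    {"id": "0.numRecordsOut", "value": "420000"},
])

print(max_consumer_lag(sample))  # worst lag across parallel subtasks
```

Alerting on this value (rather than on the UI's backpressure indicator) catches exactly the "backed up in Kafka" situation described above.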
Re: Issue with Checkpoint restore( Beam pipeline)
Thanks Aljoscha. I have not tried with 1.3; I will try it and check the behavior. Regarding setting UIDs on operators from Beam, do you know if that's something planned for a near-future release?

Thanks,
Jins George

On 11/30/2017 01:48 AM, Aljoscha Krettek wrote:

> Hi,
>
> I think you might be running into a problem that is hard to solve with Flink 1.2 and Beam. As you noticed, Beam doesn't assign UIDs to operators, which is a problem. Flink 1.3, and even more so Flink 1.4, are a bit more lenient in accepting changes to the graph, so you might have better luck trying it with those. Did you try using a newer Beam/Flink version? Flink 1.4 should be out next week, and shortly after that I'll also update the Beam dependency.
>
> Best,
> Aljoscha
>
> On 29. Nov 2017, at 23:52, Jins George <jins.geo...@aeris.net> wrote:
>
> Hi,
>
> I am running a Beam pipeline on Flink 1.2 and facing an issue restoring a job from a checkpoint. If I modify my Beam pipeline to add a new operator and try to restore from the externalized checkpoint, I get the error:
>
> java.lang.IllegalStateException: Invalid number of operator states. Found: 56. Expected: 58
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkRestorePreconditions(StreamTask.java:680)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
> Per the savepoint guide [1], a newly added operator should be initialized without any state. Any idea why this error is reported?
>
> Also note that I am not setting a UID on my operators (because the Flink runner in Beam only sets the operator name the user provided at pipeline creation).
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>
> Thanks,
> Jins George
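When restoring on a newer Flink (1.3+) that still complains about state with no matching operator, the CLI has an escape hatch worth knowing. A sketch — the savepoint path and jar name are placeholders:

```shell
# --allowNonRestoredState (-n) lets the job start even if the savepoint
# or externalized checkpoint contains state for operators that no longer
# exist in the modified job graph.
flink run -s hdfs:///savepoints/savepoint-abc123 -n my-beam-pipeline.jar
```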
Re: external checkpoints
Hi Aviad,

I had a similar situation, and my solution was to use the Flink monitoring REST API (/jobs/{jobid}/checkpoints) to get the mapping between a job and its checkpoint files. Wrap this in a script and run it periodically (in my case, every 30 seconds). You can also configure each job with its own externalized checkpoint directory. Refer to https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

Thanks,
Jins

On 11/15/2017 06:34 AM, Aviad Rotem wrote:

> Hi,
>
> I have several jobs which are configured for external checkpointing (enableExternalizedCheckpoints). How can I correlate between checkpoints and jobs? For example, if I want to write a script which monitors whether a job is up or not, and resumes a down job from its externalized checkpoint, how could I know which checkpoint belongs to which specific job? Can I configure each job to write its external checkpoints to a different location?
>
> My configuration is:
>
> state.backend: rocksdb
> state.backend.fs.checkpointdir: s3a://flink-bucket/backend/checkpoints
> state.checkpoints.dir: s3a://flink-bucket/checkpoints
>
> and in the code I set enableCheckpointing and enableExternalizedCheckpoints.
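The polling script described above essentially extracts the externalized checkpoint path from each job's /jobs/{jobid}/checkpoints response. A minimal Python sketch of that extraction — the response below is a hypothetical, heavily trimmed example, and the field names (`latest`, `completed`, `external_path`) should be verified against the REST API docs for your Flink version:

```python
import json

def latest_external_checkpoint(checkpoints_json):
    """Return the externalized path of the latest completed checkpoint
    for one job, or None if no completed checkpoint exists yet."""
    latest = json.loads(checkpoints_json).get("latest") or {}
    completed = latest.get("completed") or {}
    return completed.get("external_path")

# Hypothetical, trimmed /jobs/<jobid>/checkpoints response -- check the
# actual shape on your cluster before relying on these field names.
sample = json.dumps({
    "counts": {"completed": 42, "failed": 0},
    "latest": {
        "completed": {
            "id": 42,
            "external_path": "s3a://flink-bucket/checkpoints/abc123/chk-42",
        }
    },
})

print(latest_external_checkpoint(sample))
```

Running this per job ID (from /jobs) every 30 seconds and persisting the job-to-path mapping gives the correlation Aviad asked for.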
Re: System properties when submitting flink job to YARN Session
Hi Aljoscha,

I am still using Beam on Flink. I have one YARN session running multiple streaming jobs. The application jar contains some environment-specific runtime properties (IP addresses, REST API endpoints, etc.). This adds overhead in my use case, as we have to deploy to multiple environments. I was trying to decouple these properties files from the uber jar and provide them either as a classpath resource or by passing the path of the file as a system property to the JVM. So far I have noticed the following options:

* Put all properties in a file and use the --classpath file:// option of flink run. This needs the URL to be accessible from all nodes, something like NFS.
* Use -D in yarn-session to pass each property. This requires restarting the YARN session if a new property gets added.

An ideal solution for me would be to provide a local classpath to the flink run command and have it propagated to the other workers automatically :)

Thanks,
Jins

On 07/12/2017 02:25 AM, Aljoscha Krettek wrote:

> Hi,
>
> Yes, setting the property using -D when creating the session should work to make it available on all workers. I think after that it cannot be changed, since the JVMs are already running. If I may ask, what's your use case for this? Are you still using Beam on Flink, or are you using vanilla Flink with this?
>
> Best,
> Aljoscha
>
> On 11. Jul 2017, at 07:24, Jins George wrote:
>
> Thanks Nico. I am able to pass arguments to the main program; that works, but it's not exactly what I was looking for. I guess to give all worker JVMs the same system property, I have to set it at yarn-session creation time using -D (haven't tried it yet).
>
> Thanks,
> Jins George
>
> On 07/10/2017 06:56 AM, Nico Kruber wrote:
>
> Hi Jins,
>
> I'm not sure whether you can define a system property, but you can include it in the program arguments of "flink run [OPTIONS]". You may also be able to define system properties, but these are probably only valid in your main() function executed within the flink run script, not in any operators run on other JVM nodes. Have you tried that?
>
> Nico
>
> On Saturday, 8 July 2017 18:08:59 CEST Jins George wrote:
>
> Hello,
>
> I want to set the path of a properties file as a system property in my application (something like -Dkey=value). Is there a way to set it while submitting a Flink job to a running YARN session? I am using bin/flink run to submit the job to an already running YARN session.
>
> Thanks,
> Jins George
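The two options from this thread, sketched as commands — the paths and the config.path property name are hypothetical, and env.java.opts is the Flink configuration key for appending JVM options to the JobManager/TaskManager JVMs:

```shell
# Option 1: ship a properties file URL on the user classpath
# (the URL must be reachable from every node, e.g. via NFS).
flink run --classpath file:///nfs/conf/app.properties my-job.jar

# Option 2: set a JVM system property for all worker JVMs when the
# session starts (changing it requires restarting the session).
yarn-session.sh -d -n 4 -s 7 -Denv.java.opts="-Dconfig.path=/nfs/conf/app.properties"
```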
Submit Flink job programmatically

Hello Community,

I need to submit a Flink job to a remote YARN cluster programmatically. I tried to use YarnClusterDescriptor.deploy(), but I get the message:

RMProxy.java:92:main] - Connecting to ResourceManager at /0.0.0.0:8032

It is trying to connect to the ResourceManager on the client machine. I have set YARN_CONF_DIR on the client machine and placed yarn-site.xml, core-site.xml, etc. there. However, it does not seem to be picking up these files. Is this the right way to submit to a remote YARN cluster?

Thanks,
Jins George
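A sketch of the client-side environment the YARN deployment path expects — the directory is a placeholder. The client resolves the ResourceManager address from the Hadoop configuration it can load; when none is found, it falls back to the default 0.0.0.0:8032 seen in the log line above:

```shell
# Point the client JVM at the *remote* cluster's Hadoop configs before
# deploying; the directory must contain yarn-site.xml and core-site.xml.
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
```

When constructing a YarnClusterDescriptor directly in code (rather than via the flink script), the same configuration directory must also end up on the JVM classpath, or the Hadoop Configuration object built in-process will not see it.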