Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job
Hi folks,

I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM crashes while canceling a job. This causes Kubernetes readiness probes to fail, the JM to be restarted, and the cluster to end up in a bad state while it tries to recover itself using ZK + a checkpoint which no longer exists. This is the only information being logged before the process exits:

    method: uncaughtException
    msg: FATAL: Thread 'cluster-io-thread-4' produced an uncaught exception. Stopping the process...
    pod: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
    stack:
    java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 25977]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
        at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
        at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
        at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
        at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58

I’m not sure how to debug this further, but it seems like an internal Flink bug?

More info:
* Checkpoints are stored in S3 and I’m using the S3 connector
* Identical code has been running on Flink 1.11.x for months with no issues

Thanks,
Kelly
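Reading the stack trace, it looks like the CheckpointsCleaner callback re-schedules a checkpoint trigger on the coordinator’s timer executor after cancellation has already shut that executor down. A minimal, self-contained sketch of that generic JDK failure mode (plain JDK only, hypothetical class name, not Flink code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical demo class reproducing the failure mode in the stack trace above:
// submitting work to a timer pool that was already shut down.
public class RejectedAfterShutdownDemo {
    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        // Simulate the coordinator's timer service being torn down during job cancellation.
        timer.shutdownNow();
        try {
            // Analogous to scheduleTriggerRequest() running from the cleaner callback
            // after shutdown.
            timer.execute(() -> { });
            System.out.println("accepted");
        } catch (RejectedExecutionException e) {
            // The default AbortPolicy rejects tasks submitted to a terminated pool.
            System.out.println("rejected");
        }
    }
}
```

Since the callback runs on a separate cleaner thread, there is a window where the job’s cancellation wins the race, which would make this timing-dependent rather than deterministic in a real cluster.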
Re: Flink SQL - Join Lookup Table
Thanks Leonard and Danny,

This makes a lot of sense. My hope here is to only use SQL without any specialized Java/Scala code, so it seems it may not be possible to use either of these methods yet. I’ll open an issue for the LookupTableSource implementation, and look into the workaround you suggested in the short term.

Thanks!
Kelly

From: Leonard Xu
Date: Monday, July 20, 2020 at 7:49 PM
To: Danny Chan
Cc: Kelly Smith, Flink ML
Subject: Re: Flink SQL - Join Lookup Table

Hi, Kelly

It looks like you want to join a fact table (from Kafka) with a dimension table (from the filesystem). A dimension table is one kind of temporal table; for the temporal table join syntax you can refer to Danny’s post [1]. However, `FileSystemTableSource` does not implement the `LookupTableSource` interface yet, which means you cannot use it as a dimension table. The connectors that support `LookupTableSource` include JDBC, HBase, and Hive; you could create an issue to support `LookupTableSource` for the filesystem connector.

Another approach is using a Temporal Table Function [2], which can define a temporal table from a DataStream: convert your filesystem table to a stream, create a temporal table function from it, and then join against that temporal table.
Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function

On Jul 21, 2020, at 10:07, Danny Chan <yuzhao@gmail.com> wrote:

Seems you want a temporal table join instead of a two-stream join. If that is your request, you should use the syntax

    JOIN LookupTable FOR SYSTEM_TIME AS OF ...

See [1] for details.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan

On Jul 21, 2020 at 6:32 AM +0800, Kelly Smith <kell...@zillowgroup.com> wrote:

Hi folks, I have a question about Flink SQL.
What I want to do is this:

* Join a simple lookup table (a few rows) to a stream of data to enrich the stream by adding a column from the lookup table.

For example, a simple lookup table:

    CREATE TABLE LookupTable (
      `computeClass` STRING,
      `multiplier` FLOAT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'fpu-multipliers.csv',
      'format' = 'csv'
    )

And I’ve got a Kafka connector table with rowtime semantics that has a `computeClass` field. I simply want to join (in a streaming fashion) the `multiplier` field above.

    SELECT
      `timestamp`,
      -- ...
      ks.computeClass,
      lt.`multiplier`
    FROM KafkaStream ks
    JOIN LookupTable lt
    ON ks.computeClass = lt.computeClass

Doing a simple join like that gives me this error:

    “org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.”

Which leads me to believe that I should use an Interval Join instead, but that doesn’t seem appropriate since my table is static and has no concept of time.

Basically, I want to hold the entire lookup table in memory, and simply enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly
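For reference, the syntax Danny points to would look roughly like the sketch below. This is a hypothetical illustration only: it assumes `LookupTable` were backed by a connector that implements `LookupTableSource` (e.g. JDBC), which the filesystem connector does not yet, and it assumes `KafkaStream` declares a processing-time attribute (e.g. `proctime AS PROCTIME()` in its DDL):

```sql
-- Temporal (lookup) join sketch; `proctime` is an assumed processing-time column.
SELECT
  ks.`timestamp`,
  ks.computeClass,
  lt.`multiplier`
FROM KafkaStream AS ks
JOIN LookupTable FOR SYSTEM_TIME AS OF ks.proctime AS lt
  ON ks.computeClass = lt.computeClass;
```

Unlike a regular two-stream join, this form enriches each Kafka row against the lookup side as of processing time, so the rowtime-attribute restriction on regular joins does not apply.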
Flink SQL - Join Lookup Table
Hi folks, I have a question about Flink SQL.

What I want to do is this:

* Join a simple lookup table (a few rows) to a stream of data to enrich the stream by adding a column from the lookup table.

For example, a simple lookup table:

    CREATE TABLE LookupTable (
      `computeClass` STRING,
      `multiplier` FLOAT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'fpu-multipliers.csv',
      'format' = 'csv'
    )

And I’ve got a Kafka connector table with rowtime semantics that has a `computeClass` field. I simply want to join (in a streaming fashion) the `multiplier` field above.

    SELECT
      `timestamp`,
      -- ...
      ks.computeClass,
      lt.`multiplier`
    FROM KafkaStream ks
    JOIN LookupTable lt
    ON ks.computeClass = lt.computeClass

Doing a simple join like that gives me this error:

    “org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.”

Which leads me to believe that I should use an Interval Join instead, but that doesn’t seem appropriate since my table is static and has no concept of time.

Basically, I want to hold the entire lookup table in memory, and simply enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly
Re: Metrics for Task States
Thanks Caizhi, that was what I was afraid of. Thanks for the information on the REST API 😊

It seems like the right solution would be to add this as a first-class feature in Flink, so I will file a feature request. I may end up using the REST API as a workaround in the short term - probably with a side-car container once we move to Kubernetes.

Kelly

From: Caizhi Weng
Date: Monday, November 25, 2019 at 1:41 AM
To: Kelly Smith
Cc: Piper Piper, "user@flink.apache.org"
Subject: Re: Metrics for Task States

Hi Kelly,

As far as I know, Flink currently does not have metrics for monitoring the number of tasks in each state. See https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html for the complete metrics list (it seems that `taskSlotsAvailable` is the most closely related metric there).

But Flink has a REST API which can provide the states of all the tasks (http://hostname:port/overview). This endpoint returns a JSON string containing the information you want. Maybe you can write your own tool to monitor this API.
If you really want metrics describing the number of tasks in each state, you can open a JIRA ticket at https://issues.apache.org/jira/projects/FLINK/issues/

Thank you

On Mon, Nov 25, 2019 at 12:59 AM, Kelly Smith <kell...@zillowgroup.com> wrote:

With EMR/YARN, the cluster is definitely running in session mode. It exists independently of any job and continues running after the job exits. Whether or not this is a bug in Flink, is it possible to get access to the metrics I'm asking about? Those would be useful even if this behavior is fixed.

From: Piper Piper <piperfl...@gmail.com>
Sent: Friday, November 22, 2019 9:10:41 PM
To: Kelly Smith
Cc: user@flink.apache.org
Subject: Re: Metrics for Task States

I am trying to reason about why this problem should occur (i.e. why Flink could not reject the job when it required more slots than were available). Flink in production on EMR (YARN): does this mean Flink was being run in Job mode or Session mode?

Thank you,
Piper

On Thu, Nov 21, 2019 at 4:56 PM Piper Piper <piperfl...@gmail.com> wrote:

Thank you, Kelly!
On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith <kell...@zillowgroup.com> wrote:

Hi Piper,

The repro is pretty simple:

* Submit a job with parallelism set higher than YARN has resources to support

What this ends up looking like in the Flink UI is this:

[inline screenshot: job overview]

The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits by default will increase by 1, but none of the tasks actually get scheduled on any TM.

[inline screenshot: task states]

What I’m looking for is a way to detect when I am in this state using Flink metrics (ideally the count of tasks in each state for better observability). Does that make sense?

Thanks,
Kelly

From: Piper Piper <piperfl...@gmail.com>
Date: Thursday, November 21, 2019 at 12:59 PM
To: Kelly Smith
Cc: "user@flink.apache.org"
Subject: Re: Metrics for Task States

Hello Kelly,

I thought that the Flink scheduler only starts a job if all requested containers/TMs are available and allotted to that job. How can I reproduce your issue on Flink with YARN?

Thank you,
Piper

On Thu, Nov 21, 2019, 1:48 PM Kelly Smith <kell...@zillowgroup.com> wrote:

I’ve been running Flink in production on EMR (YARN) for some time and have found the metrics system to be quite useful, but there is one specific case where I’m missing a signal for this scenario:

* When a job has been submitted, but YARN does not have enough resources to provide
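Until a first-class metric exists, the per-task states can be scraped from the REST API as Caizhi suggests. A hedged sketch of the alert condition (plain JDK, hypothetical class name; the sample string below is a hand-written stand-in for part of a `/jobs/<job-id>` response, whose exact shape may vary across Flink versions):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical monitoring snippet, not Flink code. SAMPLE is a made-up excerpt
// of a JobManager REST response listing per-vertex execution status.
public class StuckTaskCheck {
    static final String SAMPLE =
        "{\"state\":\"RUNNING\",\"vertices\":["
      + "{\"name\":\"Source\",\"status\":\"SCHEDULED\"},"
      + "{\"name\":\"Map\",\"status\":\"SCHEDULED\"},"
      + "{\"name\":\"Sink\",\"status\":\"SCHEDULED\"}]}";

    public static void main(String[] args) {
        // Count vertices per status with a crude regex; a real tool should use a
        // proper JSON parser (the JDK ships none).
        Matcher m = Pattern.compile("\"status\":\"([A-Z]+)\"").matcher(SAMPLE);
        int scheduled = 0, total = 0;
        while (m.find()) {
            total++;
            if (m.group(1).equals("SCHEDULED")) scheduled++;
        }
        // Alert condition: the job claims RUNNING but every task is still SCHEDULED.
        boolean unhealthy = SAMPLE.contains("\"state\":\"RUNNING\"")
                && total > 0 && scheduled == total;
        System.out.println("scheduled=" + scheduled + "/" + total + " unhealthy=" + unhealthy);
    }
}
```

In a side-car container this check would poll the endpoint over HTTP on an interval and export the counts to whatever metrics backend is in use.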
Re: Metrics for Task States
With EMR/YARN, the cluster is definitely running in session mode. It exists independently of any job and continues running after the job exits. Whether or not this is a bug in Flink, is it possible to get access to the metrics I'm asking about? Those would be useful even if this behavior is fixed.

From: Piper Piper
Sent: Friday, November 22, 2019 9:10:41 PM
To: Kelly Smith
Cc: user@flink.apache.org
Subject: Re: Metrics for Task States

I am trying to reason about why this problem should occur (i.e. why Flink could not reject the job when it required more slots than were available). Flink in production on EMR (YARN): does this mean Flink was being run in Job mode or Session mode?

Thank you,
Piper

On Thu, Nov 21, 2019 at 4:56 PM Piper Piper <piperfl...@gmail.com> wrote:

Thank you, Kelly!

On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith <kell...@zillowgroup.com> wrote:

Hi Piper,

The repro is pretty simple:

* Submit a job with parallelism set higher than YARN has resources to support

What this ends up looking like in the Flink UI is this:

[inline screenshot: job overview]

The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits by default will increase by 1, but none of the tasks actually get scheduled on any TM.

[inline screenshot: task states]

What I’m looking for is a way to detect when I am in this state using Flink metrics (ideally the count of tasks in each state for better observability). Does that make sense?

Thanks,
Kelly

From: Piper Piper <piperfl...@gmail.com>
Date: Thursday, November 21, 2019 at 12:59 PM
To: Kelly Smith
Cc: "user@flink.apache.org"
Subject: Re: Metrics for Task States

Hello Kelly,

I thought that the Flink scheduler only starts a job if all requested containers/TMs are available and allotted to that job.
How can I reproduce your issue on Flink with YARN?

Thank you,
Piper

On Thu, Nov 21, 2019, 1:48 PM Kelly Smith <kell...@zillowgroup.com> wrote:

I’ve been running Flink in production on EMR (YARN) for some time and have found the metrics system to be quite useful, but there is one specific case where I’m missing a signal for this scenario:

* When a job has been submitted, but YARN does not have enough resources to provide

Observed:
* Job is in RUNNING state
* All of the tasks for the job are in the (I believe) DEPLOYING state

Is there a way to access these as metrics for monitoring the number of tasks in each state for a given job (image below)? The metric I’m currently using is the number of running jobs, but it misses this “unhealthy” scenario. I realize that I could use application-level metrics (record counts, etc.) as a proxy for this, but I’m working on providing a streaming platform and need all of my monitoring to be application agnostic.

[inline screenshot: task states]

I can’t find anything about this in the documentation.

Thanks,
Kelly
Re: Metrics for Task States
Hi Piper,

The repro is pretty simple:

* Submit a job with parallelism set higher than YARN has resources to support

What this ends up looking like in the Flink UI is this:

[inline screenshot: job overview]

The Job is in a “RUNNING” state, but all of the tasks are in the “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits by default will increase by 1, but none of the tasks actually get scheduled on any TM.

[inline screenshot: task states]

What I’m looking for is a way to detect when I am in this state using Flink metrics (ideally the count of tasks in each state for better observability). Does that make sense?

Thanks,
Kelly

From: Piper Piper
Date: Thursday, November 21, 2019 at 12:59 PM
To: Kelly Smith
Cc: "user@flink.apache.org"
Subject: Re: Metrics for Task States

Hello Kelly,

I thought that the Flink scheduler only starts a job if all requested containers/TMs are available and allotted to that job. How can I reproduce your issue on Flink with YARN?

Thank you,
Piper

On Thu, Nov 21, 2019, 1:48 PM Kelly Smith <kell...@zillowgroup.com> wrote:

I’ve been running Flink in production on EMR (YARN) for some time and have found the metrics system to be quite useful, but there is one specific case where I’m missing a signal for this scenario:

* When a job has been submitted, but YARN does not have enough resources to provide

Observed:
* Job is in RUNNING state
* All of the tasks for the job are in the (I believe) DEPLOYING state

Is there a way to access these as metrics for monitoring the number of tasks in each state for a given job (image below)? The metric I’m currently using is the number of running jobs, but it misses this “unhealthy” scenario. I realize that I could use application-level metrics (record counts, etc.) as a proxy for this, but I’m working on providing a streaming platform and need all of my monitoring to be application agnostic.

[inline screenshot: task states]

I can’t find anything about this in the documentation.

Thanks,
Kelly
Metrics for Task States
I’ve been running Flink in production on EMR (YARN) for some time and have found the metrics system to be quite useful, but there is one specific case where I’m missing a signal for this scenario:

* When a job has been submitted, but YARN does not have enough resources to provide

Observed:
* Job is in RUNNING state
* All of the tasks for the job are in the (I believe) DEPLOYING state

Is there a way to access these as metrics for monitoring the number of tasks in each state for a given job (image below)? The metric I’m currently using is the number of running jobs, but it misses this “unhealthy” scenario. I realize that I could use application-level metrics (record counts, etc.) as a proxy for this, but I’m working on providing a streaming platform and need all of my monitoring to be application agnostic.

[inline screenshot: task states]

I can’t find anything about this in the documentation.

Thanks,
Kelly