flink k8s operator chk config interval bug.inoperative
kcz 573693...@qq.com
Re: flink k8s deployment startup error
Look at the files ending in _DIRTY.json: each one should contain valid JSON. If a file is not well-formed JSON, that entry is already corrupted and you can try deleting it.

Best,
Weihua

On Tue, Mar 14, 2023 at 11:23 AM Jason_H wrote:
> Hello, I found my HA directory. How can I tell which entries are dirty
> data that is safe to delete? Is there a way to be sure? Everything I see
> looks like system data.
>
> Jason_H
> hyb_he...@163.com
> [earlier quoted replies and the full stack trace trimmed; the trace is
> reproduced in the original message of this thread]
Re: flink k8s deployment startup error
Hello, I found my HA directory. How can I tell which entries are dirty data that is safe to delete? Is there a way to be sure? Everything I see looks like system data.

Jason_H
hyb_he...@163.com

---- Original message ----
From: Weihua Hu
Date: 2023-03-14 10:39
Subject: Re: flink k8s deployment startup error
> From the exception, the Flink cluster detected DirtyResults data on the HA
> path at startup, but the data is incomplete and cannot be read. You can
> check the relevant HA path against the docs [1] and clean up the corrupted
> data. Also, had a Flink cluster been started before with the same
> cluster-id?
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path
> [quoted original question and stack trace trimmed]
Re: flink k8s deployment startup error
Hello, yes. It started normally before, then it suddenly failed, and after I restarted the pod directly it has kept reporting this error.

Jason_H
hyb_he...@163.com

---- Original message ----
From: Weihua Hu
Date: 2023-03-14 10:39
Subject: Re: flink k8s deployment startup error
> [quoted reply and stack trace trimmed]
Re: flink k8s deployment startup error
Hi,

From the exception, the Flink cluster detected DirtyResults data on the HA path at startup, but the data is incomplete and cannot be read. You can check the relevant HA path against the docs [1] and clean up the corrupted data.

One more question: had a Flink cluster been started before with the same cluster-id?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua

On Tue, Mar 14, 2023 at 9:58 AM Jason_H wrote:
> Hi all, a question: the Flink cluster I deployed on k8s fails to start,
> with the error below. Has anyone run into this?
> [stack trace trimmed; it appears in full in the original message of this
> thread]
flink k8s deployment startup error
Hi all, a question: the Flink cluster I deployed on k8s fails to start, with the error below. Has anyone run into this?

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
        at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
        ... 4 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
        ... 4 more

Jason_H
hyb_he...@163.com
Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job
Unsubscribe

Sent from my iPhone

> On Oct 28, 2022, at 11:41, Weihua Hu wrote:
> [quoted thread trimmed]
Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job
Hi, Young

Your analysis is correct. The Flink Kubernetes operator talks to the Flink cluster through the rest service, and Kubernetes routes requests sent to the service to the multiple backend JM pods at random. Job submission is split into three APIs (uploadJar, runJob, deleteJar), which is why you see those errors in the operator log.

Perhaps you could create a jira issue to follow up on this problem.

Best,
Weihua

On Thu, Oct 27, 2022 at 6:51 PM Young Chen wrote:
> [original question, manifests, and logs trimmed; see the original message
> of this thread]
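A toy simulation (purely illustrative, not operator code) makes the failure mode concrete: model each of the three REST calls as landing on one of the JM pods uniformly at random. runJob succeeds only when it hits the pod that received uploadJar, so with two replicas roughly half of the submissions fail, matching the "sometimes deploys, sometimes doesn't" symptom; a deleteJar routed to the wrong pod is the benign "File ... does not exist" error seen even on successful runs.

```python
import random

def submit_once(rng, num_jms=2):
    """Model one SessionJob submission: three REST calls, each routed randomly."""
    upload_pod = rng.randrange(num_jms)  # uploadJar stores the jar on this pod's local disk
    run_pod = rng.randrange(num_jms)     # runJob finds the jar only on upload_pod
    delete_pod = rng.randrange(num_jms)  # deleteJar likewise
    return run_pod == upload_pod, delete_pod == upload_pod

def run_failure_rate(trials=20000, num_jms=2, seed=1):
    """Fraction of submissions whose runJob call misses the jar."""
    rng = random.Random(seed)
    failures = sum(1 for _ in range(trials) if not submit_once(rng, num_jms)[0])
    return failures / trials
```

With num_jms=1 every call trivially hits the same pod, which is why the problem only shows up once jobManager.replicas is raised to 2.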
Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job
[Problem description]

The Flink k8s operator (v1.1.0) deployed a Flink Session Cluster with HA (two JobManagers), and a SessionJob is used to deploy an example job. Sometimes the job deploys, sometimes it does not. The error logs below appear in the container.

[Steps]

Deploy the cluster:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cluster-1jm-checkpoint
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    state.checkpoints.num-retained: "10"
  serviceAccount: flink
  ingress:
    template: "{{name}}.{{namespace}}.k8s.rf.io"
  jobManager:
    replicas: 2
  podTemplate:
    spec:
      nodeSelector:
        kubernetes.io/hostname: k8s17
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /tmp/flink
            # this field is optional
            type: Directory

Deploy the job:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-job-1jm-checkpoint
spec:
  deploymentName: flink-cluster-1jm-checkpoint
  job:
    # the custom operator image includes the examples jar
    jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 1
    upgradeMode: savepoint

[Relevant logs]

1. Operator log from a run where the job deployed and ran successfully:

2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: File 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
        at org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
]
        at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source

One JobManager pod does not have the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar; it is present in the other JM's pod. That JM is presumably not the leader, though, because the checkpoint-related log lines show up in the first JM.

2. Operator log from a failed job deployment:

2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', namespace='flink'}, version: 120505701} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
java.util.concurrent.CompletionException:
Re: unclear flink-k8s-operator CRD status when a batch job finishes
Starting from 1.15, the JobManager is no longer deleted proactively when the job finishes, so the Kubernetes Operator can query the job status normally and keep it updated.

Best,
Yang

¥¥¥ wrote on Tue, Oct 25, 2022 at 15:58:
> Unsubscribe
> [quoted thread trimmed; see the original message of this thread]
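Once the JobManager outlives the job (Flink >= 1.15) and the operator can keep refreshing the CRD, completion can be read off the status fields shown in the original mail. A minimal sketch follows; note the set of terminal state names is an assumption based on Flink's globally-terminal job states, not something stated in this thread, so verify it against your operator version.

```python
# Assumed globally-terminal Flink job states; verify against your operator version.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}

def batch_job_done(status: dict) -> bool:
    """Decide from a FlinkDeployment .status mapping whether the batch job has ended."""
    job_status = status.get("jobStatus") or {}
    return job_status.get("state") in TERMINAL_STATES
```

You would feed it the parsed .status object of `kubectl get flinkdeployment <name> -o json`; the RECONCILING state from the original mail is correctly treated as not yet finished.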
unclear flink-k8s-operator CRD status when a batch job finishes
hi,
I am using flink-k8s-operator to deploy a batch job. I found that after the batch job finishes, the status of the FlinkDeployment CRD changes: jobManagerDeploymentStatus becomes "MISSING" and error becomes "Missing JobManager deployment". I suppose this happens because native-k8s automatically deletes the JobManager Deployment once the batch job completes. How can I tell from the CRD status that the batch job has finished running? If jobStatus.state could be "FINISHED", that would make it easy to check.

status:
  clusterInfo:
    flink-revision: a921a4d @ 2022-09-09T10:18:38+02:00
    flink-version: 1.14.6
  error: Missing JobManager deployment
  jobManagerDeploymentStatus: MISSING
  jobStatus:
    jobId: 3c5807b038300f46154d72c58f074715
    jobName: batch-job-lab-o8yln9
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
      triggerId: ''
      triggerTimestamp: 0
      triggerType: UNKNOWN
    startTime: '181370751'
    state: RECONCILING
    updateTime: '181379021'
Re: How to stop the flink k8s operator from automatically deleting pods after they stop?
Hi,
May I ask which Flink version and flink-kubernetes-operator version you are using? If the Flink version is >= 1.15.0, the JobManager pod should be retained after the app finishes.

Best,
Biao Geng

highfei2011 wrote on Wed, Oct 19, 2022 at 14:11:
> Problem: after creating a flink app with the flink k8s operator, the operator automatically deletes the stopped pod, regardless of whether the app succeeded or failed. When I later try to view the logs, the pod no longer exists, so the logs cannot be viewed.
Re: The role of the webhook in flink-k8s-operator
The webhook's main job is to validate the CR, so that problems are caught before the resource lands on K8s. For example: parallelism mistakenly set to a negative value, or jarURI left unset.

Best,
Yang

Kyle Zhang wrote on Wed, Jul 27, 2022 at 18:59:
> Hi all,
> I have been looking at the flink-k8s-operator[1]. The architecture includes a flink-webhook; what does this container do, and what is the impact on overall functionality if webhook.create=false is configured?
>
> Best regards
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/
The role of the webhook in flink-k8s-operator
Hi all,
I have been looking at the flink-k8s-operator[1]. The architecture includes a flink-webhook; what does this container do, and what is the impact on overall functionality if webhook.create=false is configured?

Best regards

[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/
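The kind of admission check Yang describes can be sketched as a plain function over the CR spec. A minimal sketch in Python: the field names (job.parallelism, job.jarURI) mirror the FlinkDeployment CR, but the two rules shown are only the examples named in the reply, not the real webhook's full rule set.

```python
# Sketch of the validations the webhook performs before a FlinkDeployment
# reaches the cluster. Only the two example rules from the reply are shown;
# the real rule set lives in flink-kubernetes-operator.

def validate_spec(spec: dict) -> list:
    """Collect validation errors the way an admission webhook would,
    instead of letting a bad CR fail only after submission."""
    errors = []
    job = spec.get("job", {})
    parallelism = job.get("parallelism", 1)
    if parallelism < 1:
        errors.append("job.parallelism must be positive, got %d" % parallelism)
    if not job.get("jarURI"):
        errors.append("job.jarURI must be set")
    return errors

# A CR with both of the mistakes named above is rejected with two errors.
print(validate_spec({"job": {"parallelism": -2}}))
```

With webhook.create=false these checks are skipped at admission time, and such mistakes only surface when the operator tries to reconcile the resource.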
Re: Flink k8s job submission flow
Hi,
For usage, see: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes
For the design doc, see: https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
JIRA: https://issues.apache.org/jira/browse/FLINK-9953

Best,
Lijie

hjw <1010445...@qq.com.invalid> wrote on Tue, Jun 28, 2022 at 00:11:
> Flink version: 1.15.0
> How is job submission implemented for Flink on native k8s in 1.15.0? Put another way, how is Flink on native K8s designed? I would like to learn about it; if you have related documents or material, please let me know. Thanks :)
Flink k8s job submission flow
Flink version: 1.15.0
How is job submission implemented for Flink on native k8s in 1.15.0? Put another way, how is Flink on native K8s designed? I would like to learn about it; if you have related documents or material, please let me know. Thanks :)
Re: Re: Anomalies after manually deleting the job deployment with Flink k8s HA
Thanks to you both for the replies!

On 2022-06-13 10:09:39, "Yang Wang" wrote:
>Zhanghao's answer is already quite complete; I'll just add one small point: deleting the Deployment while keeping the HA ConfigMaps is the expected behavior, and it is documented [1].
>There are two reasons for this design:
>(1) The job may be restarted with the same cluster-id and be expected to recover from the previous checkpoint.
>(2) Simply deleting the ConfigMaps would leak the HA data stored on DFS (e.g. HDFS, S3, OSS).
>
>[1].
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
>
>Best,
>Yang
Re: Anomalies after manually deleting the job deployment with Flink k8s HA
Zhanghao's answer is already quite complete; I'll just add one small point: deleting the Deployment while keeping the HA ConfigMaps is the expected behavior, and it is documented [1].
There are two reasons for this design:
(1) The job may be restarted with the same cluster-id and be expected to recover from the previous checkpoint.
(2) Simply deleting the ConfigMaps would leak the HA data stored on DFS (e.g. HDFS, S3, OSS).

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up

Best,
Yang
Re: Anomalies after manually deleting the job deployment with Flink k8s HA
1. With K8s-based HA, a Flink job must not have its deployment deleted manually; it has to be stopped with the cancel or stop command. From this I guess that Flink k8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s side, cannot carry an ownerreference the way the job's svc does.

Yes, Flink K8s HA is built on ConfigMaps, and the HA configmaps do not set an ownerreference. So if you want to restart the cluster while keeping the HA data, just deleting the deployment is fine; after the restart it recovers from the latest cp.

2. With k8s-based HA, the Flink job id is always the all-zero id.

With HA enabled, the Flink job id in Application mode is always the all-zero id; this has nothing to do with whether K8s HA is used. The job id is the job's unique identifier; the HA service uses it to name and address the HA resources belonging to a single job (such as the persisted jobgraph and cp). In Application mode the jobgraph is generated on the JM. Without HA, a random job id is generated each time as the job's unique identifier; with HA, a fixed job id is needed (that is where the all-zero jobid comes from). Otherwise, after a JM failover a new, different job id would be generated that could not be associated with the earlier cp, and the job would recover from a clean state.

3. How does Flink k8s HA work, and what information does it store? I would like to study the implementation; if you have design docs or related material, please reply to this mail. Thanks.

See the official blog post: https://flink.apache.org/2021/02/10/native-k8s-with-ha.html; for more detail, see the JIRA design doc: https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Zhanghao Chen

From: m18814122325
Sent: Sunday, June 12, 2022 22:45
To: user-zh@flink.apache.org
Subject: Anomalies after manually deleting the job deployment with Flink k8s HA
Anomalies after manually deleting the job deployment with Flink k8s HA
Flink version: 1.15.0
deploy mode: Native k8s application

Symptom:
I deployed a Flink job in Native k8s mode with K8s-based HA. After I manually deleted the job's deployment, I found that the job's HA ConfigMaps still existed. When I then started the job again without the -s parameter, the startup log showed that it recovered from the information recorded in those ConfigMaps.

kubectl delete deployment flink-bdra-sql-application-job -n bdra-dev-flink-standalone

kubectl get configMap -n bdra-dev-flink-standalone

NAME                                                DATA   AGE
flink-bdra-sql-application-job--config-map          2      13m
flink-bdra-sql-application-job-cluster-config-map   1      13m

My questions:

1. With K8s-based HA, a Flink job must not have its deployment deleted manually; it has to be stopped with the cancel or stop command. From this I guess that Flink k8s HA is built on top of ConfigMaps, whose lifecycle, from the K8s side, cannot carry an ownerreference the way the job's svc does.

2. With k8s-based HA, the Flink job id is always the all-zero id.

3. How does Flink k8s HA work, and what information does it store? I would like to study the implementation; if you have design docs or related material, please reply to this mail. Thanks.

Restart command (no -s parameter, i.e. the command itself requests no recovery from a ck or savepoint):

flink run-application --target kubernetes-application -c CalculateUv -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 -Dkubernetes.namespace=bdra-dev-flink-standalone -Dkubernetes.service-account=bdra-dev-flink-standalone-sa -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m -Dstate.backend=filesystem -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=file:///opt/flink/log/recovery -Ds3.access-key=* -Ds3.secret-key=* -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory -Dmetrics.reporter.influxdb.scheme=http -Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 -Dmetrics.reporter.influxdb.db=flink_metrics -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 -Dkubernetes.rest-service.exposed.type=ClusterIP -Dkubernetes.config.file=kube_config -Dkubernetes.pod-template-file=pod-template.yaml local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar

After the restart it recovered from the ConfigMap automatically:

2022-06-10 20:20:52,592 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs.
2022-06-10 20:20:52,654 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
2022-06-10 20:20:53,552 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-06-10 20:20:53,552 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2022-06-10 20:20:55,352 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.
2022-06-10 20:20:55,370 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
2022-06-10 20:20:55,394 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2022-06-10 20:20:55,438 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' ().
2022-06-10 20:20:55,477 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_default_catalog.default_database.buy_cnt_per_hour ().
2022-06-10 20:20:55,558 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.
2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.
2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.Defaul
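The ConfigMap naming visible in the kubectl output and logs of this thread (one map keyed by the fixed job id, one cluster-wide map) can be sketched as follows. A minimal sketch in Python; the all-zero 32-character job id follows Zhanghao's explanation above, but the exact name pattern is inferred from this thread's output and should be treated as illustrative, not a stable contract.

```python
# Sketch: reconstruct the HA ConfigMap names seen in this thread from a
# cluster-id. The name pattern is inferred from the `kubectl get configMap`
# output above; treat it as illustrative, not a stable API.

ALL_ZERO_JOB_ID = "0" * 32  # the fixed job id HA-enabled application mode uses

def ha_configmap_names(cluster_id: str) -> list:
    return [
        f"{cluster_id}-{ALL_ZERO_JOB_ID}-config-map",  # per-job leader/checkpoint pointers
        f"{cluster_id}-cluster-config-map",            # cluster-wide HA data
    ]

for name in ha_configmap_names("flink-bdra-sql-application-job"):
    print(name)
```

Because these maps carry no owner reference, they survive `kubectl delete deployment`, which is exactly why the restarted job above recovered from them.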
Re:Re: flink k8s ha
OK, I see the point of keeping the HA configuration now. But doesn't this look like a bug? See my problem: on restart, the job complains that the file /high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot be found, while the HA directory on OSS only contains /high-availability.storageDir/task/completedCheckpointacdfb4309903. That is, the information in the HA configmap and the files in the high-availability.storageDir directory have become inconsistent.

On 2022-06-08 23:06:03, "Weihua Hu" wrote:
>Hi,
>Deleting the deployment removes the Pod, Service, flink-conf configmap, and so on that are associated with the Deployment. But the HA-related configmaps are not configured with an owner reference and will not be deleted. The main purpose is to let the cluster recover from the previous HA state on restart. See the official docs for more [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
>Best,
>Weihua
Re: flink k8s ha
Hi,
Deleting the deployment removes the Pod, Service, flink-conf configmap, and so on that are associated with the Deployment. But the HA-related configmaps are not configured with an owner reference and will not be deleted. The main purpose is to let the cluster recover from the previous HA state on restart. See the official docs for more [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
Best,
Weihua


On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:

> The configmaps are:
> sql-test--jobmanager-leader
> sql-test-resourcemanager-leader
> sql-test-restserver-leader
> sql-test-dispatcher-leader
flink k8s ha
flink 1.13.6 on k8s, application mode, with HA configured:
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss
This creates configmaps on k8s.

1. After the job's deployment is deleted on k8s, why do these configmaps still exist? (The job itself has been deleted, so presumably this HA data is no longer needed.)
2. When the job is restarted, it still reads the HA configuration from these configmaps. This logic also seems odd: why should a restarted job read the previous HA information?

Why I care about this: I ran into a problem.
On restart, the job fails, complaining that the file /high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot be found, while OSS does have the file /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903. The job kept failing until I deleted the configmaps above; only then did it run normally.
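The failure mode in this thread, HA state pointing at a checkpoint file that no longer exists under the storage directory, can be sketched as a simple consistency check. A minimal sketch in Python; the file names are the ones from this thread, and comparing plain name sets is an illustration, not how Flink serializes its state handles internally.

```python
# Sketch: find checkpoint pointers recorded in the HA ConfigMaps that have
# no backing file under high-availability.storageDir, i.e. exactly the
# inconsistency that kept this job from starting.

def dangling_pointers(configmap_refs, files_in_storage):
    """Return the pointers in HA state whose backing file is gone."""
    return set(configmap_refs) - set(files_in_storage)

refs = {"completedCheckpointe5c125ad20ea"}     # what the HA configmap records
present = {"completedCheckpointacdfb4309903"}  # what is actually on OSS
print(dangling_pointers(refs, present))        # the stale pointer to clean up
```

When such a dangling pointer exists, deleting the HA configmaps (as done above) discards the stale reference, at the cost of losing the recovery state.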
Re: Flink on k8s: using S3 for HA
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception

johnjlong
johnjl...@163.com
Flink on k8s: using S3 for HA
Hi developers,

I am deploying Flink with native Kubernetes, using MinIO as the file system, configured as follows:
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://172.16.14.40:9000
s3.path-style: true
s3.access-key: admin
s3.secret-key: admin123
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.4.jar

MinIO works fine. I then set up HA following the official docs:
kubernetes.cluster-id: flink-sessoion
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3:///flink/recovery

The flink session deploys normally, but submitting a job fails with:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061) at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ... 8 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ... 16 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. 
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files. at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201) at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119) at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084) at java.util.concur
Re: How can Flink k8s HA use OSS as high-availability.storageDir?
With the community's official image flink:1.12.1, you need the following settings; the last two parameters enable the OSS plugin through environment variables:

high-availability.storageDir: oss://flink/flink-ha
fs.oss.endpoint:
fs.oss.accessKeyId:
fs.oss.accessKeySecret:
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar

Best,
Yang

casel.chen wrote on Wed, Feb 17, 2021 at 17:42:
> As titled: in a k8s environment I would rather not use HDFS as high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints can already use OSS.
How can Flink k8s HA use OSS as high-availability.storageDir?
As titled: in a k8s environment I would rather not use HDFS as high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints can already use OSS.
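The reply in this thread boils down to a handful of flink-conf entries. As a minimal sketch, assuming only the plugin jar naming pattern from the reply (flink-oss-fs-hadoop-<version>.jar) and leaving the fs.oss.* credential keys to be filled in separately, the same configuration can be generated for an arbitrary Flink version:

```python
# Sketch: build the flink-conf entries from the reply above. Points HA at
# OSS and enables the bundled OSS filesystem plugin on both JobManager and
# TaskManager via the ENABLE_BUILT_IN_PLUGINS environment variable.

def oss_ha_conf(flink_version, storage_dir):
    plugin = f"flink-oss-fs-hadoop-{flink_version}.jar"
    return {
        "high-availability.storageDir": storage_dir,
        "containerized.master.env.ENABLE_BUILT_IN_PLUGINS": plugin,
        "containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS": plugin,
    }

for key, value in oss_ha_conf("1.12.1", "oss://flink/flink-ha").items():
    print(f"{key}: {value}")
```

Keeping the plugin version in lockstep with the image's Flink version avoids the classpath mismatches that filesystem plugins are prone to.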