flink k8s operator checkpoint interval config bug (not taking effect)

2024-03-14 by kcz
kcz
573693...@qq.com



 

Re: Flink on K8s deployment fails at startup

2023-03-13 by Weihua Hu

Look at the file whose name ends in _DIRTY.json. Its content should be valid
JSON; if it is not, the entry is already corrupted, and you can try deleting it.

Best,
Weihua
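A quick way to spot such corrupted entries, assuming the JobResultStore / HA
directory is mounted or copied locally (the directory path below is
hypothetical):

```python
import json
from pathlib import Path

def find_corrupt_results(store_dir: str) -> list[Path]:
    """Return JobResultStore entries that do not parse as JSON.

    A truncated or empty *_DIRTY.json file is exactly what produces the
    "No content to map due to end-of-input" error at startup.
    """
    corrupt = []
    for f in sorted(Path(store_dir).glob("*.json")):
        try:
            json.loads(f.read_text())
        except json.JSONDecodeError:
            corrupt.append(f)
    return corrupt

# Example (hypothetical path):
# find_corrupt_results("/flink-data/ha/job-result-store/<cluster-id>")
```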


On Tue, Mar 14, 2023 at 11:23 AM Jason_H  wrote:

> Hello,
> I found my HA directory. How can I tell which files are dirty data that
> is safe to delete? Is there a way to confirm this? Everything I see looks
> like system data.
>
> Jason_H
> hyb_he...@163.com
>
>  Original message 
> | From    | Weihua Hu |
> | Date    | 2023-03-14 10:39 |
> | To      |  |
> | Subject | Re: Flink on K8s deployment fails at startup |
> Hi,
>
> From the exception, the Flink cluster found DirtyResults data on the HA
> path at startup, but the data is incomplete and cannot be read.
> You can follow the documentation [1], check the relevant HA path, and
> clean up the corrupted data.
>
> One more question: was a Flink cluster previously started with the same
> cluster-id?
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path
>
> Best,
> Weihua
>
>


Reply: Flink on K8s deployment fails at startup

2023-03-13 by Jason_H

Hello,
I found my HA directory. How can I tell which files are dirty data that is
safe to delete? Is there a way to confirm this? Everything I see looks like
system data.

Jason_H
hyb_he...@163.com


Reply: Flink on K8s deployment fails at startup

2023-03-13 by Jason_H

Hello,
Yes, it used to start normally, and then it suddenly failed. I restarted the
pod directly, and it has kept throwing this error ever since.

Jason_H
hyb_he...@163.com


Re: Flink on K8s deployment fails at startup

2023-03-13 by Weihua Hu

Hi,

From the exception, the Flink cluster found DirtyResults data on the HA path
at startup, but the data is incomplete and cannot be read.
You can follow the documentation [1], check the relevant HA path, and clean
up the corrupted data.

One more question: was a Flink cluster previously started with the same
cluster-id?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua




Flink on K8s deployment fails at startup

2023-03-13 by Jason_H

Hi everyone,
I deployed a Flink cluster on k8s and it fails to start with the following
error. Has anyone run into this?
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of 
globally-terminated jobs from JobResultStore
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve 
JobResults of globally-terminated jobs from JobResultStore
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
 No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, 
column: 0]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
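The innermost cause above ("No content to map due to end-of-input" at line 1,
column 0) is what Jackson reports when the input stream is empty, i.e. a
zero-byte JobResultStore file. Any JSON parser fails the same way; for
illustration, in Python:

```python
import json

# Parsing an empty document fails immediately, mirroring Jackson's
# "No content to map due to end-of-input" at line 1, column 0.
try:
    json.loads("")
except json.JSONDecodeError as exc:
    print(exc)  # Expecting value: line 1 column 1 (char 0)
```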


Jason_H
hyb_he...@163.com

Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job

2022-10-31 by 汪赟

Unsubscribe

Sent from my iPhone


Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job

2022-10-27 by Weihua Hu

Hi, Young

Your analysis is correct. The Flink Kubernetes operator talks to the Flink
cluster through the rest service, and Kubernetes routes requests sent to
that service to the JM pods behind it at random. Job submission consists of
three API calls: uploadJar, runJob, and deleteJar, which is why you see
these errors in the operator's log.

You may want to open a JIRA issue to follow up on this.

Best,
Weihua
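The failure mode can be sketched with a small simulation (purely
illustrative, assuming uniform random routing across 2 JM pods): uploadJar
lands on one pod, and the submission only succeeds if runJob and deleteJar
happen to be routed to that same pod.

```python
import random

def submit_job(num_jm: int, rng: random.Random) -> bool:
    """Simulate one operator submission: uploadJar picks a pod at random,
    then runJob and deleteJar must hit the pod that received the jar."""
    upload_pod = rng.randrange(num_jm)
    run_pod = rng.randrange(num_jm)
    delete_pod = rng.randrange(num_jm)
    return run_pod == upload_pod and delete_pod == upload_pod

rng = random.Random(42)
trials = 100_000
ok = sum(submit_job(2, rng) for _ in range(trials))
print(f"success rate with 2 JMs: {ok / trials:.2%}")  # ~25%
```

With two JM pods this gives roughly a (1/2)^2 = 25% success rate, which
matches the "sometimes deploys, sometimes not" symptom.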



Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job

2022-10-27 by Young Chen

[Problem]

A Flink Session Cluster with two JobManagers was deployed in HA mode with
the Flink k8s operator (v1.1.0), and an example job was then deployed as a
SessionJob. Sometimes the job deploys, sometimes it does not.

The following error logs appear in the container.

[Steps]

Deploy the cluster:



apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cluster-1jm-checkpoint
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    state.checkpoints.num-retained: "10"
  serviceAccount: flink
  ingress:
    template: "{{name}}.{{namespace}}.k8s.rf.io"
  jobManager:
    replicas: 2
  podTemplate:
    spec:
      nodeSelector:
        kubernetes.io/hostname: k8s17
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            # directory location on host
            path: /tmp/flink
            # this field is optional
            type: Directory



Deploy the job:



apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-job-1jm-checkpoint
spec:
  deploymentName: flink-cluster-1jm-checkpoint
  job:
    jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
    # our custom operator image includes the examples jar
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 1
    upgradeMode: savepoint





[Logs]

1. Operator log from a run where the job deployed and ran successfully:

2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService 
[ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar: 
06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.

java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: File 
06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist in 
/tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.

at 
org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)

at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

at java.base/java.util.concurrent.FutureTask.run(Unknown Source)

at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.base/java.lang.Thread.run(Unknown Source)

]

at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source

One JobManager pod does not contain
/tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar,
but the other JM pod does. That JM should not be the leader, though, since
the checkpoint-related logs are coming from the first JM.





2. Operator log when the job failed to deploy:

2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/flink-job-1jm-checkpoint] Error during event processing 
ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint', 
namespace='flink'}, version: 120505701} failed.

org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
org.apache.flink.util.FlinkRuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., 

Re: Unclear flink-k8s-operator CRD status when a batch job finishes

2022-10-25 by Yang Wang

Starting from Flink 1.15, the JobManager is no longer deleted when the job
finishes, so the Kubernetes Operator can query the job status and update the
CRD normally.

Best,
Yang



Unclear flink-k8s-operator CRD status when a batch job finishes

2022-10-25 by Liting Liu (litiliu)

hi,
I am deploying batch jobs with the flink-k8s-operator. After a batch job
finishes, the status of the operator's FlinkDeployment CRD changes:
jobManagerDeploymentStatus becomes "MISSING" and "error" becomes "Missing
JobManager deployment". I assume this happens because native-k8s deletes the
JobManager deployment automatically once the batch job completes. How can I
monitor batch job completion from the CRD status? It would be convenient if
the state in jobStatus could be "FINISHED".


status:
  clusterInfo:
flink-revision: a921a4d @ 2022-09-09T10:18:38+02:00
flink-version: 1.14.6
  error: Missing JobManager deployment
  jobManagerDeploymentStatus: MISSING
  jobStatus:
jobId: 3c5807b038300f46154d72c58f074715
jobName: batch-job-lab-o8yln9
savepointInfo:
  lastPeriodicSavepointTimestamp: 0
  savepointHistory: []
  triggerId: ''
  triggerTimestamp: 0
  triggerType: UNKNOWN
startTime: '181370751'
state: RECONCILING
updateTime: '181379021'
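For monitoring, one workable signal on operator versions that delete the JM deployment is the combination shown in the status above: jobManagerDeploymentStatus has become MISSING for a job that was previously running. A minimal sketch of that check against a saved copy of the status (field names are taken from the status dump above; a live check would read them via kubectl instead of a local file):

```shell
# Sketch: decide "batch job terminated" from a saved FlinkDeployment status.
# Field names come from the status dump above; the file is a local stand-in.
cat > status.txt <<'EOF'
jobManagerDeploymentStatus: MISSING
state: RECONCILING
EOF
if grep -q 'jobManagerDeploymentStatus: MISSING' status.txt; then
  echo "batch job terminated (JobManager deployment is gone)"
fi
```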




Re: how to keep the flink k8s operator from automatically deleting stopped pods?

2022-10-19 by Biao Geng
Hi,
Could you share the Flink version and flink-kubernetes-operator version you
are using? If the Flink version is >= 1.15.0, the JobManager Pod should be
retained after the app finishes.

Best,
Biao Geng

highfei2011 wrote on Wed, Oct 19, 2022 at 14:11:

> Problem description: after creating a flink app with the flink k8s operator,
> the operator automatically deletes the stopped pod, no matter whether the
> app succeeded or failed. When I later run a command to view the logs, the
> pod no longer exists, so the logs cannot be viewed.


Re: the role of the webhook in flink-k8s-operator

2022-07-27 by Yang Wang
The webhook's main job is validating the CR, so that mistakes are caught
before the resource reaches K8s rather than discovered afterwards.

For example: parallelism mistakenly set to a negative value, jarURI not set,
and so on.

Best,
Yang

Kyle Zhang  wrote on Wed, Jul 27, 2022 at 18:59:

> Hi,all
>     I have been reading about the flink-k8s-operator [1] recently. The
> architecture includes a flink-webhook; what does this container do, and what
> is the impact on overall functionality of setting webhook.create=false?
>
> Best regards
>
> [1]
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/
>
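To make the validation concrete: the webhook rejects an invalid CR at admission time, before any pods are created. A hypothetical manifest of the kind Yang Wang describes (field names follow the FlinkDeployment CRD; the resource itself is made up and is only written to a local file here):

```shell
# Hypothetical invalid FlinkDeployment: negative parallelism and no jarURI.
# With the webhook enabled, `kubectl apply -f` would be rejected up front;
# with webhook.create=false the mistake surfaces only after deployment.
cat > bad-flinkdeployment.yaml <<'EOF'
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: bad-example
spec:
  flinkVersion: v1_15
  job:
    parallelism: -2    # invalid: must be positive
    # jarURI is missing
EOF
grep -n 'parallelism' bad-flinkdeployment.yaml
```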


The role of the webhook in flink-k8s-operator

2022-07-27 by Kyle Zhang
Hi,all
I have been reading about the flink-k8s-operator [1] recently. The
architecture includes a flink-webhook; what does this container do, and what
is the impact on overall functionality of setting webhook.create=false?

Best regards

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/


Re: Flink k8s job submission flow

2022-06-27 by Lijie Wang
Hi,

For the user documentation, see:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes
For the design document, see:
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
JIRA: https://issues.apache.org/jira/browse/FLINK-9953

Best,
Lijie

hjw <1010445...@qq.com.invalid> wrote on Tue, Jun 28, 2022 at 00:11:

> Flink version:1.15.0
> How is the job submission flow for native k8s implemented in Flink 1.15.0?
> In other words, how is Flink on Native k8s designed? I would like to study
> it; if anyone has related documents or materials, please let me know,
> thanks :)
>  


Flink k8s job submission flow

2022-06-27 by hjw
Flink version:1.15.0
How is the job submission flow for native k8s implemented in Flink 1.15.0?
In other words, how is Flink on Native k8s designed? I would like to study
it; if anyone has related documents or materials, please let me know,
thanks :)

Re:Re: Flink k8s HA: anomaly caused by manually deleting the job deployment

2022-06-13 by m18814122325


Thanks to both of you for the replies!


On 2022-06-13 10:09:39, "Yang Wang"  wrote:
>Zhanghao's answer is already very thorough. One small addition: keeping the
>HA ConfigMaps when the Deployment is deleted is expected behavior, and it is
>documented [1]. There are two reasons for this design:
>(1.) the job may be restarted with the same cluster-id and be expected to
>recover from the previous checkpoint
>(2.) deleting only the ConfigMaps would leak the HA data stored on DFS
>(e.g. HDFS, S3, OSS)
>
>[1].
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
>
>
>Best,
>Yang
>
>Zhanghao Chen  wrote on Mon, Jun 13, 2022 at 07:53:
>
>> 1. For a Flink job using K8s-based HA to stay healthy, the job deployment
>> must not be deleted manually; it has to be stopped with the cancel or stop
>> commands. From this I guess Flink k8s HA is built on top of ConfigMaps,
>> whose lifecycle, from the K8s side, cannot carry an ownerReference the way
>> the job's svc does.
>>
>> Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps are
>> created without an ownerReference. So to restart the cluster while keeping
>> the HA data, simply deleting the deployment is enough; after the restart
>> the job recovers from the latest checkpoint.
>>
>> 2. Under K8s-based HA, the Flink job ids are all .
>>
>> With HA enabled in Application mode the Flink job id is always the same
>> fixed value, regardless of whether K8s HA is used. The job id is the job's
>> unique identifier; the HA service uses it to name and address the HA
>> resources of a single job (such as the persisted jobgraph and checkpoints).
>> In Application mode the jobgraph is generated on the JM. Without HA, a
>> random job id is generated along with each jobgraph as the job's unique
>> identifier; with HA a fixed job id is required (that is where the all-zero
>> job id comes from), because otherwise the JM would generate a new,
>> different job id after a failover, could not associate it with the earlier
>> checkpoints, and the job would recover from a clean state.
>>
>> 3. How does Flink k8s HA work, and what information does it store? I would
>> like to study the implementation; if anyone has the design document or
>> related material, please reply to this mail. Thanks.
>>
>> See the official blog post:
>> https://flink.apache.org/2021/02/10/native-k8s-with-ha.html, and for more
>> detail the JIRA design doc:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>
>>
>> Best,
>> Zhanghao Chen
>> 
>> From: m18814122325 
>> Sent: Sunday, June 12, 2022 22:45
>> To: user-zh@flink.apache.org 
>> Subject: Flink k8s HA: anomaly caused by manually deleting the job deployment
>>
>> Flink version: 1.15.0
>>
>> deploy mode: Native k8s application
>>
>>
>>
>>
>> Symptom:
>>
>> I deployed a Flink job with K8s-based HA in Native k8s mode. After I
>> manually deleted the job's deployment, I found the job's HA ConfigMaps
>> still existed. I then started the job again without the -s parameter, and
>> the startup log showed it recovering from the information recorded in
>> those ConfigMaps.
>>
>>
>>
>>
>> kubectl delete deployment flink-bdra-sql-application-job -n
>> bdra-dev-flink-standalone
>>
>>
>>
>>
>> kubectl get configMap -n bdra-dev-flink-standalone
>>
>>
>>
>>
>> NAME
>>DATA   AGE
>>
>> flink-bdra-sql-application-job-00000000-config-map
>> 2  13m
>>
>> flink-bdra-sql-application-job-cluster-config-map
>>   1  13m
>>
>>
>>
>>
>>
>>
>>
>> My questions:
>>
>> 1. For a Flink job using K8s-based HA to stay healthy, the job deployment
>> must not be deleted manually; it has to be stopped with the cancel or stop
>> commands. From this I guess Flink k8s HA is built on top of ConfigMaps,
>> whose lifecycle, from the K8s side, cannot carry an ownerReference the way
>> the job's svc does.
>>
>> 2. Under K8s-based HA, the Flink job ids are all .
>>
>> 3. How does Flink k8s HA work, and what information does it store? I would
>> like to study the implementation; if anyone has the design document or
>> related material, please reply to this mail. Thanks.
>>
>>
>>
>>
>> Restart command (no -s parameter, i.e. it does not restore from any checkpoint or savepoint):
>>
>> flink run-application --target kubernetes-application -c CalculateUv
>> -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p
>> -Dkubernetes.container.image=
>> acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20
>> -Dkubernetes.namespace=bdra-dev-flink-standalone
>> -Dkubernetes.service-account=bdra-dev-flink-standalone-sa
>> -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2
>> -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8
>> -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m
>> -Dstate.backend=filesystem
>> -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
>> -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> -Dhigh-availability.storageDir=file:///opt/flink/log/recovery
>> -Ds3.access-key=* -Ds3.secret-key=*
>> -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
>> -Dmetrics.reporter.influxdb.scheme=http
>> -Dmetrics.reporter.influxdb.host=influxdb
>> -Dmetrics.reporter.influxdb.port=8086
>> -Dmetrics.reporter.influxdb.db=flink_metrics
>> -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80
>> -Dkubernetes.rest-service.exposed.type=ClusterIP
>> -Dkubernetes.config.file=kube_config
>> -Dkubernetes.pod-template-file=pod-template.yaml
>> local:///opt/flink/usrlib/flink-sql-1.0-SNAP

Re: Flink k8s HA: anomaly caused by manually deleting the job deployment

2022-06-12 by Yang Wang
Zhanghao's answer is already very thorough. One small addition: keeping the
HA ConfigMaps when the Deployment is deleted is expected behavior, and it is
documented [1]. There are two reasons for this design:
(1.) the job may be restarted with the same cluster-id and be expected to
recover from the previous checkpoint
(2.) deleting only the ConfigMaps would leak the HA data stored on DFS
(e.g. HDFS, S3, OSS)

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up


Best,
Yang
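When the HA state should be discarded rather than kept, the clean-up described in [1] amounts to deleting the HA ConfigMaps by label selector. A sketch that only builds the command string (the label names are assumed from the Flink Kubernetes HA docs, not from this thread; run the resulting command only once the checkpoints referenced by these ConfigMaps are no longer needed):

```shell
# Build (but do not run) the clean-up command for leftover HA ConfigMaps.
# Label names are assumed from the Flink Kubernetes HA documentation.
cluster_id="flink-bdra-sql-application-job-s3p"
cleanup="kubectl delete configmaps --selector=app=${cluster_id},configmap-type=high-availability"
echo "$cleanup"
```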

Zhanghao Chen  wrote on Mon, Jun 13, 2022 at 07:53:

> 1. For a Flink job using K8s-based HA to stay healthy, the job deployment
> must not be deleted manually; it has to be stopped with the cancel or stop
> commands. From this I guess Flink k8s HA is built on top of ConfigMaps,
> whose lifecycle, from the K8s side, cannot carry an ownerReference the way
> the job's svc does.
>
> Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps are created
> without an ownerReference. So to restart the cluster while keeping the HA
> data, simply deleting the deployment is enough; after the restart the job
> recovers from the latest checkpoint.
>
> 2. Under K8s-based HA, the Flink job ids are all .
>
> With HA enabled in Application mode the Flink job id is always the same
> fixed value, regardless of whether K8s HA is used. The job id is the job's
> unique identifier; the HA service uses it to name and address the HA
> resources of a single job (such as the persisted jobgraph and checkpoints).
> In Application mode the jobgraph is generated on the JM. Without HA, a
> random job id is generated along with each jobgraph as the job's unique
> identifier; with HA a fixed job id is required (that is where the all-zero
> job id comes from), because otherwise the JM would generate a new, different
> job id after a failover, could not associate it with the earlier
> checkpoints, and the job would recover from a clean state.
>
> 3. How does Flink k8s HA work, and what information does it store? I would
> like to study the implementation; if anyone has the design document or
> related material, please reply to this mail. Thanks.
>
> See the official blog post:
> https://flink.apache.org/2021/02/10/native-k8s-with-ha.html, and for more
> detail the JIRA design doc:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>
>
> Best,
> Zhanghao Chen
> 
> From: m18814122325 
> Sent: Sunday, June 12, 2022 22:45
> To: user-zh@flink.apache.org 
> Subject: Flink k8s HA: anomaly caused by manually deleting the job deployment
>
> Flink version: 1.15.0
>
> deploy mode: Native k8s application
>
>
>
>
> Symptom:
>
> I deployed a Flink job with K8s-based HA in Native k8s mode. After I
> manually deleted the job's deployment, I found the job's HA ConfigMaps
> still existed. I then started the job again without the -s parameter, and
> the startup log showed it recovering from the information recorded in those
> ConfigMaps.
>
>
>
>
> kubectl delete deployment flink-bdra-sql-application-job -n
> bdra-dev-flink-standalone
>
>
>
>
> kubectl get configMap -n bdra-dev-flink-standalone
>
>
>
>
> NAME
>DATA   AGE
>
> flink-bdra-sql-application-job--config-map
> 2  13m
>
> flink-bdra-sql-application-job-cluster-config-map
>   1  13m
>
>
>
>
>
>
>
> My questions:
>
> 1. For a Flink job using K8s-based HA to stay healthy, the job deployment
> must not be deleted manually; it has to be stopped with the cancel or stop
> commands. From this I guess Flink k8s HA is built on top of ConfigMaps,
> whose lifecycle, from the K8s side, cannot carry an ownerReference the way
> the job's svc does.
>
> 2. Under K8s-based HA, the Flink job ids are all .
>
> 3. How does Flink k8s HA work, and what information does it store? I would
> like to study the implementation; if anyone has the design document or
> related material, please reply to this mail. Thanks.
>
>
>
>
> Restart command (no -s parameter, i.e. it does not restore from any checkpoint or savepoint):
>
> flink run-application --target kubernetes-application -c CalculateUv
> -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p
> -Dkubernetes.container.image=
> acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20
> -Dkubernetes.namespace=bdra-dev-flink-standalone
> -Dkubernetes.service-account=bdra-dev-flink-standalone-sa
> -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2
> -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8
> -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m
> -Dstate.backend=filesystem
> -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
> -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=file:///opt/flink/log/recovery
> -Ds3.access-key=* -Ds3.secret-key=*
> -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
> -Dmetrics.reporter.influxdb.scheme=http
> -Dmetrics.reporter.influxdb.host=influxdb
> -Dmetrics.reporter.influxdb.port=8086
> -Dmetrics.reporter.influxdb.db=flink_metrics
> -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80
> -Dkubernetes.rest-service.exposed.type=ClusterIP
> -Dkubernetes.config.file=kube_config
> -Dkubernetes.pod-template-file=pod-template.yaml
> local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
>
>
>
>
> After the restart it automatically recovered from the ConfigMap:
>
> 2022-06-10 20:20:52,592 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> [] - Successfully recovered 1 persisted job graphs.
>
> 2022-06-10 20:20:52,654 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/rpc/dispatcher_1 .
>
> 2022-06-10 20:20:53,552 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDr

Re: Flink k8s HA: anomaly caused by manually deleting the job deployment

2022-06-12 by Zhanghao Chen
1. For a Flink job using K8s-based HA to stay healthy, the job deployment
must not be deleted manually; it has to be stopped with the cancel or stop
commands. From this I guess Flink k8s HA is built on top of ConfigMaps, whose
lifecycle, from the K8s side, cannot carry an ownerReference the way the
job's svc does.

Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps are created
without an ownerReference. So to restart the cluster while keeping the HA
data, simply deleting the deployment is enough; after the restart the job
recovers from the latest checkpoint.

2. Under K8s-based HA, the Flink job ids are all .

With HA enabled in Application mode the Flink job id is always the same fixed
value, regardless of whether K8s HA is used. The job id is the job's unique
identifier; the HA service uses it to name and address the HA resources of a
single job (such as the persisted jobgraph and checkpoints). In Application
mode the jobgraph is generated on the JM. Without HA, a random job id is
generated along with each jobgraph as the job's unique identifier; with HA a
fixed job id is required (that is where the all-zero job id comes from),
because otherwise the JM would generate a new, different job id after a
failover, could not associate it with the earlier checkpoints, and the job
would recover from a clean state.

3. How does Flink k8s HA work, and what information does it store? I would
like to study the implementation; if anyone has the design document or
related material, please reply to this mail. Thanks.

See the official blog post:
https://flink.apache.org/2021/02/10/native-k8s-with-ha.html, and for more
detail the JIRA design doc:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink


Best,
Zhanghao Chen
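To make the naming scheme above concrete: the job-scoped HA ConfigMap is named from the cluster-id plus the fixed job id, which is why the all-zero id shows up in the `kubectl get configMap` output quoted in this thread. An illustrative sketch (the exact name pattern is inferred from that output, so treat it as an assumption):

```shell
# Illustrative only: reconstruct the HA ConfigMap names quoted in the thread.
# The 32-zero job id is the fixed id used by HA-enabled application mode.
cluster_id="flink-bdra-sql-application-job"
job_id="00000000000000000000000000000000"
echo "${cluster_id}-${job_id}-config-map"   # job-scoped HA data (jobgraph, checkpoint pointers)
echo "${cluster_id}-cluster-config-map"     # cluster-scoped HA data
```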

From: m18814122325 
Sent: Sunday, June 12, 2022 22:45
To: user-zh@flink.apache.org 
Subject: Flink k8s HA: anomaly caused by manually deleting the job deployment

Flink version: 1.15.0

deploy mode: Native k8s application




Symptom:

I deployed a Flink job with K8s-based HA in Native k8s mode. After I manually
deleted the job's deployment, I found the job's HA ConfigMaps still existed.
I then started the job again without the -s parameter, and the startup log
showed it recovering from the information recorded in those ConfigMaps.




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME
 DATA   AGE

flink-bdra-sql-application-job--config-map  
2  13m

flink-bdra-sql-application-job-cluster-config-map   
 1  13m







My questions:

1. For a Flink job using K8s-based HA to stay healthy, the job deployment
must not be deleted manually; it has to be stopped with the cancel or stop
commands. From this I guess Flink k8s HA is built on top of ConfigMaps, whose
lifecycle, from the K8s side, cannot carry an ownerReference the way the
job's svc does.

2. Under K8s-based HA, the Flink job ids are all .

3. How does Flink k8s HA work, and what information does it store? I would
like to study the implementation; if anyone has the design document or
related material, please reply to this mail. Thanks.




Restart command (no -s parameter, i.e. it does not restore from any checkpoint or savepoint):

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




After the restart it automatically recovered from the ConfigMap:

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO

Flink k8s HA: anomaly caused by manually deleting the job deployment

2022-06-12 by m18814122325
Flink version: 1.15.0

deploy mode: Native k8s application




Symptom:

I deployed a Flink job with K8s-based HA in Native k8s mode. After I manually
deleted the job's deployment, I found the job's HA ConfigMaps still existed.
I then started the job again without the -s parameter, and the startup log
showed it recovering from the information recorded in those ConfigMaps.




kubectl delete deployment flink-bdra-sql-application-job -n  
bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME
 DATA   AGE

flink-bdra-sql-application-job--config-map  
2  13m

flink-bdra-sql-application-job-cluster-config-map   
 1  13m







My questions:

1. For a Flink job using K8s-based HA to stay healthy, the job deployment
must not be deleted manually; it has to be stopped with the cancel or stop
commands. From this I guess Flink k8s HA is built on top of ConfigMaps, whose
lifecycle, from the K8s side, cannot carry an ownerReference the way the
job's svc does.

2. Under K8s-based HA, the Flink job ids are all .

3. How does Flink k8s HA work, and what information does it store? I would
like to study the implementation; if anyone has the design document or
related material, please reply to this mail. Thanks.




Restart command (no -s parameter, i.e. it does not restore from any checkpoint or savepoint):

flink run-application --target kubernetes-application -c CalculateUv 
-Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p 
-Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 
-Dkubernetes.namespace=bdra-dev-flink-standalone 
-Dkubernetes.service-account=bdra-dev-flink-standalone-sa 
-Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 
-Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 
-Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m 
-Dstate.backend=filesystem 
-Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3
 
-Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3
 
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 -Dhigh-availability.storageDir=file:///opt/flink/log/recovery 
-Ds3.access-key=* -Ds3.secret-key=* 
-Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
 -Dmetrics.reporter.influxdb.scheme=http 
-Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 
-Dmetrics.reporter.influxdb.db=flink_metrics 
-Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 
-Dkubernetes.rest-service.exposed.type=ClusterIP 
-Dkubernetes.config.file=kube_config 
-Dkubernetes.pod-template-file=pod-template.yaml 
local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar




After the restart it automatically recovered from the ConfigMap:

2022-06-10 20:20:52,592 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils   
   [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Initializing job 
'insert-into_default_catalog.default_database.buy_cnt_per_hour' 
().

2022-06-10 20:20:55,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using restart back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for 
insert-into_default_catalog.default_database.buy_cnt_per_hour 
().

2022-06-10 20:20:55,558 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.

2022-06-10 20:20:55,572 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Found 1 checkpoints in 
KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}.

2022-06-10 20:20:55,572 INFO  
org.apache.flink.runtime.checkpoint.Defaul

Re:Re: flink k8s ha

2022-06-08 by json
OK, now I see the point of keeping the HA config. But it feels like there may
be a bug. See my problem: the restart fails because the file
/high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot be
found, while the HA directory on oss only contains
/high-availability.storageDir/task/completedCheckpointacdfb4309903. That is,
the HA configmap contents and the files under high-availability.storageDir
have gone out of sync.
On 2022-06-08 23:06:03, "Weihua Hu"  wrote:
>Hi,
>Deleting the deployment removes the Pods, Services, flink-conf configmap and
>so on that are associated with the Deployment. The HA-related configmaps,
>however, have no owner reference configured and are not deleted. The main
>purpose is to let the cluster recover from the previous HA state when it is
>restarted. See the official docs for more [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
>Best,
>Weihua
>
>
>On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:
>
> The configmaps are as follows:
>> sql-test--jobmanager-leader
>> sql-test-resourcemanager-leader
>> sql-test-restserver-leader
>> sql-test-dispatcher-leader
>>
>>
>>
> On 2022-06-08 15:42:52, "json" <18042304...@163.com> wrote:
>>
> flink1.13.6 on k8s in application mode, with HA configured:
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss
> This creates configmaps on k8s.
>
>
> 1. After this task's deployment is deleted on k8s, why do these configmaps
> still exist? (The task has been deleted, so this HA data should no longer
> be needed, right?)
> 2. When the task is restarted, it still reads the HA config from these
> configmaps. That logic also seems odd: on a task restart, why read the
> previous HA information?
>
> Why I care about this: I ran into a problem.
> The task restart fails because the file
> /high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot
> be found,
> but oss does have the file
> /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
> which kept my task failing; only after deleting the configmaps above could
> it run normally.
>>
>>
>>
>>
>>
>>


Re: flink k8s ha

2022-06-08 by Weihua Hu
Hi,
Deleting the deployment removes the Pods, Services, flink-conf configmap and
so on that are associated with the Deployment. The HA-related configmaps,
however, have no owner reference configured and are not deleted. The main
purpose is to let the cluster recover from the previous HA state when it is
restarted. See the official docs for more [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
Best,
Weihua


On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:

> The configmaps are as follows:
> sql-test--jobmanager-leader
> sql-test-resourcemanager-leader
> sql-test-restserver-leader
> sql-test-dispatcher-leader
>
>
>
> On 2022-06-08 15:42:52, "json" <18042304...@163.com> wrote:
>
> flink1.13.6 on k8s in application mode, with HA configured:
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss
> This creates configmaps on k8s.
>
>
> 1. After this task's deployment is deleted on k8s, why do these configmaps
> still exist? (The task has been deleted, so this HA data should no longer
> be needed, right?)
> 2. When the task is restarted, it still reads the HA config from these
> configmaps. That logic also seems odd: on a task restart, why read the
> previous HA information?
>
> Why I care about this: I ran into a problem.
> The task restart fails because the file
> /high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot
> be found,
> but oss does have the file
> /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
> which kept my task failing; only after deleting the configmaps above could
> it run normally.
>
>
>
>
>
>


flink k8s ha

2022-06-08 by json
flink1.13.6 on k8s in application mode, with HA configured:
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss
This creates configmaps on k8s.


1. After this task's deployment is deleted on k8s, why do these configmaps
still exist? (The task has been deleted, so this HA data should no longer be
needed, right?)
2. When the task is restarted, it still reads the HA config from these
configmaps. That logic also seems odd: on a task restart, why read the
previous HA information?

Why I care about this: I ran into a problem.
The task restart fails because the file
/high-availability.storageDir/task/completedCheckpointe5c125ad20ea cannot be
found,
but oss does have the file
/high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
which kept my task failing; only after deleting the configmaps above could it
run normally.
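The reported inconsistency can be restated mechanically: the HA configmap records a completedCheckpoint file name that no longer exists under high-availability.storageDir. A sketch of that check against a local stand-in directory (the file names are taken from this message; nothing here talks to oss or k8s):

```shell
# Local stand-in for the oss HA directory: the pointer recorded in the
# configmap has no matching file, which is exactly the reported failure.
ha_dir=$(mktemp -d)
touch "$ha_dir/completedCheckpointacdfb4309903"    # file actually present on oss
pointer="completedCheckpointe5c125ad20ea"          # name recorded in the HA configmap
if [ ! -e "$ha_dir/$pointer" ]; then
  echo "stale HA pointer: $pointer"
fi
```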

flink on k8s: how to load dependency jars?

2022-04-25 by ????????
With Flink deployed as a kubernetes session cluster, how are dependency jars
loaded? Do the jars have to be copied into flink/lib?
> Thanks!

flink on k8s: how to load dependency jars?

2022-04-25 by ????????
With Flink deployed as a kubernetes session cluster, how are dependency jars
loaded?
Thanks!

Reply: flink on k8s: problem using s3 for HA

2021-07-27 by johnjlong
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
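com.amazonaws.services.s3.model.AmazonS3Exception is bundled with the S3 filesystem plugin jars, so this usually means the plugin never made it onto the classpath of the process doing the upload. In the official images, ENABLE_BUILT_IN_PLUGINS essentially copies the named jar from opt/ into its own plugins/ subdirectory; a local sketch of that mechanism (the opt/plugins layout is assumed from the official flink images):

```shell
# Sketch of what ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-presto-1.12.4.jar does
# in the official image: copy the shipped jar into plugins/<plugin-name>/.
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/plugins"
touch "$FLINK_HOME/opt/flink-s3-fs-presto-1.12.4.jar"   # ships with the image
plugin="flink-s3-fs-presto-1.12.4.jar"
mkdir -p "$FLINK_HOME/plugins/${plugin%.jar}"
cp "$FLINK_HOME/opt/$plugin" "$FLINK_HOME/plugins/${plugin%.jar}/"
ls "$FLINK_HOME/plugins/${plugin%.jar}"
```

If the exception persists with the plugin enabled, checking which process actually throws it (client vs JobManager) narrows down where the plugin is still missing.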


| |
johnjlong
|
|
johnjl...@163.com
|
On Jul 27, 2021 at 15:18, maker_d...@foxmail.com wrote:
Developers,
Hello!

I am deploying Flink in native Kubernetes mode, using minio as the file system, configured as follows:
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://172.16.14.40:9000
s3.path-style: true
s3.access-key: admin
s3.secret-key: admin123
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: 
flink-s3-fs-presto-1.12.4.jar
minio works fine.

I then set up HA following the official docs, configured as follows:
kubernetes.cluster-id: flink-sessoion
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3:///flink/recovery

The flink-session deploys fine, but submitting a job fails with the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
... 16 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload 
job files.
at 
org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at 
java.util.concurrent.Complet

flink on k8s: problem using s3 for HA

2021-07-27 by maker_d...@foxmail.com
Developers,
Hello!

I am deploying Flink in native Kubernetes mode, using minio as the file
system, configured as follows:
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://172.16.14.40:9000
s3.path-style: true
s3.access-key: admin
s3.secret-key: admin123
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: 
flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: 
flink-s3-fs-presto-1.12.4.jar
minio works fine.

I then set up HA following the official docs, configured as follows:
kubernetes.cluster-id: flink-sessoion
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3:///flink/recovery

The flink-session deploys fine, but submitting a job fails with the following error:
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
... 16 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
at java.util.concur

Re: For Flink HA on k8s, how can OSS be used as high-availability.storageDir?

2021-02-17 post by Yang Wang
With the community's official image flink:1.12.1, you need to set the following options.
The last two options enable the OSS plugin through environment variables:

high-availability.storageDir: oss://flink/flink-ha
fs.oss.endpoint: 
fs.oss.accessKeyId: 
fs.oss.accessKeySecret: 
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar

Best,
Yang

casel.chen wrote on Wed, Feb 17, 2021 at 5:42 PM:

>
> As the subject says: in a k8s environment we don't want to use HDFS as high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints can already use OSS.


For Flink HA on k8s, how can OSS be used as high-availability.storageDir?

2021-02-17 post by casel.chen
As the subject says: in a k8s environment we don't want to use HDFS as high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints can already use OSS.
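For reference, the same settings can live in flink-conf.yaml. A minimal sketch, assuming Flink 1.12's Kubernetes HA services are wanted; the HA factory class comes from the Flink 1.12 documentation rather than this thread, and all values are placeholders:

```yaml
# Kubernetes HA services (Flink 1.12+), persisting HA metadata to OSS.
kubernetes.cluster-id: my-session   # placeholder
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss://flink/flink-ha

# OSS filesystem access (placeholder credentials).
fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
fs.oss.accessKeyId: <your-access-key-id>
fs.oss.accessKeySecret: <your-access-key-secret>

# Load the OSS filesystem plugin in both JobManager and TaskManager containers.
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.12.1.jar
```

With this in place the same oss:// scheme serves checkpoints, savepoints, and the HA storage directory.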