RE: Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-20 Thread Jiabao Sun
Hi Chosen,

Whether the Kafka appender is supported has nothing to do with 
flink-kubernetes-operator. 
It depends only on whether log4j2 supports the Kafka appender.

From the error message, the failure appears to be caused by the missing 
log4j-layout-template-json[1] plugin.
For custom JARs, you can either build a customized base image or follow 
the podTemplate example[2] 
and use initContainers to download the JAR files into the 
flink/lib directory.
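
A rough sketch of the initContainers approach, written as a fragment of a FlinkDeployment spec (it is not copied from the linked example). Only the container name flink-main-container comes from the operator; the init container name, the volume name, the busybox image, the Maven Central URL and the 2.17.1 version are assumptions, and the JAR version should match the log4j2 version shipped in your Flink image.

```yaml
# Fragment of a FlinkDeployment; all other spec fields are omitted.
spec:
  podTemplate:
    spec:
      initContainers:
        # Hypothetical init container that downloads the layout plugin JAR.
        - name: fetch-log4j-plugins
          image: busybox:1.36
          command:
            - wget
            - -O
            - /extra-lib/log4j-layout-template-json.jar
            # Assumed coordinates; pick the version matching Flink's log4j2.
            - https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.17.1/log4j-layout-template-json-2.17.1.jar
          volumeMounts:
            - name: extra-lib
              mountPath: /extra-lib
      containers:
        # The operator expects the Flink container to carry this name.
        - name: flink-main-container
          volumeMounts:
            # subPath mounts the single file, so the JARs already in
            # /opt/flink/lib are not hidden by the volume.
            - name: extra-lib
              mountPath: /opt/flink/lib/log4j-layout-template-json.jar
              subPath: log4j-layout-template-json.jar
      volumes:
        - name: extra-lib
          emptyDir: {}
```

The Kafka appender most likely also needs the kafka-clients JAR on the same classpath; if so, it could be fetched in the same init container.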

Hope it helps.

Best,
Jiabao

[1] https://logging.apache.org/log4j/2.x/manual/json-template-layout.html
[2] https://github.com/apache/flink-kubernetes-operator/blob/808edfd156dc12932b6dd03146ccd2bec49963fb/examples/pod-template.yaml


Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-05 Thread 秋成 王
Hi,

I have recently been working on shipping my Flink logs to Kafka via the log4j2 Kafka 
appender. I have a log4j2.properties file that works fine locally, for example when I 
run my Flink fat jar from a terminal with the following command:
  PS D:\repo>>java -cp .\reconciliation-1.0-SNAPSHOT.jar 
The logs are synced to Kafka successfully when run locally.

The contents of log4j2.properties file are pasted below:
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = KafkaLog
appender.kafka.type = Kafka
appender.kafka.name = KafkaLog

appender.kafka.topic = topicName
appender.kafka.properties[0].type=Property
appender.kafka.properties[0].name=bootstrap.servers
appender.kafka.properties[0].value=
appender.kafka.properties[1].type=Property
appender.kafka.properties[1].name=sasl.mechanism
appender.kafka.properties[1].value=PLAIN
appender.kafka.properties[2].type=Property
appender.kafka.properties[2].name=sasl.jaas.config
appender.kafka.properties[2].value=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="${env:log_event_hub_connection_string}";
appender.kafka.properties[3].type=Property
appender.kafka.properties[3].name=security.protocol
appender.kafka.properties[3].value=SASL_SSL

appender.kafka.layout.type = JsonTemplateLayout
appender.kafka.layout.eventTemplateUri = classpath:kusto-applogv2-layout.json
appender.kafka.layout.eventTemplateAdditionalField[0].type = EventTemplateAdditionalField
appender.kafka.layout.eventTemplateAdditionalField[0].key = Application
appender.kafka.layout.eventTemplateAdditionalField[0].value = reconciliation
appender.kafka.layout.eventTemplateAdditionalField[0].format = String
appender.kafka.layout.eventTemplateAdditionalField[1].type = EventTemplateAdditionalField
appender.kafka.layout.eventTemplateAdditionalField[1].key = Language
appender.kafka.layout.eventTemplateAdditionalField[1].value = Java
appender.kafka.layout.eventTemplateAdditionalField[1].format = String


I am now deploying Flink via the Flink Kubernetes Operator. However, after I copied 
the contents of log4j2.properties into log4j-console.properties under the 
logConfiguration section of the FlinkDeployment YAML, the Kafka appender failed 
to initialize with these error messages:

  2023-12-05 10:12:36,903 main ERROR Unable to locate plugin type for JsonTemplateLayout
  2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for EventTemplateAdditionalField
  2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for EventTemplateAdditionalField
  2023-12-05 10:12:37,047 main ERROR Unable to locate plugin for JsonTemplateLayout


My question: does the Flink Kubernetes Operator support a Kafka appender 
configuration in log4j-console.properties? If so, can anyone provide me with an 
example?


PS: A similar error message once showed up when running locally. I fixed it 
with the solution from the Stack Overflow post below, by adding

com.github.edwgiz.mavenShadePlugin.log4j2CacheTransformer.PluginsCacheFileTransformer
 to the pom file.

java - Console contains an invalid element or attribute "JsonTemplateLayout" 
even after adding dependency - Stack Overflow


Thanks,

Chosen


Re: Monitoring job status for Flink on K8s

2023-07-16 Thread tanjialiang
Hi, before 1.15 you would generally get the final status from the history server[1]. From 1.15 on, you can set these two experimental options:
execution.shutdown-on-application-finish[2]
execution.submit-failed-job-on-application-error[3]


Both options require JobManager HA[4] to be enabled.
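
For illustration, the two options and an HA setup might sit together in the Flink configuration as follows. This is only a sketch: the values reflect my reading of the intent (keep the application cluster up after the job ends, and surface driver errors as a failed job), and the storage path is a placeholder. Check the linked configuration docs for the exact semantics.

```yaml
# Keep the application cluster alive after the job terminates, so the final
# job status can still be queried from the JobManager.
execution.shutdown-on-application-finish: false
# Report errors in the application driver as a FAILED job.
execution.submit-failed-job-on-application-error: true

# JobManager HA, which both options above depend on.
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# Placeholder path; any durable storage reachable from the pods would do.
high-availability.storageDir: s3://my-bucket/flink/ha
```

With shutdown disabled, the cluster no longer stops on its own, so something has to tear it down once the status has been read.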


[1]: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/advanced/historyserver
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#execution-shutdown-on-application-finish
[3]: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#execution-submit-failed-job-on-application-error
[4]: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/overview


best,
tanjialiang.

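Re: Monitoring job status for Flink on K8s

2023-07-16 Thread 阿华田


Doesn't the history server lag, though? It can't provide the job status in real time.

阿华田
a15733178...@163.com


On 2023-07-15 12:14, casel.chen wrote:
You can check the history server.


On 2023-07-14 18:36:42, "阿华田" wrote:


Hello everyone. With Flink on K8s running in native mode, I've hit a problem with real-time job status monitoring: once the pod has exited, 
there is no way to tell whether the Flink job finished normally (FINISHED) or failed. Any suggestions?

阿华田
a15733178...@163.com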



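Monitoring job status for Flink on K8s

2023-07-14 Thread 阿华田


Hello everyone. With Flink on K8s running in native mode, I've hit a problem with real-time job status monitoring: once the pod has exited, 
there is no way to tell whether the Flink job finished normally (FINISHED) or failed. Any suggestions?

阿华田
a15733178...@163.com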



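Re: Re: Flink on K8s (operator): how to get an Accumulator

2023-03-07 Thread Shammon FY
Hi

As mentioned above, jobClient.get().getAccumulators() fetches job information from the Flink cluster. In application mode, the Flink cluster also exits once the job finishes. You could use another approach, such as running on a session cluster or starting a history
server, or export the values to another system through custom metrics.

Best,
Shammon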




Re: Re: Flink on K8s (operator): how to get an Accumulator

2023-03-07 Thread 李银苗
Unsubscribe

Re: Flink on K8s (operator): how to get an Accumulator

2023-03-06 Thread Weihua Hu
Hi,

From your description, I'd guess you are using Application mode? In this mode the user code runs on the
JobManager side, and the cluster is shut down as soon as the job finishes.

You could try deploying the cluster in session mode[1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
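
Since the thread is about the operator, here is a hedged sketch of what a session cluster could look like there: a FlinkDeployment without a job section. The name, image, Flink version, service account and resource sizes are all assumptions. The thread does not confirm that accumulators can then be retrieved; the point is only that the cluster stays up after a job finishes, so a client still has something to query.

```yaml
# Session cluster sketch: a FlinkDeployment with no `job` section.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster        # hypothetical name
spec:
  image: flink:1.16            # assumed image and version
  flinkVersion: v1_16
  serviceAccount: flink        # assumed service account
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```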

Best,
Weihua




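Flink on K8s (operator): how to get an Accumulator

2023-03-06 Thread wangwei

Hi everyone,

How can I get the Accumulator data after a job has finished?
Reference code (which fails to retrieve it):
TableResult execute = statementSet.execute();
Optional<JobClient> jobClient = execute.getJobClient();
jobClient.get().getAccumulators().get();

PS: The original requirement is to count the volume of data a job syncs. I want to read the exact Accumulator values after the batch job finishes, but on K8s I cannot retrieve them.

Any help would be much appreciated, thanks in advance!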


Re: How to get failed streaming Flink job log in Flink Native K8s mode?

2023-01-03 Thread Yang Wang
I think you might need a sidecar container or a DaemonSet to collect the
Flink logs and store them in persistent storage.
You can find more information here[1].

[1]. https://www.alibabacloud.com/blog/best-practices-of-kubernetes-log-collection_596356
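
To make the sidecar idea a bit more concrete, here is a minimal pod template sketch. It assumes Flink writes its log files to /opt/flink/log (the usual location in the official image). The sidecar name, the volume name and the busybox image are placeholders; a real setup would run a log agent that ships the files to persistent storage instead of tailing them.

```yaml
# Pod template sketch: Flink container and sidecar share a log volume.
apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  containers:
    - name: flink-main-container
      volumeMounts:
        - name: flink-logs
          mountPath: /opt/flink/log      # assumed Flink log directory
    - name: log-shipper                  # hypothetical sidecar
      image: busybox:1.36                # stand-in for a real log agent
      command:
        - sh
        - -c
        - |
          # Wait until Flink has created a log file, then follow all of them.
          until ls /flink-logs/*.log >/dev/null 2>&1; do sleep 1; done
          tail -n +1 -F /flink-logs/*.log
      volumeMounts:
        - name: flink-logs
          mountPath: /flink-logs
          readOnly: true
  volumes:
    - name: flink-logs
      emptyDir: {}
```

An emptyDir lives only as long as the pod, so the sidecar has to forward the logs somewhere durable before the pod is removed.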

Best,
Yang



How to get failed streaming Flink job log in Flink Native K8s mode?

2022-12-22 Thread hjw
In Flink native K8s mode, the JM and TM pods disappear if the streaming 
job fails. Is there any way to get the logs of the failed streaming job?
The only solution I can think of is to persist the job logs to NFS 
through a PV/PVC defined in the pod template.


ENV:
Flink version:1.15.0
Mode: Flink kubernetes Operator 1.2.0(Application Mode)



--

Best,
Hjw

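Flink on K8s: how to deal with spiking node network I/O?

2022-12-05 Thread casel.chen
Our company's Flink jobs run on k8s clusters. Recently we found that network I/O on some cluster nodes exceeded the alert threshold of 180 MB/s during certain periods, peaking at 430 MB/s, while the least loaded nodes saw only 4 MB/s. As a result, new jobs cannot be deployed to the nodes with high network load, even though plenty of CPU and memory remain there.
My current idea is to use node affinity to manually move the network-heavy job pods from the highly loaded nodes to lightly loaded ones, but after a while the same problem shows up again. So:
1. Is there a way to eliminate this network load imbalance for good?
2. Can k8s schedule pods sensibly based on their network I/O load?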

Re: Submitting a job to Flink on K8s with OSS as the checkpoint location, but OSS is not found

2022-11-07 Thread Lijie Wang
The flink-oss-fs-hadoop-1.13.6.jar JAR needs to be placed in Flink's lib directory.
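
The exception text quoted below points at the plugins directory rather than lib. As an alternative to copying the JAR into lib (a different technique from the one suggested above), the official Flink Docker image can enable a bundled plugin from /opt/flink/opt through the ENABLE_BUILT_IN_PLUGINS environment variable. A sketch for the Flink configuration, assuming the stock image entrypoint is in use:

```yaml
# Ask the image entrypoint to copy the bundled OSS filesystem JAR from
# /opt/flink/opt into its own subfolder under /opt/flink/plugins.
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.13.6.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-oss-fs-hadoop-1.13.6.jar
```

The JAR name has to match the file shipped in the image, which is why the version from the thread is repeated here.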

Best,
Lijie

highfei2011 wrote on Tue, Nov 1, 2022 at 16:23:

> It turned out to be a dependency conflict.
>
>
> On Nov 1, 2022 at 15:39, highfei2011 wrote:
>
>
> Flink version: Apache Flink 1.13.6; Flink operator version: 1.2.0
> Submit command: kubernetes-jobmanager.sh kubernetes-application
> Exception: Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'oss'. The scheme is directly
> supported by Flink through the following plugin: flink-oss-fs-hadoop.
> Please ensure that each plugin resides within its own subfolder within the
> plugins directory. See
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
> for more information. If you want to use a Hadoop
> file system for that scheme, please add the scheme to the configuration
> fs.allowed-fallback-filesystems. For a full list of supported file
> systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> I checked the /opt/flink/opt/ directory, and flink-oss-fs-hadoop-1.13.6.jar is there.
> Note: local tests work fine; the exception above occurs only when submitting
> through the Flink operator.


Re: Flink Native K8S RBAC

2022-10-20 Thread Yang Wang
I have created a ticket[1] to fill the missing part in the native K8s
documentation.

[1]. https://issues.apache.org/jira/browse/FLINK-29705

Best,
Yang



Re: Flink Native K8S RBAC

2022-10-19 Thread Gyula Fóra
Hi!

As a reference you can look at how the Flink Kubernetes Operator manages
RBAC settings:

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/rbac/
https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml
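
For readers who want to see how such rules get attached to a custom service account, here is a minimal sketch. The names and the namespace are hypothetical, and the rule list is deliberately abbreviated to two of the resources from the question. It is not meant as the minimal or complete set, so compare it with the rbac.yaml linked above.

```yaml
# Hypothetical service account for the Flink pods.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: flink-jobs
---
# Namespaced Role; the rule list is abbreviated for illustration.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-native
  namespace: flink-jobs
rules:
  - apiGroups: [""]
    resources: ["pods", "configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
# Bind the Role to the service account.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-native
  namespace: flink-jobs
subjects:
  - kind: ServiceAccount
    name: flink
    namespace: flink-jobs
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-native
```

Flink would then be pointed at the account with kubernetes.service-account set to flink.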

Cheers,
Gyula



Flink Native K8S RBAC

2022-10-19 Thread Calvin D Souza via user
Hi,

I am using a custom service account for Flink native K8s. These are the rules for 
the ClusterRole I’m using:

rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["extensions"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]


Are there any that I am missing or that are not needed?

Thanks,
Calvin

Why aren't TaskManagers ReplicaSets in Flink on K8s?

2022-08-31 Thread Jiacheng Jiang
A question for everyone:

In Flink on K8s, why is each TaskManager a standalone pod, rather than being managed as a ReplicaSet whose replica count controls the number of TaskManagers?



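Re: [flink native k8s] TaskManager pods keep restarting with HA configured

2022-08-31 Thread Wu,Zhiheng
I can't find any TM logs, because the pod dies before the TM even starts.
I'll check whether that is the cause. So far I have indeed not added the -Dkubernetes.taskmanager.service-account option.
Is -Dkubernetes.taskmanager.service-account meant to be added when starting the session cluster with ./bin/kubernetes-session.sh?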


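Re: [flink native k8s] TaskManager pods keep restarting with HA configured

2022-08-31 Thread Yang Wang
My guess is that you did not set a service account for the TM, so the TM has no permission to read the leader from the K8s ConfigMap and therefore cannot register with the RM and JM.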

-Dkubernetes.taskmanager.service-account=wuzhiheng \
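
As a sketch, the same settings could also live in the Flink configuration file instead of on the command line. The account name is the one used in this thread; whether this resolves the restarts still depends on that account's RBAC permissions.

```yaml
# Use the same service account for JobManager and TaskManager pods.
kubernetes.jobmanager.service-account: wuzhiheng
kubernetes.taskmanager.service-account: wuzhiheng
```

Alternatively, kubernetes.service-account sets the account for both components in a single option.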


Best,
Yang


Re: [flink native k8s] TaskManager pods keep restarting with HA configured

2022-08-30 Thread Xuyang
Hi, could you post the TM logs? Judging from the WARN logs, the TM seems to keep failing to start.

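[flink native k8s] HA configuration: TaskManager pod keeps restarting

2022-08-29 Thread Wu,Zhiheng
[Problem description]
After enabling the HA configuration, the TaskManager pods cycle through create, stop, create and never come up, so the job cannot start.

1. Job configuration and startup procedure

a)  Edit the conf/flink.yaml configuration file and add the HA settings
kubernetes.cluster-id: realtime-monitor
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor
   // This is an NFS path, mounted into the pod through a PVC

b)  First create a stateless deployment with the following command to set up a session cluster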

./bin/kubernetes-session.sh \
-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj \
-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
-Dkubernetes.cluster-id=realtime-monitor \
-Dkubernetes.jobmanager.service-account=wuzhiheng \
-Dkubernetes.namespace=monitor \
-Dtaskmanager.numberOfTaskSlots=6 \
-Dtaskmanager.memory.process.size=8192m \
-Djobmanager.memory.process.size=2048m

c)  Finally, submit a jar job through the web UI; the JobManager then prints the following log

2022-08-29 23:49:04,150 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-13 is created.

2022-08-29 23:49:04,152 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-12 is created.

2022-08-29 23:49:04,161 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-12

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-12 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:04,162 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-13

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-13 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:07,176 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 12 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Will 
not retry creating worker in 3000 ms.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0, 
taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6}, current pending count: 2.

2022-08-29 23:49:07,514 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 13 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,515 INFO  
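
For reference, the threshold of 10.00 and the 3000 ms pause in the log above match the defaults of two Flink options that rate-limit worker requests after repeated start failures. The fragment below is a minimal sketch of how they could appear in the Flink configuration, assuming a Flink version that ships these options; the values shown are simply the defaults, not recommendations. Tuning them only changes how quickly pods are re-requested. It does not explain why flink-main-container exits with code 1, which normally has to be read from the failed TaskManager pod's own log.

```yaml
# Maximum number of worker start failures per minute before the
# ResourceManager pauses requesting new workers (default: 10.0).
resourcemanager.start-worker.max-failure-rate: 10.0
# How long to wait before requesting workers again once the
# threshold has been reached (default: 3 s).
resourcemanager.start-worker.retry-interval: 3 s
```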

Re: Re:Re: How to automatically release resources after a Flink on K8s job fails?

2022-08-15 Thread zhanghao.chen
This is expected behavior. You can adjust Flink's failure recovery strategy to control it: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/

Best,
Zhanghao Chen
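
As a rough illustration of the failure recovery settings that the linked page describes, a fixed-delay restart strategy can be set in the Flink configuration. This is a sketch for Flink 1.15; the attempt count and delay are illustrative assumptions, not values taken from this thread.

```yaml
# Retry a failed job a bounded number of times, then give up.
restart-strategy: fixed-delay
# Number of restart attempts before the job is declared failed (assumed value).
restart-strategy.fixed-delay.attempts: 3
# Pause between two consecutive restart attempts (assumed value).
restart-strategy.fixed-delay.delay: 10 s
```

Once the attempts are exhausted the job reaches a terminal failed state, which is consistent with the "retried a few times, then the deployment and pods disappear" behavior reported in the quoted message.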

From: casel.chen 
Sent: Tuesday, August 16, 2022 8:33
To: user-zh@flink.apache.org 
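Subject: Re:Re: How to automatically release resources after a Flink on K8s job fails?

Native mode. I found that after the job fails it is retried automatically a few times, and finally the deployment and pods disappear.

At 2022-08-14 16:55:48, "yu'an huang" wrote:
>Is your deployment mode native or standalone? Normally resources are released when a job fails. Can you provide more information?
>
>
>
>> On 14 Aug 2022, at 9:55 AM, casel.chen  wrote:
>>
>> By default, a Flink on K8s job keeps occupying resources after it fails. How can it be configured to release them automatically?
>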


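Re:Re: How to automatically release resources after a Flink on K8s job fails?

2022-08-15 Thread casel.chen
Native mode. I found that after the job fails it is retried automatically a few times, and finally the deployment and pods disappear.

At 2022-08-14 16:55:48, "yu'an huang" wrote:
>Is your deployment mode native or standalone? Normally resources are released when a job fails. Can you provide more information?
>
>
>
>> On 14 Aug 2022, at 9:55 AM, casel.chen  wrote:
>>
>> By default, a Flink on K8s job keeps occupying resources after it fails. How can it be configured to release them automatically?
>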


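Re: How to automatically release resources after a Flink on K8s job fails?

2022-08-14 Thread yu'an huang
Is your deployment mode native or standalone? Normally resources are released when a job fails. Can you provide more information?



> On 14 Aug 2022, at 9:55 AM, casel.chen  wrote:
>
> By default, a Flink on K8s job keeps occupying resources after it fails. How can it be configured to release them automatically?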



Re: Do Flink on K8s jobs support elastic scaling?

2022-08-14 Thread yu'an huang
Hi,

Currently, Flink can automatically adjust the parallelism to the available resources (the number of TaskManagers), but adding or removing TaskManagers has to be done manually, for example by building an external monitoring system that decides when to add or remove TaskManagers based on some metrics. This mode is also only supported for standalone deployments, so it is not supported on native Kubernetes.

You can read the documents below to learn more about Flink's current work on Elastic Scaling:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/
https://flink.apache.org/2021/05/06/reactive-mode.html
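
For context, the Reactive Mode described on the pages above is switched on with a single configuration option. The fragment below is a minimal sketch, assuming a standalone application cluster (not native Kubernetes), as the reply points out.

```yaml
# Reactive Mode: the job always uses all available TaskManager slots,
# so its parallelism follows the number of TaskManagers added or removed.
scheduler-mode: reactive
```

With this set, scaling is driven from outside Flink, for example by changing the replica count of the TaskManager deployment.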


> On 14 Aug 2022, at 10:38 AM, casel.chen  wrote:
> 
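> Can a Flink on K8s job scale elastically within a given resource range, automatically following the upstream traffic volume? For example by increasing the parallelism and the number of TaskManagers.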



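Do Flink on K8s jobs support elastic scaling?

2022-08-13 Thread casel.chen
Can a Flink on K8s job scale elastically within a given resource range, automatically following the upstream traffic volume? For example by increasing the parallelism and the number of TaskManagers.

How to automatically release resources after a Flink on K8s job fails?

2022-08-13 Thread casel.chen
By default, a Flink on K8s job keeps occupying resources after it fails. How can it be configured to release them automatically?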

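Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-19 Thread Yang Wang
Your understanding is correct.

FlinkSessionJob was split out into a separate CR mainly because this fits K8s semantics better: within a session cluster, each job can also be managed as a K8s resource, and job state changes can be reflected in the Status promptly.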


Best,
Yang
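
To make the split between the two CRs concrete, here is a minimal sketch of a session cluster plus one job submitted to it, loosely modeled on the operator's session examples. All resource names, the image tag, the resource sizes, and the jar URL are hypothetical placeholders, and the service account is assumed to exist already.

```yaml
# Session cluster: a FlinkDeployment with no job section.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-session-cluster        # hypothetical name
spec:
  image: flink:1.15                    # assumed image tag
  flinkVersion: v1_15
  serviceAccount: flink                # assumed to exist in the namespace
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
---
# A job managed as its own K8s resource inside that session cluster.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: example-session-job            # hypothetical name
spec:
  deploymentName: example-session-cluster
  job:
    jarURI: https://example.com/path/to/job.jar   # placeholder URL
    parallelism: 2
    upgradeMode: stateless
```

Adding a job section to the FlinkDeployment instead would turn it into the application-mode case, where the cluster and the job are created together.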

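yidan zhao wrote on Thu, Jul 14, 2022 at 23:01:

> One more question, this time about flink-k8s-operator.
> I read through the documentation. It provides 2 CRDs, FlinkDeployment and FlinkSessionJob. I am not sure whether the following understanding is right:
> (1) For jobs submitted in application mode, use FlinkDeployment and configure the job section. The Flink
> cluster is created automatically and the job is run according to the job configuration.
>  With this approach there are no separate cluster creation and job submission steps; they are one and the same.
> (2) A session cluster is also created with FlinkDeployment, just without specifying the job configuration.
> (3) For a session cluster created via (2), jobs can then be submitted with FlinkSessionJob.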
>

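Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-14 Thread yidan zhao
One more question, this time about flink-k8s-operator.
I read through the documentation. It provides 2 CRDs, FlinkDeployment and FlinkSessionJob. I am not sure whether the following understanding is right:
(1) For jobs submitted in application mode, use FlinkDeployment and configure the job section. The Flink
cluster is created automatically and the job is run according to the job configuration.
 With this approach there are no separate cluster creation and job submission steps; they are one and the same.
(2) A session cluster is also created with FlinkDeployment, just without specifying the job configuration.
(3) For a session cluster created via (2), jobs can then be submitted with FlinkSessionJob.

Yang Wang wrote on Tue, Jul 12, 2022 at 17:10:
>
> If the machines in your K8s cluster are also configured to use coredns as their DNS server, then the service behind the ClusterIP can be resolved normally.
>
> ClusterIP was originally intended to be used by Pods that manage jobs, such as flink-kubernetes-operator[1].
>
> [1]. https://github.com/apache/flink-kubernetes-operator
>
> Best,
> Yang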
>

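Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-12 Thread Yang Wang
If the machines in your K8s cluster are also configured to use coredns as their DNS server, then the service behind the ClusterIP can be resolved normally.

ClusterIP was originally intended to be used by Pods that manage jobs, such as flink-kubernetes-operator[1].

[1]. https://github.com/apache/flink-kubernetes-operator

Best,
Yang

yidan zhao wrote on Tue, Jul 12, 2022 at 13:17:

> Submitting works when I specify the ClusterIP with flink run -m.
> So with --target kubernetes-session
> -Dkubernetes.cluster-id=my-first-flink-cluster, why can't the client be a bit smarter and look up the
> ClusterIP of the cluster's svc and submit to that?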
>


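Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-11 Thread yidan zhao
Submitting works when I specify the ClusterIP with flink run -m.
So with --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster, why can't the client be a bit smarter and look up the
ClusterIP of the cluster's svc and submit to that?

yidan zhao wrote on Tue, Jul 12, 2022 at 12:50:
>
> If I am on the k8s master node, can I use the ClusterIP directly?
>
>
> Also, I roughly understand NodePort, but I have never really understood how the LoadBalancer type works.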
>


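Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-11 Thread yidan zhao
If I am on the k8s master node, can I use the ClusterIP directly?


Also, I roughly understand NodePort, but I have never really understood how the LoadBalancer type works.

yidan zhao wrote on Tue, Jul 12, 2022 at 12:48:
>
> I understood "inside the k8s cluster" to mean the machines that make up k8s. Does the client have to be inside a pod? So running it on a k8s node does not work either, right?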
>


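Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-11 Thread yidan zhao
I understood "inside the k8s cluster" to mean the machines that make up k8s. Does the client have to be inside a pod? So running it on a k8s node does not work either, right?

Yang Wang wrote on Tue, Jul 12, 2022 at 12:07:
>
> The log already explains this fairly clearly: if you use ClusterIP, your Flink
> client must be inside the k8s cluster to submit successfully. For example, start a Pod and run flink run inside that pod.
> Otherwise you need to use NodePort or LoadBalancer.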
>
> 2022-07-12 10:23:23,021 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
>
>
> Best,
> Yang
>


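Re: flink native k8s: cluster not found when submitting a job as documented

2022-07-11 Thread Yang Wang
The log already explains this fairly clearly: if you use ClusterIP, your Flink
client must be inside the k8s cluster to submit successfully. For example, start a Pod and run flink run inside that pod.
Otherwise you need to use NodePort or LoadBalancer.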

2022-07-12 10:23:23,021 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.


Best,
Yang
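
To make the NodePort alternative mentioned above concrete: the exposure type of the REST service is controlled by the option named in the warning. The fragment below is a minimal sketch of the Flink configuration entry, assuming it is set when the session cluster is created; the same key can also be passed as a -D option on the command line.

```yaml
# Expose the JobManager REST service on a node port so that a client
# outside the cluster can reach it. With ClusterIP the service is only
# reachable from inside the cluster.
kubernetes.rest-service.exposed.type: NodePort
```

LoadBalancer is the other externally reachable value; which of the two fits depends on the environment the cluster runs in.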

yidan zhao wrote on Tue, Jul 12, 2022 at 10:40:

> The steps below follow the documentation
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
>
> Version: 1.15
>
> (1) Create the cluster:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> (2) Submit the job:
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=my-first-flink-cluster \
> ./examples/streaming/TopSpeedWindowing.jar
>
> The svc is of type ClusterIP
>
> The job submission in step 2 prints the following:
> Executing example with default input data.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> 2022-07-12 10:23:23,021 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> 2022-07-12 10:23:23,027 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> Web Interface: http://my-first-flink-cluster-rest.test:8081
> 2022-07-12 10:23:23,044 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
>
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job
> 'CarTopSpeedWindowingExample'.
> ...
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'CarTopSpeedWindowingExample'.
> ...
> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed to submit JobGraph.
> ...
> Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been
> exhausted.
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> or service not known
> ...
> Caused by: java.net.UnknownHostException:
> my-first-flink-cluster-rest.test: Name or service not known
>
>
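> As shown above, the submission endpoint found from --target kubernetes-session
> -Dkubernetes.cluster-id=my-first-flink-cluster is
> my-first-flink-cluster-rest.test. This is presumably the DNS name generated
> by k8s, where test is the Flink namespace.
>
> And indeed, my local machine cannot resolve my-first-flink-cluster-rest.test.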
>


flink native k8s: submitting a job as documented cannot find the cluster

2022-07-11 Thread yidan zhao
The steps below follow this documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes

Version: 1.15

(1) Create the cluster:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
(2) Submit the job:
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar

The svc is of type ClusterIP.

Step (2), submitting the job, prints the following:
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2022-07-12 10:23:23,021 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-07-12 10:23:23,027 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Retrieve flink cluster my-first-flink-cluster successfully, JobManager
Web Interface: http://my-first-flink-cluster-rest.test:8081
2022-07-12 10:23:23,044 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.


 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Failed to execute job
'CarTopSpeedWindowingExample'.
...
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'CarTopSpeedWindowingExample'.
...
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph.
...
Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been
exhausted.
...
Caused by: java.util.concurrent.CompletionException:
java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
or service not known
...
Caused by: java.net.UnknownHostException:
my-first-flink-cluster-rest.test: Name or service not known


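As shown above, the submission endpoint found from --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster is
my-first-flink-cluster-rest.test. This is presumably the DNS name generated
by k8s, where test is the Flink namespace.

And indeed, my local machine cannot resolve my-first-flink-cluster-rest.test.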


Re: flink on k8s application mode

2022-06-28 Thread Weihua Hu
Hi,
The image is not visible. Could it be that env.execute is called twice in the
main method? Could you share the logs?

Best,
Weihua


On Tue, Jun 28, 2022 at 8:52 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

>
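> Flink version: 1.13.1
> I submitted a job in flink on k8s application mode. After submission, the
> web UI shows two Running Jobs, which looks very much like session mode.
> Screenshot attached.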
>


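flink on k8s application mode

2022-06-28 Thread 陈卓宇
Flink version: 1.13.1
I submitted a job in flink on k8s application mode. After submission, the
web UI shows two Running Jobs, which looks very much like session mode.
Screenshot attached.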

Re: flink on k8s native: with HA enabled, starting a job from a savepoint fails with job id 0000 not found

2022-05-17 Thread shimin huang
Flink version 1.13.0
/home/hdfs/flink-1.13.0/bin/flink run-application \
-t kubernetes-application \
-s spPath \
-p 32 \
-Dresourcemanager.taskmanager-timeout=6 \
-Dkubernetes.namespace=xxx \
-Dkubernetes.service-account=xxx \
-Dkubernetes.taskmanager.service-account=xxx \
-Dkubernetes.cluster-id= \
-Dkubernetes.container.image.pull-secrets= \
-Dkubernetes.rest-service.exposed.type=NodePort  \
-Dkubernetes.config.file=/cce.conf \
-Denv.java.opts="-DHADOOP_USER_NAME=hdfs" \
-Dkubernetes.pod-template-file=/home/hdfs/jars/flink-pod.yaml \
-Dkubernetes.taskmanager.cpu=1 \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dtaskmanager.numberOfTaskSlots=16 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.memory.managed.fraction=0.1 \
-Dtaskmanager.memory.network.fraction=0.1 \
-Dtaskmanager.memory.network.max=2048m \
-Dtaskmanager.memory.network.min=512m \
-Dstate.checkpoints.num-retained=20 \
-Dstate.backend.rocksdb.memory.managed=true \
-Dstate.backend.rocksdb.checkpoint.transfer.thread.num=5 \
-Dstate.backend.rocksdb.localdir=/tmp/rocksdb \
-Dstate.backend.incremental=true \
-Dclassloader.resolve-order=parent-first \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://bmr-cluster/flink/kubernetes/ha/recovery \
-c  \

On Tue, May 17, 2022 at 21:54, Weihua Hu wrote:

> Hi, shimin
> Which Flink version are you using? What is the submit command?
>
>
> Best,
> Weihua
>
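> > On May 17, 2022, at 1:48 PM, shimin huang wrote:
> >
> > On Flink native k8s, after stopping a job with a savepoint and then
> > starting it again from that savepoint, it fails with a job-not-found error.
> > The error stack trace is as follows: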
> > java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
> > at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> > at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> > at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> > at com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> > at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> > at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
> > at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> > at java.util.Optional.orElseGet(Optional.java:267)
> > at

Re: flink on k8s native: with HA enabled, starting a job from a savepoint fails with job id 0000 not found

2022-05-17 Thread Weihua Hu
Hi, shimin
Which Flink version are you using? What is the submit command?


Best,
Weihua

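> On May 17, 2022, at 1:48 PM, shimin huang wrote:
> 
> On Flink native k8s, after stopping a job with a savepoint and then
> starting it again from that savepoint, it fails with a job-not-found error.
> The error stack trace is as follows: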
> java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> at com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> at java.util.Optional.orElseGet(Optional.java:267)
> at org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 common frames omitted
> 2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN o.a.f.

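flink on k8s native: with HA enabled, starting a job from a savepoint fails with job id 0000 not found

2022-05-16 Thread shimin huang
On Flink native k8s, after stopping a job with a savepoint and then starting
it again from that savepoint, it fails with a job-not-found error.
The error stack trace is as follows: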
java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
at com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 common frames omitted
2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN o.a.f.c.d.application.ApplicationDispatcherBootstrap  - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job ()
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture.uniApplyStage

Re: flink on k8s: JM URL returned after NodePort submission does not match the container IP and is unreachable

2022-04-14 Thread shimin huang
Thanks, I'll look into it.

On Thu, Apr 14, 2022 at 20:06, huweihua wrote:

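> With NodePort, Flink by default uses the API server's host plus the
> NodePort. All nodes in the K8s cluster are expected to forward NodePort
> traffic. If the address is unreachable, your K8s setup may have
> restrictions that disable NodePort traffic forwarding.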
>
>
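> > On Apr 14, 2022, at 5:22 PM, shimin huang wrote:
> >
> > For a Flink job submitted with NodePort, the returned JobManager web UI
> > address turns out to be unreachable, so even a plain flink list command
> > cannot retrieve the jobs under the corresponding cluster.id. The JM IP
> > actually returned is that of the API server. Does anyone have a good way
> > to solve this kind of problem?
> >
> > On Thu, Apr 14, 2022 at 17:20, shimin huang wrote: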
> >
> >> flink version: flink 1.13.0
> >>
> >>
>
>


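Re: flink on k8s: JM URL returned after NodePort submission does not match the container IP and is unreachable

2022-04-14 Thread huweihua
With NodePort, Flink by default uses the API server's host plus the NodePort.
All nodes in the K8s cluster are expected to forward NodePort traffic. If the
address is unreachable, your K8s setup may have restrictions that disable
NodePort traffic forwarding.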
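
One way to test whether a node really forwards NodePort traffic is to query the Flink REST API through a node address directly. The sketch below assumes kubectl access and that the REST service is named <cluster-id>-rest; the helper rest_url, the service name my-cluster-rest, and the sample address 10.0.0.5:30081 are hypothetical placeholders. By default it only prints an example URL; set RUN=1 to query a real cluster.

```shell
#!/bin/sh
# Hypothetical helper: join a node address and a NodePort into a REST base URL.
rest_url() {
  printf 'http://%s:%s\n' "$1" "$2"
}

if [ "${RUN:-0}" = "1" ]; then
  ns=xxx                 # namespace placeholder, as in the thread
  svc=my-cluster-rest    # assumed service name: <cluster-id>-rest
  # NodePort assigned to the first port of the REST service
  port=$(kubectl get svc "$svc" -n "$ns" -o jsonpath='{.spec.ports[0].nodePort}')
  # First listed address of the first node (often the InternalIP)
  node=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[0].address}')
  # /jobs/overview is the Flink REST endpoint that lists jobs
  curl -s "$(rest_url "$node" "$port")/jobs/overview"
else
  echo "example: $(rest_url 10.0.0.5 30081)/jobs/overview"
fi
```

If the request succeeds against a node address but not against the API server host, that would point to NodePort forwarding being blocked on the API server host specifically.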


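> On Apr 14, 2022, at 5:22 PM, shimin huang wrote:
> 
> For a Flink job submitted with NodePort, the returned JobManager web UI
> address turns out to be unreachable, so even a plain flink list command
> cannot retrieve the jobs under the corresponding cluster.id. The JM IP
> actually returned is that of the API server. Does anyone have a good way
> to solve this kind of problem?
> 
> On Thu, Apr 14, 2022 at 17:20, shimin huang wrote: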
> 
>> flink version: flink 1.13.0
>> 
>> 



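Re: flink on k8s: JM URL returned after NodePort submission does not match the container IP and is unreachable

2022-04-14 Thread shimin huang
For a Flink job submitted with NodePort, the returned JobManager web UI
address turns out to be unreachable, so even a plain flink list command
cannot retrieve the jobs under the corresponding cluster.id. The JM IP
actually returned is that of the API server. Does anyone have a good way to
solve this kind of problem?

On Thu, Apr 14, 2022 at 17:20, shimin huang wrote: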

> flink version: flink 1.13.0
>
>


flink on k8s: JM URL returned after NodePort submission does not match the container IP and is unreachable

2022-04-14 Thread shimin huang
flink version: flink 1.13.0


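Re: In flink on k8s setups, how do people usually handle access to HDFS?

2022-03-30 Thread LuNing Wang
You can also use MinIO for storage.

On Wed, Mar 30, 2022 at 12:05, casel.chen wrote:

> We use cloud storage directly, such as Alibaba Cloud OSS, and did not set
> up a Hadoop cluster. If flink on k8s really does need to access Hadoop, you
> need to package the Hadoop distribution into the image and configure
> core-site.xml, hdfs-site.xml, etc.
>
> At 2022-03-30 12:01:54, "yidan zhao" wrote:
> >As the subject says: do I need to package the hadoop client into the image?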
>
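
For the case quoted above where Hadoop is packaged into the image, a quick sanity check inside the container could look like the sketch below. The helper has_hadoop_conf and the fallback path /opt/hadoop/etc/hadoop are assumptions for illustration; only the two file names come from the thread.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if both Hadoop client config files exist
# in the given directory.
has_hadoop_conf() {
  [ -f "$1/core-site.xml" ] && [ -f "$1/hdfs-site.xml" ]
}

# Assumed default location; adjust to wherever the image places the files.
conf_dir=${HADOOP_CONF_DIR:-/opt/hadoop/etc/hadoop}

if has_hadoop_conf "$conf_dir"; then
  echo "Hadoop client config found in $conf_dir"
else
  echo "core-site.xml or hdfs-site.xml missing in $conf_dir"
fi
```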


In flink on k8s setups, how do people usually handle access to HDFS?

2022-03-29 Thread yidan zhao
As the subject says: do I need to package the hadoop client into the image?


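Re: Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-29 Thread shimin huang
OK, I'll look into it. Thanks!

On Mon, Mar 28, 2022 at 22:12, yu'an huang wrote:

> Hello,
>
> Take a look at the description of usrlib at this link (the Application
> mode section):
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/docker/#docker-hub-flink-images
>
> Unlike YARN, Kubernetes does not provide a way to ship files. In Kubernetes
> application mode, the user program runs on the Job Manager, which requires
> all artifacts to already exist in the image. Flink automatically adds the
> files under $FLINK_HOME/usrlib to the user program's classpath, so you need
> to follow the method in the link: build an image and place the artifacts
> you need into it beforehand. Then specify the main class and the JAR that
> contains it in the submit command.
>
> On Mon, 28 Mar 2022 at 8:26 PM, shimin huang
> wrote:
>
> > I could not find a related option in 1.12.0. For now I plan to test
> > whether pointing pipeline.classpaths at the jar paths works.
> >
> > On Mon, Mar 28, 2022 at 20:18, Geng Biao wrote:
> >
> > > Hi shimin,
> > > For external jar dependencies, take a look at how usrlib is used for
> > > flink on k8s in the docs.
> > >
> > > Best,
> > > Biao
> > >
> > > Get Outlook for iOS<https://aka.ms/o0ukef>
> > > ________
> > > From: shimin huang
> > > Sent: Monday, March 28, 2022 8:14:28 PM
> > > To: user-zh@flink.apache.org
> > > Subject: Does flink on k8s have a parameter that replaces yarn.ship-files?
> > >
> > > flink version 1.12.0
> > >
> > > We are currently migrating from flink on yarn to flink on k8s.
> > > Previously, external jars and configuration were loaded via the
> > > yarn.ship-files parameter. Is there a similar parameter for k8s? I could
> > > not find one in the 1.12.0 docs. There is an
> > > external-resource..yarn.config-key option, but no concrete usage
> > > example. Can anyone suggest a good approach?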
> > >
> >
>


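Re: Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-28 Thread yu'an huang
Hello,


Take a look at the description of usrlib at this link (the Application mode
section):
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/docker/#docker-hub-flink-images

Unlike YARN, Kubernetes does not provide a way to ship files. In Kubernetes
application mode, the user program runs on the Job Manager, which requires
all artifacts to already exist in the image. Flink automatically adds the
files under $FLINK_HOME/usrlib to the user program's classpath, so you need
to follow the method in the link: build an image and place the artifacts you
need into it beforehand. Then specify the main class and the JAR that
contains it in the submit command.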
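
The submit step described above can be sketched as a dry run. The image name, cluster id, main class, and JAR name below are hypothetical, and the path assumes the image keeps FLINK_HOME at /opt/flink as the official Flink images do. The script only prints the command so it can be reviewed before running it against a cluster.

```shell
#!/bin/sh
# Hypothetical values for illustration only.
IMAGE=my-registry/my-flink-job:1.0        # image built with the job JAR under $FLINK_HOME/usrlib
JAR=local:///opt/flink/usrlib/my-job.jar  # local:// refers to a path inside the image
MAIN=com.example.MyJob

# The image itself would be built beforehand, e.g. from a Dockerfile along the lines of:
#   FROM flink
#   RUN mkdir -p $FLINK_HOME/usrlib
#   COPY my-job.jar $FLINK_HOME/usrlib/my-job.jar

# Print the submit command instead of executing it (dry run).
submit_cmd() {
  printf '%s ' ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster \
    "-Dkubernetes.container.image=$IMAGE" \
    -c "$MAIN" "$JAR"
  printf '\n'
}

submit_cmd
```

Other files the job needs would be copied into the image in the same way, since nothing is shipped from the client at submit time.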




On Mon, 28 Mar 2022 at 8:26 PM, shimin huang 
wrote:

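> I could not find a related option in 1.12.0. For now I plan to test
> whether pointing pipeline.classpaths at the jar paths works.
>
> On Mon, Mar 28, 2022 at 20:18, Geng Biao wrote:
>
> > Hi shimin,
> > For external jar dependencies, take a look at how usrlib is used for
> > flink on k8s in the docs.
> >
> > Best,
> > Biao
> >
> > Get Outlook for iOS<https://aka.ms/o0ukef>
> > 
> > From: shimin huang
> > Sent: Monday, March 28, 2022 8:14:28 PM
> > To: user-zh@flink.apache.org
> > Subject: Does flink on k8s have a parameter that replaces yarn.ship-files?
> >
> > flink version 1.12.0
> >
> > We are currently migrating from flink on yarn to flink on k8s.
> > Previously, external jars and configuration were loaded via the
> > yarn.ship-files parameter. Is there a similar parameter for k8s? I could
> > not find one in the 1.12.0 docs. There is an
> > external-resource..yarn.config-key option, but no concrete usage
> > example. Can anyone suggest a good approach?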
> >
>


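Re: Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-28 Thread shimin huang
I could not find a related option in 1.12.0. For now I plan to test whether
pointing pipeline.classpaths at the jar paths works.

On Mon, Mar 28, 2022 at 20:18, Geng Biao wrote:

> Hi shimin,
> For external jar dependencies, take a look at how usrlib is used for
> flink on k8s in the docs.
>
> Best,
> Biao
>
> Get Outlook for iOS<https://aka.ms/o0ukef>
> 
> From: shimin huang
> Sent: Monday, March 28, 2022 8:14:28 PM
> To: user-zh@flink.apache.org
> Subject: Does flink on k8s have a parameter that replaces yarn.ship-files?
>
> flink version 1.12.0
>
> We are currently migrating from flink on yarn to flink on k8s. Previously,
> external jars and configuration were loaded via the yarn.ship-files
> parameter. Is there a similar parameter for k8s? I could not find one in
> the 1.12.0 docs. There is an external-resource..yarn.config-key option,
> but no concrete usage example. Can anyone suggest a good approach?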
>


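Re: Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-28 Thread shimin huang
I pasted the wrong option, external-resource..yarn.config-key. It should be
this one: external-resource..kubernetes.config-key

On Mon, Mar 28, 2022 at 20:14, shimin huang wrote:

> flink version 1.12.0
>
> We are currently migrating from flink on yarn to flink on k8s. Previously,
> external jars and configuration were loaded via the yarn.ship-files
> parameter. Is there a similar parameter for k8s? I could not find one in
> the 1.12.0 docs. There is an external-resource..yarn.config-key option,
> but no concrete usage example. Can anyone suggest a good approach?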
>


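Re: Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-28 Thread Geng Biao
Hi shimin,
For external jar dependencies, take a look at how usrlib is used for flink on
k8s in the docs.

Best,
Biao

Get Outlook for iOS<https://aka.ms/o0ukef>

From: shimin huang
Sent: Monday, March 28, 2022 8:14:28 PM
To: user-zh@flink.apache.org
Subject: Does flink on k8s have a parameter that replaces yarn.ship-files?

flink version 1.12.0

We are currently migrating from flink on yarn to flink on k8s. Previously,
external jars and configuration were loaded via the yarn.ship-files
parameter. Is there a similar parameter for k8s? I could not find one in the
1.12.0 docs. There is an external-resource..yarn.config-key option, but no
concrete usage example. Can anyone suggest a good approach?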


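Does flink on k8s have a parameter that replaces yarn.ship-files?

2022-03-28 Thread shimin huang
flink version 1.12.0

We are currently migrating from flink on yarn to flink on k8s. Previously,
external jars and configuration were loaded via the yarn.ship-files
parameter. Is there a similar parameter for k8s? I could not find one in the
1.12.0 docs. There is an external-resource..yarn.config-key option, but no
concrete usage example. Can anyone suggest a good approach?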


Re: AWS Kinesis Flink vs K8s

2022-03-03 Thread Puneet Duggal
Hi Jeremy,

Thank you for this detailed answer, and yes, this surely helps.

Regards,
Puneet

> On 16-Feb-2022, at 9:21 PM, Ber, Jeremy  wrote:
> 
> Hi Puneet,
> Amazon Kinesis Data Analytics for Apache Flink is a managed Apache Flink 
> offering--it removes the need to setup your own checkpointing and 
> snapshotting by providing a built-in mechanism for both.
> KDA Flink runs in session mode but only allows 1 application per cluster. We 
> also offer Kinesis Data Analytics Studio, which is an Apache Zeppelin based 
> notebook development experience for Apache Flink. This feature runs in 
> session mode as well, but allows for multiple jobs per cluster. The Job 
> Manager runs in high availability mode using Zookeeper across multiple 
> Availability Zones. We use Amazon EKS behind the scenes to recover in the 
> event of pod failures.
> You can read more about this here:
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/disaster-recovery-resiliency.html
> If you decide to run your workloads on K8s, you would be responsible for 
> implementing the above features, plus monitoring and deployments would 
> require additional lift. The benefit of using K8s yourself would be access to 
> the flink-conf and full control over cluster instance types and configuration 
> which you do not have in KDA today.
>  
> Hope this helps,
> Jeremy
>  
>  
>  
>  
> From: Danny Cranmer <dannycran...@apache.org>
> Date: Wednesday, February 16, 2022 at 9:27 AM
> To: Puneet Duggal <puneetduggal1...@gmail.com>, "Ber, Jeremy" <jd...@amazon.com>
> Cc: user <user@flink.apache.org>
> Subject: RE: [EXTERNAL] AWS Kinesis Flink vs K8s
>  
>  
> +Jeremy who can help answer this question.
>  
> Thanks,
>  
> On Wed, Feb 16, 2022 at 10:26 AM Puneet Duggal <puneetduggal1...@gmail.com> wrote:
> Hi,
> 
> Just wanted to ask the community various pros and cons of deploying flink 
> using AWS Kinesis vs using K8s application mode. Currently we are deploying 
> flink cluster in HA session standalone mode and planning to switch to 
> application deployment mode.
> 
> Regards, 
> Puneet



Flink On K8s ???????? SSL????????

2022-03-02 Thread hjw
K8sk8s~/.kube/config.SSL??kubectl??k8s
 (kubectl get pod -n namespace ??)??

Flink: 1.13.6
~/.kube/config

apiVersion: v1
kind: Config
clusters:
- name: "yf-dev-cluster1"
  cluster:
    server: "https://in-acpmanager.test.yfzx.cn/k8s/clusters/c-t5h2t"
    certificate-authority-data: "??"

:
2022-03-02 18:59:30 | OkHttp https://in-acpmanager.test.yfzx.cn/...io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener Exec Failure javax.net.ssl.SSLPeerUnverifiedException Hostname in-acpmanager.test.yfzx.cn not verified: certificate: sha256/cw2T2s+Swhl7z+H35/3C1dTLxL26OOMO5VoEN9kAZCA= DN: CN=in-acpmanager.test.yfzx.cn subjectAltNames: []
io.fabric8.kubernetes.client.KubernetesClientException: Failed to start websocket
at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onFailure(WatcherWebSocketListener.java:77)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:570)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$1.onFailure(RealWebSocket.java:216)
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:180)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Throwable: waiting here
at io.fabric8.kubernetes.client.utils.Utils.waitUntilReady(Utils.java:164)
at io.fabric8.kubernetes.client.utils.Utils.waitUntilReadyOrFail(Utils.java:175)
at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.waitUntilReady(WatcherWebSocketListener.java:120)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.waitUntilReady(WatchConnectionManager.java:82)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:705)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:678)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:

Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname in-acpmanager.test.yfzx.cn not verified: certificate: sha256/cw2T2s+Swhl7z+H35/3C1dTLxL26OOMO5VoEN9kAZCA= DN: CN=in-acpmanager.test.yfzx.cn subjectAltNames: []
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:350)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:300)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:185)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at org.apache.flink.kubernetes.shade

Re: AWS Kinesis Flink vs K8s

2022-02-16 Thread Danny Cranmer
+Jeremy who can help answer this question.

Thanks,

On Wed, Feb 16, 2022 at 10:26 AM Puneet Duggal 
wrote:

> Hi,
>
> Just wanted to ask the community various pros and cons of deploying flink
> using AWS Kinesis vs using K8s application mode. Currently we are deploying
> flink cluster in HA session standalone mode and planning to switch to
> application deployment mode.
>
> Regards,
> Puneet


AWS Kinesis Flink vs K8s

2022-02-16 Thread Puneet Duggal
Hi,

I wanted to ask the community about the pros and cons of deploying Flink using
AWS Kinesis vs. using K8s application mode. We currently deploy our Flink
cluster in HA standalone session mode and are planning to switch to application
deployment mode.

Regards, 
Puneet

Re: Flink native k8s integration vs. operator

2022-01-20 Thread Alexis Sarda-Espinosa
Hi Robert,

I agree with you; that's why I was writing a K8s operator in the first place.
The restriction wasn't my decision, though, it was imposed on me. My point was
rather that an operator wouldn't necessarily supersede standalone+reactive, at
least not in my case. That certainly doesn't mean an operator is a bad idea;
it's just something other users might want to keep in mind.

Regards,
Alexis.


From: Robert Metzger 
Sent: Thursday, January 20, 2022 7:06 PM
To: Alexis Sarda-Espinosa 
Cc: dev ; user 
Subject: Re: Flink native k8s integration vs. operator

Hi Alexis,

> The usage of Custom Resource Definitions (CRDs). The main reason given to me
> was that such resources are global (for a given cluster) and that is not
> desired. I know that ultimately a CR based on a CRD can be scoped to a specific
> namespace, but customer is king…

I don't think this restriction applies to many organizations. K8s operators are 
the de facto standard for deploying all kinds of software. There are quite many 
projects that used to just have a Helm chart, that are now switching over to 
provide operators, because they provide a much better experience.
If you have more specifics on this concern that is relevant for the Flink 
community, I'd like to hear that.


> Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This
> one is more understandable, particularly after the whole log4j debacle. Roles
> that manage solely deployment.scale subresources would be acceptable though.

This requirement is not strictly needed to deploy Flink on K8s. Only with the 
native K8s integration of Flink, you need to give the Flink JVM a role that 
allows creating other pods.


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:

Hi everyone,



Since I see this is getting some traction, I’d like to add a couple things. I 
had been developing a Kubernetes controller for Flink as a Proof of Concept at 
my company; I called it Flork because it was to be a Flink Orchestrator for 
Kubernetes. In the end, we will most likely not use this controller due to 
security concerns that were communicated to me. These concerns stem from the 
fact that our product would be used by customers in their own Kubernetes 
clusters, and many customers don’t want:



- The usage of Custom Resource Definitions (CRDs). The main reason given to me 
was that such resources are global (for a given cluster) and that is not 
desired. I know that ultimately a CR based on a CRD can be scoped to a specific 
namespace, but customer is king…



- Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This 
one is more understandable, particularly after the whole log4j debacle. Roles 
that manage solely deployment.scale subresources would be acceptable though.



I mention these in case they prove to be relevant for others in the current 
context. For us, it means we have to stick with something like standalone 
Kubernetes + reactive/adaptive.



Nevertheless, the PoC I had was already functional and, while I would have to 
request permission to contribute the code to the community, it might be useful 
for these efforts. However, I’d first ask if there is actually interest in this 
code, considering these are some of the “features” it currently has:



* The CRD relies on the Pod Template support included in Flink itself. As such, 
some of the fields in the CRD are “vanilla” pod specs, and the schema reflects 
that because it embeds a flattened version of the schema from [1]. I’d also 
have a basic Helm chart ready.



* The code is written in a mixture of Java and Kotlin, and is built with 
Gradle. I made heavy use of Kotlin Coroutines to implement some of the core 
logic in a non-blocking way.



* The code already supports High Availability by leveraging Kubernetes leases 
and the corresponding helpers in Fabric8’s client.



* The main deployment logic is delegated to Flink’s own flink-kubernetes module 
[2]. Nevertheless, my build shadows all the fabric8 classes and service 
definitions embedded in said module, so that the rest of the code can use other 
kubernetes-client versions independently.



* The controller handles savepoint creation for redeployments after CR changes, 
e.g. upgrades. This would also work after controller fail-over with/without HA.



* The code supports some extension for custom container images: classes defined 
in META-INF/services/ files are called as decorators for Flink’s conf file 
and/or the pod specs defined in the CR, and they could be copied to the image 
on top of an official base version.



* A deployment mode without CRD could be supported --- I have some code that 
can run on top of the core controller and allows “embedding” a CR in a Config 
Map key. The translation between the CM and the core controller code is then 
done transparently.



* I have a module that integrates th
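
The extension mechanism from the bullet about META-INF/services files could look roughly like the following. This is only a minimal sketch using Java's standard ServiceLoader; the `ConfDecorator` interface, the class name `DecoratorSketch`, and the config keys are hypothetical and are not taken from the PoC or from Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

public class DecoratorSketch {

    /** Hypothetical extension point; not an interface from Flink or the PoC. */
    public interface ConfDecorator {
        Map<String, String> decorate(Map<String, String> flinkConf);
    }

    /**
     * Discovers implementations listed in a classpath file named
     * META-INF/services/DecoratorSketch$ConfDecorator (the binary name of the interface).
     * Returns an empty list when no such file is present.
     */
    public static List<ConfDecorator> discover() {
        List<ConfDecorator> found = new ArrayList<>();
        for (ConfDecorator decorator : ServiceLoader.load(ConfDecorator.class)) {
            found.add(decorator);
        }
        return found;
    }

    /** Applies the decorators in order to a copy of the base configuration. */
    public static Map<String, String> applyAll(
            Map<String, String> base, List<ConfDecorator> decorators) {
        Map<String, String> current = new LinkedHashMap<>(base);
        for (ConfDecorator decorator : decorators) {
            current = decorator.decorate(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("taskmanager.numberOfTaskSlots", "2");
        // A custom image would ship decorator classes plus the services file;
        // without them, discover() yields nothing and the base conf is unchanged.
        System.out.println(applyAll(base, discover()));
    }
}
```

With this shape, a custom image only needs to add a JAR containing the decorator classes and the services file on top of the official base image; the controller code itself stays unchanged.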

Re: Flink native k8s integration vs. operator

2022-01-20 Thread Robert Metzger
Hi Alexis,

> The usage of Custom Resource Definitions (CRDs). The main reason given to
> me was that such resources are global (for a given cluster) and that is not
> desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…


I don't think this restriction applies to many organizations. K8s operators
are the de facto standard for deploying all kinds of software. Quite a few
projects that used to ship just a Helm chart are now switching over to
operators, because they provide a much better experience.
If you have more specifics on this concern that are relevant for the Flink
community, I'd like to hear them.


> Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.


This permission is not strictly needed to deploy Flink on K8s. Only the
native K8s integration requires giving the Flink JVM a role that allows it
to create other pods.
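
To make the difference between the two kinds of roles concrete, here is a minimal sketch in Java. It is a simplified model of how a Role's rules are matched against a request, not the Kubernetes API: wildcards and resource names are ignored, the class and method names are hypothetical, and the verb lists are assumptions (a real role for the native integration needs more than pods).

```java
import java.util.List;
import java.util.Set;

public class RbacSketch {

    /** Simplified stand-in for one rule of a Kubernetes Role; wildcards are not modelled. */
    public static final class Rule {
        final Set<String> apiGroups;
        final Set<String> resources;
        final Set<String> verbs;

        public Rule(Set<String> apiGroups, Set<String> resources, Set<String> verbs) {
            this.apiGroups = apiGroups;
            this.resources = resources;
            this.verbs = verbs;
        }

        boolean allows(String apiGroup, String resource, String verb) {
            return apiGroups.contains(apiGroup)
                    && resources.contains(resource)
                    && verbs.contains(verb);
        }
    }

    /** A request is allowed if at least one rule of the role matches it. */
    public static boolean allowed(List<Rule> role, String apiGroup, String resource, String verb) {
        return role.stream().anyMatch(rule -> rule.allows(apiGroup, resource, verb));
    }

    /** Role restricted to the scale subresource of Deployments (API group "apps"). */
    public static List<Rule> scaleOnlyRole() {
        return List.of(new Rule(
                Set.of("apps"), Set.of("deployments/scale"), Set.of("get", "update", "patch")));
    }

    /** Assumed pod permissions for the native integration; pods live in the core group "". */
    public static List<Rule> podManagingRole() {
        return List.of(new Rule(
                Set.of(""), Set.of("pods"), Set.of("create", "delete", "get", "list", "watch")));
    }

    public static void main(String[] args) {
        // Scaling a deployment is allowed by the restricted role: prints true
        System.out.println(allowed(scaleOnlyRole(), "apps", "deployments/scale", "update"));
        // Creating pods is not allowed by the restricted role: prints false
        System.out.println(allowed(scaleOnlyRole(), "", "pods", "create"));
        // The native integration needs a role that allows pod creation: prints true
        System.out.println(allowed(podManagingRole(), "", "pods", "create"));
    }
}
```

A standalone deployment combined with reactive mode can get by with the first kind of role, since scaling the TaskManager deployment is the only write operation involved.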


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi everyone,
>
>
>
> Since I see this is getting some traction, I’d like to add a couple
> things. I had been developing a Kubernetes controller for Flink as a Proof
> of Concept at my company; I called it Flork because it was to be a Flink
> Orchestrator for Kubernetes. In the end, we will most likely not use this
> controller due to security concerns that were communicated to me. These
> concerns stem from the fact that our product would be used by customers in
> their own Kubernetes clusters, and many customers don’t want:
>
>
>
> - The usage of Custom Resource Definitions (CRDs). The main reason given
> to me was that such resources are global (for a given cluster) and that is
> not desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…
>
>
>
> - Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.
>
>
>
> I mention these in case they prove to be relevant for others in the
> current context. For us, it means we have to stick with something like
> standalone Kubernetes + reactive/adaptive.
>
>
>
> Nevertheless, the PoC I had was already functional and, while I would have
> to request permission to contribute the code to the community, it might be
> useful for these efforts. However, I’d first ask if there is actually
> interest in this code, considering these are some of the “features” it
> currently has:
>
>
>
> * The CRD relies on the Pod Template support included in Flink itself. As
> such, some of the fields in the CRD are “vanilla” pod specs, and the schema
> reflects that because it embeds a flattened version of the schema from [1].
> I’d also have a basic Helm chart ready.
>
>
>
> * The code is written in a mixture of Java and Kotlin, and is built with
> Gradle. I made heavy use of Kotlin Coroutines to implement some of the core
> logic in a non-blocking way.
>
>
>
> * The code already supports High Availability by leveraging Kubernetes
> leases and the corresponding helpers in Fabric8’s client.
>
>
>
> * The main deployment logic is delegated to Flink’s own flink-kubernetes
> module [2]. Nevertheless, my build shadows all the fabric8 classes and
> service definitions embedded in said module, so that the rest of the code
> can use other kubernetes-client versions independently.
>
>
>
> * The controller handles savepoint creation for redeployments after CR
> changes, e.g. upgrades. This would also work after controller fail-over
> with/without HA.
>
>
>
> * The code supports some extension for custom container images: classes
> defined in META-INF/services/ files are called as decorators for Flink’s
> conf file and/or the pod specs defined in the CR, and they could be copied
> to the image on top of an official base version.
>
>
>
> * A deployment mode without CRD could be supported --- I have some code
> that can run on top of the core controller and allows “embedding” a CR in a
> Config Map key. The translation between the CM and the core controller code
> is then done transparently.
>
>
>
> * I have a module that integrates the code with Inversion of Control
> containers such as Spring. I only used javax annotations (soon to be
> jakarta), so it’s not tied to Spring.
>
>
>
> Something I haven’t considered at all in my code is ingress fo

Re: Flink native k8s integration vs. operator

2022-01-17 Thread Gyula Fóra
Hi Yang!

Thanks for the input!

I agree with you on both points that you made. Even if we might support
both standalone and native modes in the long run, we should probably build
the first version on top of the native integration.
This I feel will result in a much simpler, minimalistic first version that
will already support the most important features. We are familiar with your
PoC implementation and I think it's a great idea to use that as a base.

As for Java / Go, I think Java is the obvious choice here. I would have to
think very hard to make any good arguments for picking Go :)

Cheers,
Gyula



On Mon, Jan 17, 2022 at 10:30 AM Yang Wang  wrote:

>  Glad to see that the interest of this thread keeps going. And thanks
> Thomas, Gyula, and Marton for driving this effort.
>
> I want to share my two cents about the Flink K8s operator.
>
> > Standalone deployment VS native K8s integration
>
> There is already some feature requirement issue[1] for the existing
> GoogleCloudPlatform/flink-on-k8s-operator to support native K8s
> integration. So I think
> it will be great if the new introduced K8s operator could support native
> K8s mode. I could imagine some advantages for using native mode. e.g.
> dynamic allocation,
> stability improvement, etc.
>
> Compared with standalone + reactive mode, the native K8s could not
> integrate with auto-scaling(allocate/remove TaskManager pods based on
> metrics) well.
> Since the reconcile behavior for standalone and native K8s mode will be
> different, I am not sure whether we will support them both at the very
> beginning.
>
>
> > Go VS Java
>
> Although most of the K8s operators are developed in Go, which could benefit
> from the prosperous ecosystem and various tools. I lean to develop the K8s
> operator under Flink umbrella using Java.
> Then the Flink contributors will be easier to get involved. We could use
> the same Kubernetes Java client with Flink. When Flink exposes some public
> deployment interfaces(e.g. ApplicationDeployer)
> in the future, the K8s operator will also benefit a lot from this.
>
> I already have a simple PoC project of this implementation[2]. Hope you
> could get some inspirations from this.
>
>
> [1].
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/168
> [2]. https://github.com/wangyang0918/flink-native-k8s-operator
>
>
> Best,
> Yang
>
>
>
> On Fri, Jan 14, 2022 at 3:47 PM Xintong Song wrote:
>
> > Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.
> >
> > Looking forward to the public discussion. Please feel free to reach out
> if
> > there's anything you need from us.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang <
> chenyazhangche...@gmail.com>
> > wrote:
> >
> >> Thanks Thomas, Gyula, and Marton for driving this effort! It would
> >> greatly ease the adoption of Apache Flink on Kubernetes and help to
> address
> >> the current operational pain points as mentioned. Look forward to the
> >> proposal and more discussions!
> >>
> >> Best,
> >> Chenya
> >>
> >> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi <
> balassi.mar...@gmail.com>
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> I am pleased to see the level of enthusiasm and technical consideration
> >>> already emerging in this thread. I wholeheartedly support building an
> >>> operator and endorsing it via placing it under the Apache Flink
> umbrella
> >>> (as a separate repository) as the current lack of it is clearly
> becoming
> >>> an
> >>> adoption bottleneck for large scale Flink users. The next logical step
> is
> >>> to write a FLIP to agree on the technical details, so that we can put
> >>> forward the proposal to the Flink PMC for creating a new repository
> with
> >>> a
> >>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
> >>> initial wording on the proposal which we will put up for public
> >>> discussion
> >>> in the coming weeks.
> >>>
> >>> Best,
> >>> Marton
> >>>
> >>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
> >>> wrote:
> >>>
> >>> > Hi Thomas,
> >>> >
> >>> > Yes, I was referring to a separate repository under Apache Flink.
> >>> >
> >>> > Cheers,
> >>> >
> >>> > Konstantin
> >>> >
> >>> >

Re: Flink native k8s integration vs. operator

2022-01-17 Thread Yang Wang
Glad to see that interest in this thread keeps going. Thanks to Thomas,
Gyula, and Marton for driving this effort.

I want to share my two cents about the Flink K8s operator.

> Standalone deployment VS native K8s integration

There is already a feature request[1] for the existing
GoogleCloudPlatform/flink-on-k8s-operator to support native K8s
integration, so I think it would be great if the newly introduced K8s
operator could support native K8s mode. I can imagine some advantages of
native mode, e.g. dynamic allocation and improved stability.

Compared with standalone + reactive mode, native K8s mode does not
integrate well with auto-scaling (allocating/removing TaskManager pods
based on metrics).
Since the reconcile behavior for standalone and native K8s mode will be
different, I am not sure whether we will support both at the very
beginning.


> Go VS Java

Most K8s operators are developed in Go, which benefits from a rich
ecosystem and various tools. Nevertheless, I lean towards developing the
K8s operator under the Flink umbrella in Java.
It will then be easier for Flink contributors to get involved. We could use
the same Kubernetes Java client as Flink. When Flink exposes some public
deployment interfaces (e.g. ApplicationDeployer)
in the future, the K8s operator will also benefit a lot from this.

I already have a simple PoC project of this implementation[2]. I hope you
can get some inspiration from it.


[1]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/168
[2]. https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang



On Fri, Jan 14, 2022 at 3:47 PM Xintong Song wrote:

> Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.
>
> Looking forward to the public discussion. Please feel free to reach out if
> there's anything you need from us.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang 
> wrote:
>
>> Thanks Thomas, Gyula, and Marton for driving this effort! It would
>> greatly ease the adoption of Apache Flink on Kubernetes and help to address
>> the current operational pain points as mentioned. Look forward to the
>> proposal and more discussions!
>>
>> Best,
>> Chenya
>>
>> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am pleased to see the level of enthusiasm and technical consideration
>>> already emerging in this thread. I wholeheartedly support building an
>>> operator and endorsing it via placing it under the Apache Flink umbrella
>>> (as a separate repository) as the current lack of it is clearly becoming
>>> an
>>> adoption bottleneck for large scale Flink users. The next logical step is
>>> to write a FLIP to agree on the technical details, so that we can put
>>> forward the proposal to the Flink PMC for creating a new repository with
>>> a
>>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
>>> initial wording on the proposal which we will put up for public
>>> discussion
>>> in the coming weeks.
>>>
>>> Best,
>>> Marton
>>>
>>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
>>> wrote:
>>>
>>> > Hi Thomas,
>>> >
>>> > Yes, I was referring to a separate repository under Apache Flink.
>>> >
>>> > Cheers,
>>> >
>>> > Konstantin
>>> >
>>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>>> >
>>> >> Hi everyone,
>>> >>
>>> >> Thanks for the feedback and discussion. A few additional thoughts:
>>> >>
>>> >> [Konstantin] > With respect to common lifecycle management operations:
>>> >> these features are
>>> >> > not available (within Apache Flink) for any of the other resource
>>> >> providers
>>> >> > (YARN, Standalone) either. From this perspective, I wouldn't
>>> consider
>>> >> this
>>> >> > a shortcoming of the Kubernetes integration.
>>> >>
>>> >> I think time and evolution of the ecosystem are factors to consider as
>>> >> well. The state and usage of Flink was much different when YARN
>>> >> integration was novel. Expectations are different today and the
>>> >> lifecycle functionality provided by an operator may as well be
>>> >> considered essential to support the concept of a Flink application on
>>> >> k8s. After few years learning from operator experience outside of
>>> 

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Xintong Song
Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.

Looking forward to the public discussion. Please feel free to reach out if
there's anything you need from us.

Thank you~

Xintong Song



On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang 
wrote:

> Thanks Thomas, Gyula, and Marton for driving this effort! It would greatly
> ease the adoption of Apache Flink on Kubernetes and help to address the
> current operational pain points as mentioned. Look forward to the proposal
> and more discussions!
>
> Best,
> Chenya
>
> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
> wrote:
>
>> Hi All,
>>
>> I am pleased to see the level of enthusiasm and technical consideration
>> already emerging in this thread. I wholeheartedly support building an
>> operator and endorsing it via placing it under the Apache Flink umbrella
>> (as a separate repository) as the current lack of it is clearly becoming
>> an
>> adoption bottleneck for large scale Flink users. The next logical step is
>> to write a FLIP to agree on the technical details, so that we can put
>> forward the proposal to the Flink PMC for creating a new repository with a
>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
>> initial wording on the proposal which we will put up for public discussion
>> in the coming weeks.
>>
>> Best,
>> Marton
>>
>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
>> wrote:
>>
>> > Hi Thomas,
>> >
>> > Yes, I was referring to a separate repository under Apache Flink.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> Thanks for the feedback and discussion. A few additional thoughts:
>> >>
>> >> [Konstantin] > With respect to common lifecycle management operations:
>> >> these features are
>> >> > not available (within Apache Flink) for any of the other resource
>> >> providers
>> >> > (YARN, Standalone) either. From this perspective, I wouldn't consider
>> >> this
>> >> > a shortcoming of the Kubernetes integration.
>> >>
>> >> I think time and evolution of the ecosystem are factors to consider as
>> >> well. The state and usage of Flink was much different when YARN
>> >> integration was novel. Expectations are different today and the
>> >> lifecycle functionality provided by an operator may as well be
>> >> considered essential to support the concept of a Flink application on
>> >> k8s. After few years learning from operator experience outside of
>> >> Flink it might be a good time to fill the gap.
>> >>
>> >> [Konstantin] > I still believe that we should keep this focus on low
>> >> > level composable building blocks (like Jobs and Snapshots) in Apache
>> >> Flink
>> >> > to make it easy for everyone to build fitting higher level
>> abstractions
>> >> > like a FlinkApplication Custom Resource on top of it.
>> >>
>> >> I completely agree that it is important that the basic functions of
>> >> Flink are solid and continued focus is necessary. Thanks for sharing
>> >> the pointers, these are great improvements. At the same time,
>> >> ecosystem, contributor base and user spectrum are growing. There have
>> >> been significant additions in many areas of Flink including connectors
>> >> and higher level abstractions like statefun, SQL and Python. It's also
>> >> evident from additional repositories/subprojects that we have in Flink
>> >> today.
>> >>
>> >> [Konstantin] > Having said this, if others in the community have the
>> >> capacity to push and
>> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
>> Apache
>> >> > Flink, I don't see any blockers. If or when this happens, I'd see
>> some
>> >> > clear benefits of using a separate repository (easier independent
>> >> > versioning and releases, different build system & tooling (go, I
>> >> assume)).
>> >>
>> >> Naturally different contributors to the project have different focus.
>> >> Let's find out if there is strong enough interest to take this on and
>> >> strong enough commitment to maintain. As I see it, there is a
>> >> tremendous amount of internal investment goi

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Chenya Zhang
Thanks Thomas, Gyula, and Marton for driving this effort! It would greatly
ease the adoption of Apache Flink on Kubernetes and help to address the
current operational pain points mentioned above. Looking forward to the
proposal and more discussions!

Best,
Chenya

On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
wrote:

> Hi All,
>
> I am pleased to see the level of enthusiasm and technical consideration
> already emerging in this thread. I wholeheartedly support building an
> operator and endorsing it via placing it under the Apache Flink umbrella
> (as a separate repository) as the current lack of it is clearly becoming an
> adoption bottleneck for large scale Flink users. The next logical step is
> to write a FLIP to agree on the technical details, so that we can put
> forward the proposal to the Flink PMC for creating a new repository with a
> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
> initial wording on the proposal which we will put up for public discussion
> in the coming weeks.
>
> Best,
> Marton
>
> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
> wrote:
>
> > Hi Thomas,
> >
> > Yes, I was referring to a separate repository under Apache Flink.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks for the feedback and discussion. A few additional thoughts:
> >>
> >> [Konstantin] > With respect to common lifecycle management operations:
> >> these features are
> >> > not available (within Apache Flink) for any of the other resource
> >> providers
> >> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> >> this
> >> > a shortcoming of the Kubernetes integration.
> >>
> >> I think time and evolution of the ecosystem are factors to consider as
> >> well. The state and usage of Flink was much different when YARN
> >> integration was novel. Expectations are different today and the
> >> lifecycle functionality provided by an operator may as well be
> >> considered essential to support the concept of a Flink application on
> >> k8s. After few years learning from operator experience outside of
> >> Flink it might be a good time to fill the gap.
> >>
> >> [Konstantin] > I still believe that we should keep this focus on low
> >> > level composable building blocks (like Jobs and Snapshots) in Apache
> >> Flink
> >> > to make it easy for everyone to build fitting higher level
> abstractions
> >> > like a FlinkApplication Custom Resource on top of it.
> >>
> >> I completely agree that it is important that the basic functions of
> >> Flink are solid and continued focus is necessary. Thanks for sharing
> >> the pointers, these are great improvements. At the same time,
> >> ecosystem, contributor base and user spectrum are growing. There have
> >> been significant additions in many areas of Flink including connectors
> >> and higher level abstractions like statefun, SQL and Python. It's also
> >> evident from additional repositories/subprojects that we have in Flink
> >> today.
> >>
> >> [Konstantin] > Having said this, if others in the community have the
> >> capacity to push and
> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
> Apache
> >> > Flink, I don't see any blockers. If or when this happens, I'd see some
> >> > clear benefits of using a separate repository (easier independent
> >> > versioning and releases, different build system & tooling (go, I
> >> assume)).
> >>
> >> Naturally different contributors to the project have different focus.
> >> Let's find out if there is strong enough interest to take this on and
> >> strong enough commitment to maintain. As I see it, there is a
> >> tremendous amount of internal investment going into operationalizing
> >> Flink within many companies. Improvements to the operational side of
> >> Flink like the operator would complement Flink nicely. I assume that
> >> you are referring to a separate repository within Apache Flink, which
> >> would give it the chance to achieve better sustainability than the
> >> existing external operator efforts. There is also the fact that some
> >> organizations which are heavily invested in operationalizing Flink are
> >> allowing contributing to Apache Flink itself but less so to arbitrary
> >> github projects. Regardin

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Márton Balassi
Hi All,

I am pleased to see the level of enthusiasm and technical consideration
already emerging in this thread. I wholeheartedly support building an
operator and endorsing it by placing it under the Apache Flink umbrella
(as a separate repository), as the current lack of one is clearly becoming an
adoption bottleneck for large-scale Flink users. The next logical step is
to write a FLIP to agree on the technical details, so that we can put
forward the proposal to the Flink PMC for creating a new repository with a
clear purpose in mind. I volunteer to work with Thomas and Gyula on the
initial wording of the proposal, which we will put up for public discussion
in the coming weeks.

Best,
Marton

On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf  wrote:

> Hi Thomas,
>
> Yes, I was referring to a separate repository under Apache Flink.
>
> Cheers,
>
> Konstantin
>
> On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>
>> Hi everyone,
>>
>> Thanks for the feedback and discussion. A few additional thoughts:
>>
>> [Konstantin] > With respect to common lifecycle management operations:
>> these features are
>> > not available (within Apache Flink) for any of the other resource
>> providers
>> > (YARN, Standalone) either. From this perspective, I wouldn't consider
>> this
>> > a shortcoming of the Kubernetes integration.
>>
>> I think time and evolution of the ecosystem are factors to consider as
>> well. The state and usage of Flink was much different when YARN
>> integration was novel. Expectations are different today and the
>> lifecycle functionality provided by an operator may as well be
>> considered essential to support the concept of a Flink application on
>> k8s. After few years learning from operator experience outside of
>> Flink it might be a good time to fill the gap.
>>
>> [Konstantin] > I still believe that we should keep this focus on low
>> > level composable building blocks (like Jobs and Snapshots) in Apache
>> Flink
>> > to make it easy for everyone to build fitting higher level abstractions
>> > like a FlinkApplication Custom Resource on top of it.
>>
>> I completely agree that it is important that the basic functions of
>> Flink are solid and continued focus is necessary. Thanks for sharing
>> the pointers, these are great improvements. At the same time,
>> ecosystem, contributor base and user spectrum are growing. There have
>> been significant additions in many areas of Flink including connectors
>> and higher level abstractions like statefun, SQL and Python. It's also
>> evident from additional repositories/subprojects that we have in Flink
>> today.
>>
>> [Konstantin] > Having said this, if others in the community have the
>> capacity to push and
>> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
>> > Flink, I don't see any blockers. If or when this happens, I'd see some
>> > clear benefits of using a separate repository (easier independent
>> > versioning and releases, different build system & tooling (go, I
>> assume)).
>>
>> Naturally different contributors to the project have different focus.
>> Let's find out if there is strong enough interest to take this on and
>> strong enough commitment to maintain. As I see it, there is a
>> tremendous amount of internal investment going into operationalizing
>> Flink within many companies. Improvements to the operational side of
>> Flink like the operator would complement Flink nicely. I assume that
>> you are referring to a separate repository within Apache Flink, which
>> would give it the chance to achieve better sustainability than the
>> existing external operator efforts. There is also the fact that some
>> organizations which are heavily invested in operationalizing Flink are
>> allowing contributing to Apache Flink itself but less so to arbitrary
>> github projects. Regarding the tooling, it could well turn out that
>> Java is a good alternative given the ecosystem focus and that there is
>> an opportunity for reuse in certain aspects (metrics, logging etc.).
>>
>> [Yang] > I think Xintong has given a strong point why we introduced
>> the native K8s integration, which is active resource management.
>> > I have a concrete example for this in the production. When a K8s node
>> is down, the standalone K8s deployment will take longer
>> > recovery time based on the K8s eviction time(IIRC, default is 5
>> minutes). For the native K8s integration, Flink RM could be aware of the
>> > TM heartbeat lost and allocate a new one timely.
>>

Re: Flink native k8s integration vs. operator

2022-01-13 Thread Konstantin Knauf
Hi Thomas,

Yes, I was referring to a separate repository under Apache Flink.

Cheers,

Konstantin

On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:

> Hi everyone,
>
> Thanks for the feedback and discussion. A few additional thoughts:
>
> [Konstantin] > With respect to common lifecycle management operations:
> these features are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> this
> > a shortcoming of the Kubernetes integration.
>
> I think time and evolution of the ecosystem are factors to consider as
> well. The state and usage of Flink was much different when YARN
> integration was novel. Expectations are different today and the
> lifecycle functionality provided by an operator may as well be
> considered essential to support the concept of a Flink application on
> k8s. After few years learning from operator experience outside of
> Flink it might be a good time to fill the gap.
>
> [Konstantin] > I still believe that we should keep this focus on low
> > level composable building blocks (like Jobs and Snapshots) in Apache
> Flink
> > to make it easy for everyone to build fitting higher level abstractions
> > like a FlinkApplication Custom Resource on top of it.
>
> I completely agree that it is important that the basic functions of
> Flink are solid and continued focus is necessary. Thanks for sharing
> the pointers, these are great improvements. At the same time,
> ecosystem, contributor base and user spectrum are growing. There have
> been significant additions in many areas of Flink including connectors
> and higher level abstractions like statefun, SQL and Python. It's also
> evident from additional repositories/subprojects that we have in Flink
> today.
>
> [Konstantin] > Having said this, if others in the community have the
> capacity to push and
> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
> > Flink, I don't see any blockers. If or when this happens, I'd see some
> > clear benefits of using a separate repository (easier independent
> > versioning and releases, different build system & tooling (go, I
> assume)).
>
> Naturally different contributors to the project have different focus.
> Let's find out if there is strong enough interest to take this on and
> strong enough commitment to maintain. As I see it, there is a
> tremendous amount of internal investment going into operationalizing
> Flink within many companies. Improvements to the operational side of
> Flink like the operator would complement Flink nicely. I assume that
> you are referring to a separate repository within Apache Flink, which
> would give it the chance to achieve better sustainability than the
> existing external operator efforts. There is also the fact that some
> organizations which are heavily invested in operationalizing Flink are
> allowing contributing to Apache Flink itself but less so to arbitrary
> github projects. Regarding the tooling, it could well turn out that
> Java is a good alternative given the ecosystem focus and that there is
> an opportunity for reuse in certain aspects (metrics, logging etc.).
>
> [Yang] > I think Xintong has given a strong point why we introduced
> the native K8s integration, which is active resource management.
> > I have a concrete example for this in the production. When a K8s node is
> down, the standalone K8s deployment will take longer
> > recovery time based on the K8s eviction time(IIRC, default is 5
> minutes). For the native K8s integration, Flink RM could be aware of the
> > TM heartbeat lost and allocate a new one timely.
>
> Thanks for sharing this, we should evaluate it as part of a proposal.
> If we can optimize recovery or scaling with active resource management
> then perhaps it is worth to support it through the operator.
> Previously mentioned operators all rely on the standalone model.
>
> Cheers,
> Thomas
>
> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf 
> wrote:
> >
> > cc dev@
> >
> > Hi Thomas, Hi everyone,
> >
> > Thank you for starting this discussion and sorry for chiming in late.
> >
> > I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
> > Integration", in particular, it does actually not integrate well with the
> > Kubernetes ecosystem despite being called "native" (tooling, security
> > concerns).
> >
> > With respect to common lifecycle management operations: these features
> are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone

Re: Flink native k8s integration vs. operator

2022-01-12 Thread Thomas Weise
Hi everyone,

Thanks for the feedback and discussion. A few additional thoughts:

[Konstantin] > With respect to common lifecycle management operations:
these features are
> not available (within Apache Flink) for any of the other resource providers
> (YARN, Standalone) either. From this perspective, I wouldn't consider this
> a shortcoming of the Kubernetes integration.

I think time and evolution of the ecosystem are factors to consider as
well. The state and usage of Flink were much different when YARN
integration was novel. Expectations are different today, and the
lifecycle functionality provided by an operator may well be
considered essential to support the concept of a Flink application on
k8s. After a few years of learning from operator experience outside of
Flink, it might be a good time to fill the gap.

[Konstantin] > I still believe that we should keep this focus on low
> level composable building blocks (like Jobs and Snapshots) in Apache Flink
> to make it easy for everyone to build fitting higher level abstractions
> like a FlinkApplication Custom Resource on top of it.

I completely agree that it is important that the basic functions of
Flink are solid and continued focus is necessary. Thanks for sharing
the pointers, these are great improvements. At the same time,
ecosystem, contributor base and user spectrum are growing. There have
been significant additions in many areas of Flink including connectors
and higher level abstractions like statefun, SQL and Python. It's also
evident from additional repositories/subprojects that we have in Flink
today.

[Konstantin] > Having said this, if others in the community have the
capacity to push and
> *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
> Flink, I don't see any blockers. If or when this happens, I'd see some
> clear benefits of using a separate repository (easier independent
> versioning and releases, different build system & tooling (go, I assume)).

Naturally different contributors to the project have different focus.
Let's find out if there is strong enough interest to take this on and
strong enough commitment to maintain. As I see it, there is a
tremendous amount of internal investment going into operationalizing
Flink within many companies. Improvements to the operational side of
Flink like the operator would complement Flink nicely. I assume that
you are referring to a separate repository within Apache Flink, which
would give it the chance to achieve better sustainability than the
existing external operator efforts. There is also the fact that some
organizations which are heavily invested in operationalizing Flink are
allowing contributing to Apache Flink itself but less so to arbitrary
github projects. Regarding the tooling, it could well turn out that
Java is a good alternative given the ecosystem focus and that there is
an opportunity for reuse in certain aspects (metrics, logging etc.).

[Yang] > I think Xintong has given a strong point why we introduced
the native K8s integration, which is active resource management.
> I have a concrete example for this in the production. When a K8s node is 
> down, the standalone K8s deployment will take longer
> recovery time based on the K8s eviction time(IIRC, default is 5 minutes). For 
> the native K8s integration, Flink RM could be aware of the
> TM heartbeat lost and allocate a new one timely.

Thanks for sharing this, we should evaluate it as part of a proposal.
If we can optimize recovery or scaling with active resource management
then perhaps it is worth supporting it through the operator.
Previously mentioned operators all rely on the standalone model.

Cheers,
Thomas

On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf  wrote:
>
> cc dev@
>
> Hi Thomas, Hi everyone,
>
> Thank you for starting this discussion and sorry for chiming in late.
>
> I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
> Integration", in particular, it does actually not integrate well with the
> Kubernetes ecosystem despite being called "native" (tooling, security
> concerns).
>
> With respect to common lifecycle management operations: these features are
> not available (within Apache Flink) for any of the other resource providers
> (YARN, Standalone) either. From this perspective, I wouldn't consider this
> a shortcoming of the Kubernetes integration. Instead, we have been focusing
> our efforts in Apache Flink on the operations of a single Job, and left
> orchestration and lifecycle management that spans multiple Jobs to
> ecosystem projects. I still believe that we should keep this focus on low
> level composable building blocks (like Jobs and Snapshots) in Apache Flink
> to make it easy for everyone to build fitting higher level abstractions
> like a FlinkApplication Custom Resource on top of it. For example

Re: Flink native k8s integration vs. operator

2022-01-12 Thread Konstantin Knauf
cc dev@

Hi Thomas, Hi everyone,

Thank you for starting this discussion and sorry for chiming in late.

I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
Integration"; in particular, it does not actually integrate well with the
Kubernetes ecosystem despite being called "native" (tooling, security
concerns).

With respect to common lifecycle management operations: these features are
not available (within Apache Flink) for any of the other resource providers
(YARN, Standalone) either. From this perspective, I wouldn't consider this
a shortcoming of the Kubernetes integration. Instead, we have been focusing
our efforts in Apache Flink on the operations of a single Job, and left
orchestration and lifecycle management that spans multiple Jobs to
ecosystem projects. I still believe that we should keep this focus on low
level composable building blocks (like Jobs and Snapshots) in Apache Flink
to make it easy for everyone to build fitting higher level abstractions
like a FlinkApplication Custom Resource on top of it. For example, we are
currently contributing multiple improvements [1,2,3,4] to the REST API and
Application Mode that in our experience will make it easier to manage
Apache Flink with a Kubernetes operator. Given this background, I suspect a
Kubernetes Operator in Apache Flink would not be a priority for us at
Ververica - at least right now.

Having said this, if others in the community have the capacity to push and
*maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
Flink, I don't see any blockers. If or when this happens, I'd see some
clear benefits of using a separate repository (easier independent
versioning and releases, different build system & tooling (go, I assume)).

Looking forward to your thoughts,

Konstantin

[1] https://issues.apache.org/jira/browse/FLINK-24275
[2] https://issues.apache.org/jira/browse/FLINK-24208
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
[4] https://issues.apache.org/jira/browse/FLINK-24113

On Mon, Jan 10, 2022 at 2:11 PM Gyula Fóra  wrote:

> Hi All!
>
> This is a very interesting discussion.
>
> I think many users find it confusing what deployment mode to choose when
> considering a new production application on Kubernetes. With all the
> options of native, standalone and different operators this can get tricky :)
>
> I really like the idea that Thomas brought up to have at least a minimal
> operator implementation in Flink itself to cover the most common production
> job lifecycle management scenarios. I think the Flink community has a very
> strong experience in this area to create a successful implementation that
> would benefit most production users on Kubernetes.
>
> Cheers,
> Gyula
>
> On Mon, Jan 10, 2022 at 4:29 AM Yang Wang  wrote:
>
>> Thanks all for this fruitful discussion.
>>
>> I think Xintong has given a strong point why we introduced the native K8s
>> integration, which is active resource management.
>> I have a concrete example for this in the production. When a K8s node is
>> down, the standalone K8s deployment will take longer
>> recovery time based on the K8s eviction time(IIRC, default is 5 minutes).
>> For the native K8s integration, Flink RM could be aware of the
>> TM heartbeat lost and allocate a new one timely.
>>
>> Also when introducing the native K8s integration, another hit is that we
>> should make the users are easy enough to migrate from YARN deployment.
>> They already have a production-ready job life-cycle management system,
>> which is using Flink CLI to submit the Flink jobs.
>> So we provide a consistent command "bin/flink run-application -t
>> kubernetes-application/yarn-application" to start a Flink application and
>> "bin/flink cancel/stop ..."
>> to terminate a Flink application.
>>
>>
>> Compared with K8s operator, I know that this is not a K8s
>> native mechanism. Hence, I also agree that we still need a powerful K8s
>> operator which
>> could work with both standalone and native K8s modes. The major
>> difference between them is how to start the JM and TM pods. For standalone,
>> they are managed by K8s job/deployment. For native, maybe we could simply
>> create a submission carrying the "flink run-application" arguments
>> which is derived from the Flink application CR.
>>
>> Make the Flink's active resource manager can talk to the K8s operator is
>> an interesting option, which could support both standalone and native.
>> Then Flink RM just needs to declare the resource requirement(e.g. 2 *
>> <2G, 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
>> to the K8s operat

Re: Flink native k8s integration vs. operator

2022-01-10 Thread Gyula Fóra
Hi All!

This is a very interesting discussion.

I think many users find it confusing what deployment mode to choose when
considering a new production application on Kubernetes. With all the
options of native, standalone and different operators this can get tricky :)

I really like the idea that Thomas brought up to have at least a minimal
operator implementation in Flink itself to cover the most common production
job lifecycle management scenarios. I think the Flink community has very
strong experience in this area and can create a successful implementation that
would benefit most production users on Kubernetes.

Cheers,
Gyula

On Mon, Jan 10, 2022 at 4:29 AM Yang Wang  wrote:

> Thanks all for this fruitful discussion.
>
> I think Xintong has given a strong point why we introduced the native K8s
> integration, which is active resource management.
> I have a concrete example for this in the production. When a K8s node is
> down, the standalone K8s deployment will take longer
> recovery time based on the K8s eviction time(IIRC, default is 5 minutes).
> For the native K8s integration, Flink RM could be aware of the
> TM heartbeat lost and allocate a new one timely.
>
> Also when introducing the native K8s integration, another hit is that we
> should make the users are easy enough to migrate from YARN deployment.
> They already have a production-ready job life-cycle management system,
> which is using Flink CLI to submit the Flink jobs.
> So we provide a consistent command "bin/flink run-application -t
> kubernetes-application/yarn-application" to start a Flink application and
> "bin/flink cancel/stop ..."
> to terminate a Flink application.
>
>
> Compared with K8s operator, I know that this is not a K8s
> native mechanism. Hence, I also agree that we still need a powerful K8s
> operator which
> could work with both standalone and native K8s modes. The major difference
> between them is how to start the JM and TM pods. For standalone,
> they are managed by K8s job/deployment. For native, maybe we could simply
> create a submission carrying the "flink run-application" arguments
> which is derived from the Flink application CR.
>
> Make the Flink's active resource manager can talk to the K8s operator is
> an interesting option, which could support both standalone and native.
> Then Flink RM just needs to declare the resource requirement(e.g. 2 * <2G,
> 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
> to the K8s operator. It feels like an intermediate form between native and
> standalone mode :)
>
>
>
> Best,
> Yang
>
>
>
> On Fri, Jan 7, 2022 at 12:02, Xintong Song wrote:
>
>> Hi folks,
>>
>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>
>> Firstly, I'd like to clarify my understanding of the concepts "native k8s
>> integration" and "active resource management".
>> - Native k8s integration means Flink's master interacts with k8s' api
>> server directly. It acts like embedding an operator inside Flink's master,
>> which manages the resources (pod, deployment, configmap, etc.) and watches
>> / reacts to related events.
>> - Active resource management means Flink can actively start / terminate
>> workers as needed. Its key characteristic is that the resource a Flink
>> deployment uses is decided by the job's execution plan, unlike the opposite
>> reactive mode (resource available to the deployment decides the execution
>> plan) or the standalone mode (both execution plan and deployment resources
>> are predefined).
>>
>> Currently, we have the yarn and native k8s deployments (and the recently
>> removed mesos deployment) in active mode, due to their ability to request /
>> release worker resources from the underlying cluster. And all the existing
>> operators, AFAIK, work with a Flink standalone deployment, where Flink
>> cannot request / release resources by itself.
>>
>> From this perspective, I think a large part of the native k8s integration
>> advantages come from the active mode: being able to better understand the
>> job's resource requirements and adjust the deployment resource accordingly.
>> Both fine-grained resource management (customizing TM resources for
>> different tasks / operators) and adaptive batch scheduler (rescale the
>> deployment w.r.t. different stages) fall into this category.
>>
>> I'm wondering if we can have an operator that also works with the active
>> mode. Instead of talking to the api server directly for adding / deleting
>> resources, Flink's active resource manager can talk to the operator (via
>> CR) about the resources the deployme

Re: Flink native k8s integration vs. operator

2022-01-09 Thread Yang Wang
Thanks all for this fruitful discussion.

I think Xintong has given a strong point why we introduced the native K8s
integration, which is active resource management.
I have a concrete example of this from production. When a K8s node is
down, the standalone K8s deployment takes longer to recover, because
recovery depends on the K8s eviction time (IIRC, the default is 5 minutes).
With the native K8s integration, the Flink RM can detect the lost
TM heartbeat and allocate a new TM promptly.
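
To put rough numbers on the comparison above, here is a small arithmetic sketch in Python. The 5-minute eviction time is the figure recalled in the message; the 50-second heartbeat timeout is an assumption (it is believed to match Flink's default `heartbeat.timeout` of 50000 ms, but check your own configuration). Both function names are made up for illustration, and the model ignores pod scheduling and startup time.

```python
def standalone_recovery_delay(eviction_seconds: float) -> float:
    """Standalone: a replacement pod is only created after K8s evicts the lost one."""
    return eviction_seconds


def native_recovery_delay(heartbeat_timeout_seconds: float) -> float:
    """Native: the Flink RM requests a new TM once the TM heartbeat times out."""
    return heartbeat_timeout_seconds


# Assumed values, see the note above.
eviction_seconds = 5 * 60          # "default is 5 minutes" as recalled in the message
heartbeat_timeout_seconds = 50     # assumption: heartbeat.timeout = 50000 ms

saved = standalone_recovery_delay(eviction_seconds) - native_recovery_delay(
    heartbeat_timeout_seconds
)
print(f"Native mode starts replacing the TM about {saved} seconds earlier")
```

Under these assumptions the native integration reacts several minutes sooner; the real gap depends on how both timeouts are tuned in a given cluster.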

Also, when introducing the native K8s integration, another consideration was
that it should be easy for users to migrate from a YARN deployment.
They already have a production-ready job life-cycle management system,
which uses the Flink CLI to submit Flink jobs.
So we provide a consistent command "bin/flink run-application -t
kubernetes-application/yarn-application" to start a Flink application and
"bin/flink cancel/stop ..."
to terminate a Flink application.


Compared with a K8s operator, I know that this is not a K8s-native mechanism.
Hence, I also agree that we still need a powerful K8s operator which
could work with both standalone and native K8s modes. The major difference
between them is how to start the JM and TM pods. For standalone,
they are managed by a K8s job/deployment. For native, maybe we could simply
create a submission carrying the "flink run-application" arguments,
which are derived from the Flink application CR.

Making Flink's active resource manager talk to the K8s operator is an
interesting option, which could support both standalone and native.
Then the Flink RM just needs to declare the resource requirement (e.g. 2 * <2G,
1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
to the K8s operator. It feels like an intermediate form between native and
standalone mode :)
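
The "declare, then let the operator allocate" idea can be sketched roughly as follows, in Python. This is only an illustration under stated assumptions: `ResourceSpec` and `plan_changes` are hypothetical names, not part of Flink or of any existing operator, and a real operator would create and delete pods rather than return counts.

```python
from collections import Counter
from typing import NamedTuple


class ResourceSpec(NamedTuple):
    """Hypothetical worker profile, e.g. <2G, 1CPU>."""
    memory_gb: int
    cpus: int


def plan_changes(declared: Counter, running: Counter):
    """Return (to_start, to_stop) so that running workers match the declaration.

    Counter subtraction keeps only positive counts, so each result lists
    just the profiles that are missing or surplus.
    """
    to_start = declared - running
    to_stop = running - declared
    return to_start, to_stop


# The RM declares: 2 * <2G, 1CPU> and 2 * <4G, 1CPU> (the example from the message).
declared = Counter({ResourceSpec(2, 1): 2, ResourceSpec(4, 1): 2})
# Assumed current state observed by the operator.
running = Counter({ResourceSpec(2, 1): 3, ResourceSpec(4, 1): 1})

to_start, to_stop = plan_changes(declared, running)
print("start:", dict(to_start))
print("stop:", dict(to_stop))
```

With the assumed state above, the operator would start one <4G, 1CPU> worker and stop one <2G, 1CPU> worker. The RM only states what it needs and never talks to the API server itself.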



Best,
Yang



On Fri, Jan 7, 2022 at 12:02, Xintong Song wrote:

> Hi folks,
>
> Thanks for the discussion. I'd like to share my two cents on this topic.
>
> Firstly, I'd like to clarify my understanding of the concepts "native k8s
> integration" and "active resource management".
> - Native k8s integration means Flink's master interacts with k8s' api
> server directly. It acts like embedding an operator inside Flink's master,
> which manages the resources (pod, deployment, configmap, etc.) and watches
> / reacts to related events.
> - Active resource management means Flink can actively start / terminate
> workers as needed. Its key characteristic is that the resource a Flink
> deployment uses is decided by the job's execution plan, unlike the opposite
> reactive mode (resource available to the deployment decides the execution
> plan) or the standalone mode (both execution plan and deployment resources
> are predefined).
>
> Currently, we have the yarn and native k8s deployments (and the recently
> removed mesos deployment) in active mode, due to their ability to request /
> release worker resources from the underlying cluster. And all the existing
> operators, AFAIK, work with a Flink standalone deployment, where Flink
> cannot request / release resources by itself.
>
> From this perspective, I think a large part of the native k8s integration
> advantages come from the active mode: being able to better understand the
> job's resource requirements and adjust the deployment resource accordingly.
> Both fine-grained resource management (customizing TM resources for
> different tasks / operators) and adaptive batch scheduler (rescale the
> deployment w.r.t. different stages) fall into this category.
>
> I'm wondering if we can have an operator that also works with the active
> mode. Instead of talking to the api server directly for adding / deleting
> resources, Flink's active resource manager can talk to the operator (via
> CR) about the resources the deployment needs, and let the operator to
> actually add / remove the resources. The operator should be able to work
> with (active) or without (standalone) the information of deployment's
> resource requirements. In this way, users are free to choose between active
> and reactive (e.g., HPA) rescaling, while always benefiting from the
> beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
> alignment with the K8s ecosystem (Flink client free, operating via kubectl,
> etc.).
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise  wrote:
>
>> Hi David,
>>
>> Thank you for the reply and context!
>>
>> As for workload types and where native integration might fit: I think
>> that any k8s native solution that satisfies category 3) can also take
>> care of 1) and 2) while the native integration by itself can't achieve
>> that. Existence of [1] might serve as further indication.
>>
>>

Re: Flink native k8s integration vs. operator

2022-01-06 Thread Xintong Song
Hi folks,

Thanks for the discussion. I'd like to share my two cents on this topic.

Firstly, I'd like to clarify my understanding of the concepts "native k8s
integration" and "active resource management".
- Native k8s integration means Flink's master interacts with k8s' api
server directly. It acts like embedding an operator inside Flink's master,
which manages the resources (pod, deployment, configmap, etc.) and watches
/ reacts to related events.
- Active resource management means Flink can actively start / terminate
workers as needed. Its key characteristic is that the resource a Flink
deployment uses is decided by the job's execution plan, unlike the opposite
reactive mode (resource available to the deployment decides the execution
plan) or the standalone mode (both execution plan and deployment resources
are predefined).
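
The distinction between the three modes can be illustrated with a small Python sketch. It is a deliberate simplification and not Flink's scheduler: it assumes one slot per unit of parallelism, and all function and parameter names are hypothetical.

```python
from math import ceil


def active_mode(required_parallelism: int, slots_per_worker: int) -> dict:
    """Active: the execution plan decides how many workers are requested."""
    workers = ceil(required_parallelism / slots_per_worker)
    return {"parallelism": required_parallelism, "workers": workers}


def reactive_mode(available_workers: int, slots_per_worker: int) -> dict:
    """Reactive: the resources that happen to be available decide the plan."""
    parallelism = available_workers * slots_per_worker
    return {"parallelism": parallelism, "workers": available_workers}


def standalone_mode(parallelism: int, workers: int, slots_per_worker: int) -> dict:
    """Standalone: both are predefined; the job only runs if they fit together."""
    fits = parallelism <= workers * slots_per_worker
    return {"parallelism": parallelism, "workers": workers, "fits": fits}


print(active_mode(required_parallelism=10, slots_per_worker=4))
print(reactive_mode(available_workers=3, slots_per_worker=4))
print(standalone_mode(parallelism=10, workers=2, slots_per_worker=4))
```

In the active case the worker count is an output, in the reactive case the parallelism is an output, and in the standalone case neither adapts to the other.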

Currently, we have the yarn and native k8s deployments (and the recently
removed mesos deployment) in active mode, due to their ability to request /
release worker resources from the underlying cluster. And all the existing
operators, AFAIK, work with a Flink standalone deployment, where Flink
cannot request / release resources by itself.

From this perspective, I think a large part of the native k8s integration
advantages come from the active mode: being able to better understand the
job's resource requirements and adjust the deployment resource accordingly.
Both fine-grained resource management (customizing TM resources for
different tasks / operators) and adaptive batch scheduler (rescale the
deployment w.r.t. different stages) fall into this category.

I'm wondering if we can have an operator that also works with the active
mode. Instead of talking to the api server directly for adding / deleting
resources, Flink's active resource manager can talk to the operator (via
CR) about the resources the deployment needs, and let the operator
actually add / remove the resources. The operator should be able to work
with (active) or without (standalone) the information of deployment's
resource requirements. In this way, users are free to choose between active
and reactive (e.g., HPA) rescaling, while always benefiting from the
beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
alignment with the K8s ecosystem (Flink client free, operating via kubectl,
etc.).

Thank you~

Xintong Song



On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise  wrote:

> Hi David,
>
> Thank you for the reply and context!
>
> As for workload types and where native integration might fit: I think
> that any k8s native solution that satisfies category 3) can also take
> care of 1) and 2) while the native integration by itself can't achieve
> that. Existence of [1] might serve as further indication.
>
> The k8s operator pattern would be an essential building block for a
> k8s native solution that is interoperable with k8s ecosystem tooling
> like kubectl, which is why [2] and subsequent derived art were
> created. Specifically the CRD allows us to directly express the
> concept of a Flink application consisting of job manager and task
> manager pods along with associated create/update/delete operations.
>
> Would it make sense to gauge interest to have such an operator as part
> of Flink? It appears so from discussions like [3]. I think such
> addition would significantly lower the barrier to adoption, since like
> you mentioned one cannot really run mission critical streaming
> workloads with just the Apache Flink release binaries alone. While it
> is great to have multiple k8s operators to choose from that are
> managed outside Flink, it is unfortunately also evident that today's
> hot operator turns into tomorrow's tech debt. I think such fate would
> be less likely within the project, when multiple parties can join
> forces and benefit from each other's contributions. There were similar
> considerations and discussions around Docker images in the past.
>
> Out of the features that you listed it is particularly the application
> upgrade that needs to be solved through an external process like
> operator. The good thing is that many folks have already thought hard
> about this and in existing implementations we see different strategies
> that have their merit and production mileage (certainly applies to
> [2]). We could combine the best of these ideas into a unified
> implementation as part of Flink itself as starting point.
>
> Cheers,
> Thomas
>
>
> [1] https://github.com/wangyang0918/flink-native-k8s-operator
> [2] https://github.com/lyft/flinkk8soperator
> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>
>
> On Tue, Jan 4, 2022 at 4:04 AM David Morávek  wrote:
> >
> > Hi Thomas,
> >
> > AFAIK there are no specific plans in this direction with the native
> integration, but I'd 

Re: Flink native k8s integration vs. operator

2022-01-05 Thread Thomas Weise
Hi David,

Thank you for the reply and context!

As for workload types and where native integration might fit: I think
that any k8s native solution that satisfies category 3) can also take
care of 1) and 2) while the native integration by itself can't achieve
that. Existence of [1] might serve as further indication.

The k8s operator pattern would be an essential building block for a
k8s native solution that is interoperable with k8s ecosystem tooling
like kubectl, which is why [2] and subsequent derived art were
created. Specifically the CRD allows us to directly express the
concept of a Flink application consisting of job manager and task
manager pods along with associated create/update/delete operations.
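
As a rough illustration of what such a custom resource and its create/update/delete handling could look like, here is a Python sketch. The resource layout and the `reconcile` function are hypothetical: they are not taken from [2] or from any other existing operator, and a real operator would act on the Kubernetes API instead of returning a string.

```python
from typing import Optional

# Hypothetical custom resource describing one Flink application.
desired_app = {
    "kind": "FlinkApplication",            # assumed kind name
    "metadata": {"name": "example-app"},   # assumed application name
    "spec": {
        "image": "example/flink-job:2",    # assumed image reference
        "jobManager": {"replicas": 1},
        "taskManager": {"replicas": 3},
    },
}


def reconcile(desired: Optional[dict], observed: Optional[dict]) -> str:
    """Decide which operation brings the observed state to the desired one."""
    if desired is None and observed is None:
        return "noop"
    if observed is None:
        return "create"   # resource declared, nothing running yet
    if desired is None:
        return "delete"   # resource removed, application still running
    return "noop" if desired["spec"] == observed["spec"] else "update"


# Assumed observed state: same application, still running the previous image.
observed_app = {"spec": {**desired_app["spec"], "image": "example/flink-job:1"}}

print(reconcile(desired_app, None))
print(reconcile(desired_app, observed_app))
print(reconcile(None, observed_app))
```

The point of the pattern is that users only edit the declared resource (for example with kubectl), and the operator derives the operation to perform from the difference.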

Would it make sense to gauge interest to have such an operator as part
of Flink? It appears so from discussions like [3]. I think such an
addition would significantly lower the barrier to adoption since, as
you mentioned, one cannot really run mission-critical streaming
workloads with the Apache Flink release binaries alone. While it
is great to have multiple k8s operators to choose from that are
managed outside Flink, it is unfortunately also evident that today's
hot operator turns into tomorrow's tech debt. I think such a fate would
be less likely within the project, where multiple parties can join
forces and benefit from each other's contributions. There were similar
considerations and discussions around Docker images in the past.

Out of the features that you listed, it is particularly the application
upgrade that needs to be solved through an external process like an
operator. The good thing is that many folks have already thought hard
about this, and in existing implementations we see different strategies
that have their merit and production mileage (this certainly applies to
[2]). We could combine the best of these ideas into a unified
implementation as part of Flink itself as a starting point.

Cheers,
Thomas


[1] https://github.com/wangyang0918/flink-native-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080


On Tue, Jan 4, 2022 at 4:04 AM David Morávek  wrote:
>
> Hi Thomas,
>
> AFAIK there are no specific plans in this direction with the native 
> integration, but I'd like to share some thoughts on the topic
>
> In my understanding there are three major groups of workloads in Flink:
>
> 1) Batch workloads
> 2) Interactive workloads (Both Batch and Streaming; eg. SQL Gateway / 
> Zeppelin Notebooks / VVP ...)
> 3) "Mission Critical" Streaming workloads
>
> I think the native integration fits really well in the first two categories. 
> Let's talk about these first:
>
> 1) Batch workloads
>
> You don't really need to address the upgrade story here. The interesting 
> topic is how to "dynamically" adjust parallelism as the workload can change 
> between stages. This is where the Adaptive Batch Scheduler [1] comes into 
> play. To leverage the scheduler to the full extend, it needs to be deployed 
> with the remote shuffle service in place [2], so the Flink's Resource Manager 
> can free TaskManagers that are no longer needed.
>
> This can IMO work really well with the native integration as there is really 
> clear approach on how the Resource Manager should decide on what resources 
> should be allocated.
>
> 2) Interactive workloads
>
> Again, the upgrade story is not really interesting in this scenario. For
> batch workloads, it's basically the same as the above. For streaming ones this
> gets tricky. The main initiative that we currently have in terms of auto
> scaling / re-scaling of the streaming workloads is the reactive mode
> (adaptive scheduler) [3].
>
> I can totally see how the reactive mode could be integrated in the native 
> integration, but with the application mode, which is not really suitable for 
> the interactive workloads. For integration with session cluster, we'd first 
> need to address the "scheduling" problem of how to distribute newly available 
> resources between multiple jobs.
>
> What's pretty neat here is that the interpreters (Zeppelin, SQL Gateway, ...)
> have a really convenient way of spinning up a new cluster inside k8s.
>
> 3) "Mission Critical" Streaming workloads
>
> This one is IMO the primary reason why one would consider building a new 
> operator these days as this needs careful lifecycle management of the
> pipeline. I assume this is also the use case that you're investigating, am I 
> correct?
>
> I'd second the requirements that you've already stated:
> a) Resource efficiency - being able to re-scale based on the workload, in 
> order to keep up with the input / not waste resources
> b) Fast recovery
> c) Application upgrades
>
> I personally don't think that the n

Re: Flink native k8s integration vs. operator

2022-01-04 Thread David Morávek
, it may be fairly simple to implement. What I'm
struggling with are the more complex upgrade scenarios such as dual, blue /
green deployment.


To sum this up, I'd really love it if Flink could provide a great out-of-the-box
experience with standalone mode on k8s, one that makes the experience as close
to running / operating any other application as possible.

I'd really appreciate hearing your thoughts on this topic.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
[2] https://github.com/flink-extended/flink-remote-shuffle
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes

Best,
D.

On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise  wrote:

> Hi,
>
> I was recently looking at the Flink native Kubernetes integration [1]
> to get an idea how it relates to existing operator based solutions
> [2], [3].
>
> Part of the native integration's motivations was simplicity (no extra
> component to install), but arguably that is also a shortcoming. The
> k8s operator model can offer support for application lifecycle like
> upgrade and rescaling, as well as job submission without a Flink
> client.
>
> When using the Flink native integration it would still be necessary to
> provide that controller functionality. Is the idea to use the native
> integration for task manager resource allocation in tandem with an
> operator that provides the external controller functionality? If
> anyone using the Flink native integration can share experience, I
> would be curious to learn more about the specific setup and if there
> are plans to expand the k8s native integration capabilities.
>
> For example:
>
> * Application upgrade with features such as [4]. Since the job manager
> is part of the deployment it cannot orchestrate the deployment. It
> needs to be the responsibility of an external process. Has anyone
> contemplated adding such a component to Flink itself?
>
> * Rescaling: Theoretically a parallelism change could be performed w/o
> restart of the job manager pod. Hence, building blocks to trigger and
> apply rescaling could be part of Flink itself. Has this been explored
> further?
>
> Yang kindly pointed me to [5]. Is the recommendation/conclusion that
> when a k8s operator is already used, then let it be in charge of the
> task manager resource allocation? If so, what scenario was the native
> k8s integration originally intended for?
>
> Thanks,
> Thomas
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
> [2] https://github.com/lyft/flinkk8soperator
> [3] https://github.com/spotify/flink-on-k8s-operator
> [4]
> https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
>


Flink native k8s integration vs. operator

2022-01-03 Thread Thomas Weise
Hi,

I was recently looking at the Flink native Kubernetes integration [1]
to get an idea how it relates to existing operator based solutions
[2], [3].

Part of the native integration's motivations was simplicity (no extra
component to install), but arguably that is also a shortcoming. The
k8s operator model can offer support for application lifecycle like
upgrade and rescaling, as well as job submission without a Flink
client.

When using the Flink native integration it would still be necessary to
provide that controller functionality. Is the idea to use the native
integration for task manager resource allocation in tandem with an
operator that provides the external controller functionality? If
anyone using the Flink native integration can share experience, I
would be curious to learn more about the specific setup and if there
are plans to expand the k8s native integration capabilities.

For example:

* Application upgrade with features such as [4]. Since the job manager
is part of the deployment it cannot orchestrate the deployment. It
needs to be the responsibility of an external process. Has anyone
contemplated adding such a component to Flink itself?

* Rescaling: Theoretically a parallelism change could be performed w/o
restart of the job manager pod. Hence, building blocks to trigger and
apply rescaling could be part of Flink itself. Has this been explored
further?

Yang kindly pointed me to [5]. Is the recommendation/conclusion that
when a k8s operator is already used, then let it be in charge of the
task manager resource allocation? If so, what scenario was the native
k8s integration originally intended for?

Thanks,
Thomas

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
[2] https://github.com/lyft/flinkk8soperator
[3] https://github.com/spotify/flink-on-k8s-operator
[4] https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
[5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d


Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

2021-12-15 Thread Yang Wang
Could you please check whether the JobManager is hitting long full GC pauses,
which would cause the leadership to be lost?

BTW, increasing the timeout should help.

high-availability.kubernetes.leader-election.lease-duration: 60s
high-availability.kubernetes.leader-election.renew-deadline: 60s
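
To see how a long stop-the-world pause interacts with these settings, here is a rough Python sketch of a deliberately simplified model (my own illustration, not Flink's implementation): the leader cannot renew its lease while the JVM is paused, so leadership is assumed to be lost once the pause reaches the renew deadline. The helper names are hypothetical, and the 15s value stands in for the setting before the change (an assumption; check the documentation for your Flink version). Only the option names and the 60s values come from the message above.

```python
import re

RENEW_DEADLINE = "high-availability.kubernetes.leader-election.renew-deadline"
LEASE_DURATION = "high-availability.kubernetes.leader-election.lease-duration"

# Assumed settings before the change (placeholder values for the illustration).
ASSUMED_BEFORE = {LEASE_DURATION: "15s", RENEW_DEADLINE: "15s"}
# Settings suggested in the message above.
SUGGESTED = {LEASE_DURATION: "60s", RENEW_DEADLINE: "60s"}


def parse_seconds(value: str) -> float:
    """Parse a duration such as '60s', '500ms' or '2min' into seconds (hypothetical helper)."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(ms|s|min)\s*", value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    amount, unit = float(match.group(1)), match.group(2)
    return amount * {"ms": 0.001, "s": 1.0, "min": 60.0}[unit]


def survives_pause(config: dict, pause_seconds: float) -> bool:
    """Simplified model: leadership is kept only if the pause is shorter than the renew deadline."""
    return pause_seconds < parse_seconds(config[RENEW_DEADLINE])


if __name__ == "__main__":
    for pause in (5, 20, 45):
        print(f"pause={pause}s before={survives_pause(ASSUMED_BEFORE, pause)} "
              f"suggested={survives_pause(SUGGESTED, pause)}")
```

Raising the timeouts only widens the tolerated pause; if full GC pauses are the cause, the GC log is still worth checking.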

Best,
Yang


On Tue, Dec 14, 2021 at 05:36, Alexey Trenikhun wrote:

> Hi David,
>
> Setup is application mode, single job, single JM (Kubernetes job), k8s
> v1.18.2. I'm attaching JM log.
>
>
> Thanks,
> Alexey
> --
> *From:* David Morávek 
> *Sent:* Monday, December 13, 2021 12:59 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Flink 1.13.3, k8s HA - ResourceManager was revoked
> leadership
>
> Hi Alexey,
>
> please be aware that the json-based logs in the mail may not make it past
> the spam filter (at least for gmail they did not) :(
>
> K8s based leader election is based on optimistic locking of the underlying
> config-map (~ periodically updating the lease annotation of the
> config-map). If JM fails to update this lease within a deadline, the
> leadership is lost.
>
> Can you please elaborate a bit about your setup and your k8s related Flink
> configurations? Also could you share the whole JM log by any chance (gist /
> email attachment)?
>
> Best,
> D.
>
> On Sat, Dec 11, 2021 at 6:47 AM Alexey Trenikhun  wrote:
>
> Hello,
> I'm running Flink 1.13.3 with Kubernetes HA. JM periodically restarts
> after some time, in log below job runs ~8 minutes, then suddenly leadership
> was revoked, job reaches terminal state and K8s restarts failed JM:
>
> {"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1)
> (47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to
> RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
>
> {"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager
> akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0 was
> revoked leadership. Clearing fencing
> token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping
> DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping
> KubernetesLeaderRetrievalDriver{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is
> closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot
> manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner
> was revoked the leadership with leader id
> 138b4029-88eb-409f-98cc-e296fe400eb8. Stopping the
> DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping
> SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProces

Re: Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

2021-12-13 Thread David Morávek
Hi Alexey,

please be aware that the json-based logs in the mail may not make it past
the spam filter (at least for gmail they did not) :(

K8s based leader election is based on optimistic locking of the underlying
config-map (~ periodically updating the lease annotation of the
config-map). If JM fails to update this lease within a deadline, the
leadership is lost.
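
As a rough illustration of the mechanism described above, here is a minimal Python sketch of a lease kept in a versioned record. It is a toy model, not Flink's or the Kubernetes client's implementation: the ConfigMap and Candidate classes and the 15-second values are assumptions made for the example. It shows the two ways leadership can be lost: the leader misses its renew deadline, or its write is rejected because another contender updated the record first (optimistic locking).

```python
from dataclasses import dataclass


class Conflict(Exception):
    """Raised when a write carries a stale resource version."""


@dataclass
class ConfigMap:
    """Toy stand-in for the leader config-map: a holder, a renew time, and a version."""
    resource_version: int = 0
    holder: str = ""
    renew_time: float = 0.0

    def read(self):
        return self.resource_version, self.holder, self.renew_time

    def update(self, expected_version: int, holder: str, now: float) -> int:
        # Optimistic locking: accept the write only if nobody wrote in between.
        if expected_version != self.resource_version:
            raise Conflict("resource version changed")
        self.holder, self.renew_time = holder, now
        self.resource_version += 1
        return self.resource_version


class Candidate:
    def __init__(self, name, config_map, lease_duration=15.0, renew_deadline=15.0):
        self.name = name
        self.config_map = config_map
        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.version = None      # version of our last successful write, None if not leader
        self.last_renew = None

    def tick(self, now: float) -> bool:
        """Run one election round at time `now`; return True if we are leader afterwards."""
        if self.version is not None:
            if now - self.last_renew > self.renew_deadline:
                self.version = None          # missed the deadline: step down
                return False
            self._write(self.version, now)   # renew with the version we last wrote
        else:
            version, holder, renew_time = self.config_map.read()
            if holder == "" or now - renew_time >= self.lease_duration:
                self._write(version, now)    # lease is free or expired: try to take it
        return self.version is not None

    def _write(self, expected_version, now):
        try:
            self.version = self.config_map.update(expected_version, self.name, now)
            self.last_renew = now
        except Conflict:
            self.version = None              # someone else won the race


def single_jm_with_pause():
    """One JobManager that renews on time, then stalls for 30 seconds."""
    jm = Candidate("jm", ConfigMap())
    return [jm.tick(0.0), jm.tick(10.0), jm.tick(40.0)]


def two_candidates_race():
    """A stalled leader whose renewal arrives just after a contender took the lease."""
    cm = ConfigMap()
    a, b = Candidate("jm-a", cm), Candidate("jm-b", cm)
    return [a.tick(0.0), b.tick(1.0), a.tick(5.0), b.tick(20.0), a.tick(20.0)]


if __name__ == "__main__":
    print(single_jm_with_pause())
    print(two_candidates_race())
```

The first scenario is the closer match for a single-JM setup: there is no competing candidate, yet the leader still steps down after stalling past its deadline.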

Can you please elaborate a bit about your setup and your k8s related Flink
configurations? Also could you share the whole JM log by any chance (gist /
email attachment)?

Best,
D.

On Sat, Dec 11, 2021 at 6:47 AM Alexey Trenikhun  wrote:

> Hello,
> I'm running Flink 1.13.3 with Kubernetes HA. JM periodically restarts
> after some time, in log below job runs ~8 minutes, then suddenly leadership
> was revoked, job reaches terminal state and K8s restarts failed JM:
>
> {"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1)
> (47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to
> RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
>
> {"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager
> akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0 was
> revoked leadership. Clearing fencing
> token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping
> DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping
> KubernetesLeaderRetrievalDriver{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is
> closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot
> manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner
> was revoked the leadership with leader id
> 138b4029-88eb-409f-98cc-e296fe400eb8. Stopping the
> DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping
> SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.557Z","message":"Stopping dispatcher
> akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1
> .","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
> {"timestamp":"2021-12-11T05:06:10.558Z","message":"Stopping all currently
> running jobs of dispatcher akka.tcp://
> flink@10.244.104.239:6123/user/rpc/dispatcher_1
> .","logger_name":"org.apache.flink.runt

Flink 1.13.3, k8s HA - ResourceManager was revoked leadership

2021-12-10 Thread Alexey Trenikhun
Hello,
I'm running Flink 1.13.3 with Kubernetes HA. The JM periodically restarts after
some time. In the log below the job runs for ~8 minutes, then leadership is
suddenly revoked, the job reaches a terminal state, and K8s restarts the failed JM:

{"timestamp":"2021-12-11T04:51:53.697Z","message":"Agent Info (1/1) 
(47e6706e52ad96111a3d722cc56b5752) switched from INITIALIZING to 
RUNNING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.483Z","message":"ResourceManager 
akka.tcp://flink@10.244.104.239:6123/user/rpc/resourcemanager_0 was revoked 
leadership. Clearing fencing 
token.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping 
DefaultLeaderRetrievalService.","logger_name":"org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.484Z","message":"Stopping 
KubernetesLeaderRetrievalDriver{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.485Z","message":"The watcher is 
closing.","logger_name":"org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.487Z","message":"Suspending the slot 
manager.","logger_name":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.555Z","message":"DefaultDispatcherRunner was 
revoked the leadership with leader id 138b4029-88eb-409f-98cc-e296fe400eb8. 
Stopping the 
DispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.556Z","message":"Stopping 
SessionDispatcherLeaderProcess.","logger_name":"org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess","thread_name":"KubernetesLeaderElector-ExecutorService-thread-1","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.557Z","message":"Stopping dispatcher 
akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.558Z","message":"Stopping all currently 
running jobs of dispatcher 
akka.tcp://flink@10.244.104.239:6123/user/rpc/dispatcher_1.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.560Z","message":"Stopping the JobMaster for 
job 
gim().","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
{"timestamp":"2021-12-11T05:06:10.565Z","message":"Job gim 
() switched from state RUNNING to 
SUSPENDED.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"org.apache.flink.util.FlinkException:
 Scheduler is being stopped.\n\tat 
org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607)\n\tat
 
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)\n\tat
 
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)\n\tat
 org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)\n\tat 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)\n\tat
 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat 

Flink on k8s: business data loss when using OSS as the state backend

2021-11-29 Thread 赵旭晨
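I am currently using Alibaba Cloud OSS as the state storage backend, and I found that business data cannot be replayed from the checkpoint. The key code is as follows:
main:
sink operator:


At checkpoint time, the business data is stored in OSS.


After a restart, the data should be replayed from the latest checkpoint.
The code above works fine for a job running directly on a virtual machine (where state is stored on the local disk); the business data can be replayed from the checkpoint.


However, once the k8s + OSS setup is used,
storing the business data at checkpoint time works fine,


but after I restart the job,


context.isRestored() is false, so the business data from before the restart cannot be replayed, which leads to data loss.
Could anyone please take a look? Many thanks!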







Re: Start Flink cluster, k8s pod behavior

2021-10-08 Thread Yang Wang
Did you use "jobmanager.sh start-foreground" in your own
"run-job-manager.sh", just as Flink does
in docker-entrypoint.sh [1]?
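
A small Python sketch of why the foreground variant matters, using a sleeping child process as a stand-in for the JobManager JVM (the helper names are hypothetical and nothing here calls Flink's scripts). A launcher that starts the process in the background returns immediately, which is what a container entrypoint must not do, while the foreground launcher stays alive for as long as the process runs.

```python
import subprocess
import sys
import time

# Stand-in for a long-running JobManager process: a child that sleeps for one second.
CHILD = [sys.executable, "-c", "import time; time.sleep(1.0)"]


def run_daemonizing():
    """Start the child in the background and return at once, like a script that forks a daemon."""
    start = time.monotonic()
    child = subprocess.Popen(CHILD)
    return time.monotonic() - start, child


def run_foreground():
    """Start the child and block until it exits, like a start-foreground style launcher."""
    start = time.monotonic()
    returncode = subprocess.run(CHILD).returncode
    return time.monotonic() - start, returncode


if __name__ == "__main__":
    elapsed_bg, child = run_daemonizing()
    print(f"background launcher returned after {elapsed_bg:.2f}s, "
          f"child still running: {child.poll() is None}")
    child.wait()  # clean up the background child

    elapsed_fg, code = run_foreground()
    print(f"foreground launcher returned after {elapsed_fg:.2f}s with exit code {code}")
```

In a container, the process that returns early is the one Kubernetes watches, so the container is considered finished even though the daemon it started was meant to keep running.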

I strongly suggest starting the Flink session cluster with the official
YAMLs [2].

[1].
https://github.com/apache/flink-docker/blob/master/1.13/scala_2.11-java11-debian/docker-entrypoint.sh#L114
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/kubernetes/#starting-a-kubernetes-cluster-session-mode

Best,
Yang

On Fri, Oct 1, 2021 at 2:59 AM, Qihua Yang wrote:

> It looks like after the script *flink-daemon.sh* completes, it returns exit
> code 0. Kubernetes regards it as done. Is that expected?
>
> Thanks,
> Qihua
>
> On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang  wrote:
>
>> Thank you for your reply.
>> From the log, the exit code is 0 and the reason is Completed.
>> It looks like the cluster is fine. But why does Kubernetes restart the pod?
>> As you said, from the perspective of Kubernetes everything is done. Then how
>> can I prevent the restart?
>> It didn't even give me a chance to upload and run a jar.
>>
>> Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
>> Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
>> Command:
>>   /opt/flink/bin/entrypoint.sh
>> Args:
>>   /opt/flink/bin/run-job-manager.sh
>> State:  Waiting
>>   Reason:   CrashLoopBackOff
>> Last State: Terminated
>>   Reason:   Completed
>>   Exit Code:0
>>   Started:  Wed, 29 Sep 2021 20:12:30 -0700
>>   Finished: Wed, 29 Sep 2021 20:12:45 -0700
>> Ready:  False
>> Restart Count:  131
>>
>> Thanks,
>> Qihua
>>
>> On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler 
>> wrote:
>>
>>> Is the run-job-manager.sh script actually blocking?
>>> Since you (apparently) use that as an entrypoint, if that script exits
>>> after starting the JM then from the perspective of Kubernetes everything is
>>> done.
>>>
>>> On 30/09/2021 08:59, Matthias Pohl wrote:
>>>
>>> Hi Qihua,
>>> I guess, looking into kubectl describe and the JobManager logs would
>>> help in understanding what's going on.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>>>
>>>> Hi,
>>>> I deployed Flink in session mode. I didn't run any jobs. I saw the logs
>>>> below. That is normal, the same as the Flink manual shows.
>>>>
>>>> + /opt/flink/bin/run-job-manager.sh
>>>> Starting HA cluster with 1 masters.
>>>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>>>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>>>
>>>> But when I check kubectl, it shows the status is Completed. After a while,
>>>> the status changed to CrashLoopBackOff, and the pod restarted.
>>>>
>>>> NAME                          READY   STATUS      RESTARTS   AGE
>>>> job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s
>>>>
>>>> NAME                          READY   STATUS             RESTARTS   AGE
>>>> job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s
>>>>
>>>> Can anyone help me understand why?
>>>> Why does Kubernetes regard this pod as completed and restart it? Should I
>>>> configure something, either on the Flink side or the Kubernetes side? From
>>>> the Flink manual, after the cluster is started, I can upload a jar to run
>>>> the application.
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>
>>>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
It looks like after the script *flink-daemon.sh* completes, it returns exit
code 0. Kubernetes regards it as done. Is that expected?

Thanks,
Qihua

On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang  wrote:

> Thank you for your reply.
> From the log, the exit code is 0 and the reason is Completed.
> It looks like the cluster is fine. But why does Kubernetes restart the pod?
> As you said, from the perspective of Kubernetes everything is done. Then how
> can I prevent the restart?
> It didn't even give me a chance to upload and run a jar.
>
> Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
> Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
> Command:
>   /opt/flink/bin/entrypoint.sh
> Args:
>   /opt/flink/bin/run-job-manager.sh
> State:  Waiting
>   Reason:   CrashLoopBackOff
> Last State: Terminated
>   Reason:   Completed
>   Exit Code:0
>   Started:  Wed, 29 Sep 2021 20:12:30 -0700
>   Finished: Wed, 29 Sep 2021 20:12:45 -0700
> Ready:  False
> Restart Count:  131
>
> Thanks,
> Qihua
>
> On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler 
> wrote:
>
>> Is the run-job-manager.sh script actually blocking?
>> Since you (apparently) use that as an entrypoint, if that script exits
>> after starting the JM then from the perspective of Kubernetes everything is
>> done.
>>
>> On 30/09/2021 08:59, Matthias Pohl wrote:
>>
>> Hi Qihua,
>> I guess, looking into kubectl describe and the JobManager logs would help
>> in understanding what's going on.
>>
>> Best,
>> Matthias
>>
>> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>>
>>> Hi,
>>> I deployed Flink in session mode. I didn't run any jobs. I saw the logs
>>> below. That is normal, the same as the Flink manual shows.
>>>
>>> + /opt/flink/bin/run-job-manager.sh
>>> Starting HA cluster with 1 masters.
>>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>>
>>> But when I check kubectl, it shows the status is Completed. After a while,
>>> the status changed to CrashLoopBackOff, and the pod restarted.
>>>
>>> NAME                          READY   STATUS      RESTARTS   AGE
>>> job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s
>>>
>>> NAME                          READY   STATUS             RESTARTS   AGE
>>> job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s
>>>
>>> Can anyone help me understand why?
>>> Why does Kubernetes regard this pod as completed and restart it? Should I
>>> configure something, either on the Flink side or the Kubernetes side? From
>>> the Flink manual, after the cluster is started, I can upload a jar to run
>>> the application.
>>>
>>> Thanks,
>>> Qihua
>>>
>>
>>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
Thank you for your reply.
From the log, the exit code is 0 and the reason is Completed.
It looks like the cluster is fine. But why does Kubernetes restart the pod?
As you said, from the perspective of Kubernetes everything is done. Then how
can I prevent the restart?
It didn't even give me a chance to upload and run a jar.

Ports:          8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports:     0/TCP, 0/TCP, 0/TCP, 0/TCP
Command:
  /opt/flink/bin/entrypoint.sh
Args:
  /opt/flink/bin/run-job-manager.sh
State:          Waiting
  Reason:       CrashLoopBackOff
Last State:     Terminated
  Reason:       Completed
  Exit Code:    0
  Started:      Wed, 29 Sep 2021 20:12:30 -0700
  Finished:     Wed, 29 Sep 2021 20:12:45 -0700
Ready:          False
Restart Count:  131

Thanks,
Qihua

On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler  wrote:

> Is the run-job-manager.sh script actually blocking?
> Since you (apparently) use that as an entrypoint, if that script exits
> after starting the JM then from the perspective of Kubernetes everything is
> done.
>
> On 30/09/2021 08:59, Matthias Pohl wrote:
>
> Hi Qihua,
> I guess, looking into kubectl describe and the JobManager logs would help
> in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>
>> Hi,
>> I deployed Flink in session mode. I didn't run any jobs. I saw the logs
>> below. That is normal, the same as the Flink manual shows.
>>
>> + /opt/flink/bin/run-job-manager.sh
>> Starting HA cluster with 1 masters.
>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>
>> But when I check kubectl, it shows the status is Completed. After a while,
>> the status changed to CrashLoopBackOff, and the pod restarted.
>>
>> NAME                          READY   STATUS      RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s
>>
>> NAME                          READY   STATUS             RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s
>>
>> Can anyone help me understand why?
>> Why does Kubernetes regard this pod as completed and restart it? Should I
>> configure something, either on the Flink side or the Kubernetes side? From
>> the Flink manual, after the cluster is started, I can upload a jar to run
>> the application.
>>
>> Thanks,
>> Qihua
>>
>
>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
I did check kubectl describe; it shows the info below. The reason is Completed.

Ports:          8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports:     0/TCP, 0/TCP, 0/TCP, 0/TCP
Command:
  /opt/flink/bin/entrypoint.sh
Args:
  /opt/flink/bin/run-job-manager.sh
State:          Waiting
  Reason:       CrashLoopBackOff
Last State:     Terminated
  Reason:       Completed
  Exit Code:    0
  Started:      Wed, 29 Sep 2021 20:12:30 -0700
  Finished:     Wed, 29 Sep 2021 20:12:45 -0700
Ready:          False
Restart Count:  131


On Wed, Sep 29, 2021 at 11:59 PM Matthias Pohl 
wrote:

> Hi Qihua,
> I guess, looking into kubectl describe and the JobManager logs would help
> in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>
>> Hi,
>> I deployed Flink in session mode. I didn't run any jobs. I saw the logs
>> below. That is normal, the same as the Flink manual shows.
>>
>> + /opt/flink/bin/run-job-manager.sh
>> Starting HA cluster with 1 masters.
>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>
>> But when I check kubectl, it shows the status is Completed. After a while,
>> the status changed to CrashLoopBackOff, and the pod restarted.
>>
>> NAME                          READY   STATUS      RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s
>>
>> NAME                          READY   STATUS             RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s
>>
>> Can anyone help me understand why?
>> Why does Kubernetes regard this pod as completed and restart it? Should I
>> configure something, either on the Flink side or the Kubernetes side? From
>> the Flink manual, after the cluster is started, I can upload a jar to run
>> the application.
>>
>> Thanks,
>> Qihua
>>
>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Chesnay Schepler

Is the run-job-manager.sh script actually blocking?
Since you (apparently) use that as an entrypoint, if that script exits 
after starting the JM then from the perspective of Kubernetes everything 
is done.
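
To make the point above concrete: assuming the pod is managed by a Deployment (the pod name job-manager-776dcf6dd-xzs8g looks like one, but that is a guess), its restartPolicy is Always, so Kubernetes restarts the container even after a clean exit with code 0 and waits longer between attempts each time. The Python sketch below models the documented restart-policy semantics and the back-off schedule (10s, doubling, capped at 5 minutes); the function names are made up for this illustration.

```python
def should_restart(restart_policy: str, exit_code: int) -> bool:
    """Return whether the kubelet restarts a container that exited with `exit_code`."""
    if restart_policy == "Always":
        return True                 # restarted even after a clean exit
    if restart_policy == "OnFailure":
        return exit_code != 0       # restarted only after a failure
    if restart_policy == "Never":
        return False
    raise ValueError(f"unknown restart policy: {restart_policy!r}")


def backoff_delays(restarts: int, base: float = 10.0, cap: float = 300.0) -> list:
    """Delay in seconds before each restart: doubles from `base` up to `cap`."""
    return [min(base * 2 ** i, cap) for i in range(restarts)]


if __name__ == "__main__":
    for policy in ("Always", "OnFailure", "Never"):
        print(policy, should_restart(policy, exit_code=0))
    print(backoff_delays(6))
```

This is why an entrypoint that exits with code 0 shows up first as Completed and then as CrashLoopBackOff: the exit is clean, but the policy still demands a restart.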


On 30/09/2021 08:59, Matthias Pohl wrote:

Hi Qihua,
I guess, looking into kubectl describe and the JobManager logs would 
help in understanding what's going on.


Best,
Matthias

On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote:


Hi,
I deployed Flink in session mode. I didn't run any jobs. I saw the
logs below. That is normal, the same as the Flink manual shows.

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check kubectl, it shows the status is Completed. After a
while, the status changed to CrashLoopBackOff, and the pod restarted.

NAME                          READY   STATUS      RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why?
Why does Kubernetes regard this pod as completed and restart it? Should
I configure something, either on the Flink side or the Kubernetes side?
From the Flink manual, after the cluster is started, I can upload a jar
to run the application.

Thanks,
Qihua





Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Matthias Pohl
Hi Qihua,
I guess, looking into kubectl describe and the JobManager logs would help
in understanding what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:

> Hi,
> I deployed Flink in session mode. I didn't run any jobs. I saw the logs
> below. That is normal, the same as the Flink manual shows.
>
> + /opt/flink/bin/run-job-manager.sh
> Starting HA cluster with 1 masters.
> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>
> But when I check kubectl, it shows the status is Completed. After a while,
> the status changed to CrashLoopBackOff, and the pod restarted.
>
> NAME                          READY   STATUS      RESTARTS   AGE
> job-manager-776dcf6dd-xzs8g   0/1     Completed   5          5m27s
>
> NAME                          READY   STATUS             RESTARTS   AGE
> job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s
>
> Can anyone help me understand why?
> Why does Kubernetes regard this pod as completed and restart it? Should I
> configure something, either on the Flink side or the Kubernetes side? From
> the Flink manual, after the cluster is started, I can upload a jar to run
> the application.
>
> Thanks,
> Qihua
>


Start Flink cluster, k8s pod behavior

2021-09-29 Thread Qihua Yang
Hi,
I deployed Flink in session mode. I didn't run any jobs. I saw the logs below.
That is normal, the same as the Flink manual shows.

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check with kubectl, the pod status is Completed. After a while,
the status changes to CrashLoopBackOff and the pod restarts.

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed          5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why?
Why does Kubernetes regard this pod as completed and restart it? Should I
configure something, on either the Flink side or the Kubernetes side?
According to the Flink manual, after the cluster is started, I can upload
a jar to run the application.

Thanks,
Qihua


Re: Persisting job logs for Flink on K8s

2021-08-22 Thread Yang Wang
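There are generally two ways to collect logs:
1. Collect them centrally on the K8s node side, for example with Alibaba
   Cloud's ilogtail[1]. Flink usually only needs to write to standard output
   or to a mounted emptyDir volume.
2. Use a log4j2 custom appender to push logs directly to a storage service
   (OSS, Alibaba Cloud SLS, etc.). You need to write a plugin yourself or use
   one that Alibaba Cloud already provides.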

[1]. https://help.aliyun.com/document_detail/87540.html

Best,
Yang

On Mon, Aug 23, 2021 at 12:14 PM, 东东 wrote:

>
>
>
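> Wouldn't it be enough to just collect the container logs? Collecting
> container logs with ELK on K8s is a mature approach; a default installation
> of the official Helm Charts basically collects everything.
>
> At 2021-08-23 11:37:54, "casel.chen" wrote:
> > Flink 1.12.1, jobs are submitted with the flink run command and run on
> > native K8s. One question: how can job logs be persisted? After a job
> > restarts, the pod is destroyed and the logs stored locally in the pod are
> > gone, so we cannot find the root cause of the error. Is there a way to
> > persist job logs, for example to ELK or Alibaba Cloud OSS? Also, we are
> > using Alibaba Cloud EKS; can the job be configured to use Alibaba Cloud
> > Log Service? I know K8s applications can be configured with a host path
> > mapping on the ECS host for storage, but that requires modifying the YAML
> > file that creates the job, and I did not see such an option in the Flink
> > on K8s startup command. Any help is appreciated, thanks!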
>


Persisting job logs for Flink on K8s

2021-08-22 Thread casel.chen
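Flink 1.12.1, jobs are submitted with the flink run command and run on native
K8s. One question: how can job logs be persisted? After a job restarts, the pod
is destroyed and the logs stored locally in the pod are gone, so we cannot find
the root cause of the error. Is there a way to persist job logs, for example to
ELK or Alibaba Cloud OSS? Also, we are using Alibaba Cloud EKS; can the job be
configured to use Alibaba Cloud Log Service? I know K8s applications can be
configured with a host path mapping on the ECS host for storage, but that
requires modifying the YAML file that creates the job, and I did not see such
an option in the Flink on K8s startup command. Any help is appreciated, thanks!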

Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-22 Thread L . C . Hsieh


I also noticed that the Flink s3-hadoop plugin has a Hadoop common dependency.
I'm trying this.

From the logs, the plugin is enabled:

Enabling required built-in plugins
Linking flink-s3-fs-hadoop-1.12-SNAPSHOT.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.12-SNAPSHOT.jar

But I also got:

java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)

So it looks like user code cannot use the plugin's classes (e.g. Hadoop classes)?

I don't see that hadoop-common is shaded in the plugin.


On 2021/08/22 18:24:24, L. C. Hsieh  wrote: 
> 
> As I know, flink-shaded-hadoop is not officially supported since Flink 1.11 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html).
> 
> Anyway, I installed Hadoop common package into the docker images to make 
> Flink happy. I marked the hadoop dependencies in the iceberg-test application 
> as "provided". Looks okay as it won't throw the previous LinkageError anymore.
> 
> But a new issue is, the application submitted cannot be run because the task 
> manager pod is failed to run. There is warning message by describing pod 
> status:
> 
>   Warning  FailedMount  4m4s  kubeletUnable to 
> attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached 
> volumes=[flink-config-volume kube-api-access-772x5 hadoop-config-volume]: 
> timed out waiting for the condition
>   Warning  FailedMount  108s (x2 over 6m22s)  kubeletUnable to 
> attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached 
> volumes=[hadoop-config-volume flink-config-volume kube-api-access-772x5]: 
> timed out waiting for the condition
>   Warning  FailedMount  11s (x12 over 8m25s)  kubelet
> MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap 
> "hadoop-config-my-first-flink-cluster" not found
> 
> Seems it cannot mount "hadoop-config-volume".
> 
> From the doc 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/),
>  it looks like that Flink will add some internal ConfigMap volumes 
> automatically. So again, I am not sure what is wrong in above steps...
> 
> 
> On 2021/08/22 10:01:25, Manong Karl  wrote: 
> > I prefer using flink bundled hadoop, such as
> > https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar.
> > May help.
> > 
> > On Sun, Aug 22, 2021 at 1:40 AM, L. C. Hsieh wrote:
> > 
> > >
> > > BTW, I checked dependency tree, the flink-iceberg demo only has one Hadoop
> > > common dependency. So I'm not sure why Flink throws such exception. Based
> > > on Flink doc, I suppose that Flink binary doesn't include Hadoop
> > > dependencies, right?
> > >
> > > Based on the exception, looks like when FlinkCatalogFactory (from Iceberg)
> > > calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders
> > > are different and referring to different Hadoop Configuration Class 
> > > objects.
> > >
> > > I'm not familiar with Flink. So I'm wondering what step is wrong during
> > > the testing? It is a pretty simple test to verify Iceberg and Flink.
> > >
> > > On 2021/08/21 08:50:05, L. C. Hsieh  wrote:
> > > >
> > > > Thanks for replying.
> > > >
> > > > I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12
> > > actually.
> > > >
> > > > Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java
> > > application. I got new exception as below:
> > > >
> > > > java.lang.LinkageError: loader constraint violation: when resolving
> > > method
> > > "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
> > > the class loader (instance of org/apache/flink/util/ChildFirstClassLoader)
> > > of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and 
> > > the
> > > class loader (instance of sun/misc/Launcher$AppClassLoader) for the
> > > method's defining class, org/apache/flink/runtime/util/HadoopUtils, have
> > > different Class objects for the type org/apache/hadoop/conf/Configuration
> > > used in the signature at
> > > org.apach

Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-22 Thread L . C . Hsieh


As far as I know, flink-shaded-hadoop has not been officially supported since Flink 1.11 
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html).

Anyway, I installed the Hadoop common package into the Docker image to make Flink 
happy. I marked the Hadoop dependencies in the iceberg-test application as 
"provided". This looks okay, as it no longer throws the previous LinkageError.

But a new issue is that the submitted application cannot run because the task 
manager pod fails to start. Describing the pod status shows these warnings:

  Warning  FailedMount  4m4s                  kubelet  Unable to attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached volumes=[flink-config-volume kube-api-access-772x5 hadoop-config-volume]: timed out waiting for the condition
  Warning  FailedMount  108s (x2 over 6m22s)  kubelet  Unable to attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached volumes=[hadoop-config-volume flink-config-volume kube-api-access-772x5]: timed out waiting for the condition
  Warning  FailedMount  11s (x12 over 8m25s)  kubelet  MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-my-first-flink-cluster" not found

It seems that it cannot mount "hadoop-config-volume".

From the doc 
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/), 
it looks like Flink adds some internal ConfigMap volumes automatically. 
So again, I am not sure what is wrong in the above steps...


On 2021/08/22 10:01:25, Manong Karl  wrote: 
> I prefer using flink bundled hadoop, such as
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar.
> May help.
> 
> On Sun, Aug 22, 2021 at 1:40 AM, L. C. Hsieh wrote:
> 
> >
> > BTW, I checked dependency tree, the flink-iceberg demo only has one Hadoop
> > common dependency. So I'm not sure why Flink throws such exception. Based
> > on Flink doc, I suppose that Flink binary doesn't include Hadoop
> > dependencies, right?
> >
> > Based on the exception, looks like when FlinkCatalogFactory (from Iceberg)
> > calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders
> > are different and referring to different Hadoop Configuration Class objects.
> >
> > I'm not familiar with Flink. So I'm wondering what step is wrong during
> > the testing? It is a pretty simple test to verify Iceberg and Flink.
> >
> > On 2021/08/21 08:50:05, L. C. Hsieh  wrote:
> > >
> > > Thanks for replying.
> > >
> > > I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12
> > actually.
> > >
> > > Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java
> > application. I got new exception as below:
> > >
> > > java.lang.LinkageError: loader constraint violation: when resolving
> > method
> > "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
> > the class loader (instance of org/apache/flink/util/ChildFirstClassLoader)
> > of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the
> > class loader (instance of sun/misc/Launcher$AppClassLoader) for the
> > method's defining class, org/apache/flink/runtime/util/HadoopUtils, have
> > different Class objects for the type org/apache/hadoop/conf/Configuration
> > used in the signature at
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)
> > >
> > >
> > > On 2021/08/21 08:11:33, Manong Karl  wrote:
> > > > Iceberg v0.11 and v0.12 are not compatible with Flink v1.13.x.
> > > >
> > > > On Sat, Aug 21, 2021 at 3:52 PM, L. C. Hsieh wrote:
> > > >
> > > > > Hi, I'm testing using Flink to write Iceberg table. I run Flink
> > native K8S
> > > > > cluster locally and submit a simple Java program that writes out
> > Iceberg
> > > > > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > > > > exception:
> > > > >
> > > > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> > at
> > > > >
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > > > > at
> > > > >
> > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > > > > at
> > > > >
> > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> > > > >
> > > > > The uploaded is a fat jar. I also checked the uploaded application
> > jar. It
> > > > > has the Configuration class. So I don't know what is wrong there.
> > Any idea
> > > > > or suggestion? Thanks.
> > > > >
> > > >
> > >
> >
> 


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-22 Thread Manong Karl
I prefer using the Hadoop bundled for Flink, such as
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
It may help.

On Sun, Aug 22, 2021 at 1:40 AM, L. C. Hsieh wrote:

>
> BTW, I checked dependency tree, the flink-iceberg demo only has one Hadoop
> common dependency. So I'm not sure why Flink throws such exception. Based
> on Flink doc, I suppose that Flink binary doesn't include Hadoop
> dependencies, right?
>
> Based on the exception, looks like when FlinkCatalogFactory (from Iceberg)
> calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders
> are different and referring to different Hadoop Configuration Class objects.
>
> I'm not familiar with Flink. So I'm wondering what step is wrong during
> the testing? It is a pretty simple test to verify Iceberg and Flink.
>
> On 2021/08/21 08:50:05, L. C. Hsieh  wrote:
> >
> > Thanks for replying.
> >
> > I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12
> actually.
> >
> > Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java
> application. I got new exception as below:
> >
> > java.lang.LinkageError: loader constraint violation: when resolving
> method
> "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
> the class loader (instance of org/apache/flink/util/ChildFirstClassLoader)
> of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the
> class loader (instance of sun/misc/Launcher$AppClassLoader) for the
> method's defining class, org/apache/flink/runtime/util/HadoopUtils, have
> different Class objects for the type org/apache/hadoop/conf/Configuration
> used in the signature at
> org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)
> >
> >
> > On 2021/08/21 08:11:33, Manong Karl  wrote:
> > > Iceberg v0.11 and v0.12 are not compatible with Flink v1.13.x.
> > >
> > > On Sat, Aug 21, 2021 at 3:52 PM, L. C. Hsieh wrote:
> > >
> > > > Hi, I'm testing using Flink to write Iceberg table. I run Flink
> native K8S
> > > > cluster locally and submit a simple Java program that writes out
> Iceberg
> > > > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > > > exception:
> > > >
> > > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> at
> > > >
> org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > > > at
> > > >
> org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > > > at
> > > >
> com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> > > >
> > > > The uploaded is a fat jar. I also checked the uploaded application
> jar. It
> > > > has the Configuration class. So I don't know what is wrong there.
> Any idea
> > > > or suggestion? Thanks.
> > > >
> > >
> >
>


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh


BTW, I checked the dependency tree; the flink-iceberg demo has only one Hadoop 
common dependency, so I'm not sure why Flink throws such an exception. Based on 
the Flink docs, I suppose the Flink binary doesn't include Hadoop dependencies, 
right?

Based on the exception, it looks like when FlinkCatalogFactory (from Iceberg) 
calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders are 
different and refer to different Hadoop Configuration Class objects.

I'm not familiar with Flink, so I'm wondering which step went wrong during the 
testing. It is a pretty simple test to verify Iceberg and Flink.

On 2021/08/21 08:50:05, L. C. Hsieh  wrote: 
> 
> Thanks for replying.
> 
> I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12 actually.
> 
> Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java application. I 
> got new exception as below:
> 
> java.lang.LinkageError: loader constraint violation: when resolving method 
> "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
>  the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) 
> of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the 
> class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's 
> defining class, org/apache/flink/runtime/util/HadoopUtils, have different 
> Class objects for the type org/apache/hadoop/conf/Configuration used in the 
> signature at 
> org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)
> 
> 
> On 2021/08/21 08:11:33, Manong Karl  wrote: 
> > Iceberg v0.11 and v0.12 are not compatible with Flink v1.13.x.
> > 
> > On Sat, Aug 21, 2021 at 3:52 PM, L. C. Hsieh wrote:
> > 
> > > Hi, I'm testing using Flink to write Iceberg table. I run Flink native K8S
> > > cluster locally and submit a simple Java program that writes out Iceberg
> > > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > > exception:
> > >
> > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at
> > > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > > at
> > > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > > at
> > > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> > >
> > > The uploaded is a fat jar. I also checked the uploaded application jar. It
> > > has the Configuration class. So I don't know what is wrong there. Any idea
> > > or suggestion? Thanks.
> > >
> > 
> 


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh


Thanks for replying.

I'm using Flink 1.12.x, and I think Iceberg 0.12 actually uses Flink 1.12.

Once I upgraded Iceberg from 0.11.0 to 0.12.0 in the Java application, I 
got a new exception:

java.lang.LinkageError: loader constraint violation: when resolving method 
"org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
 the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of 
the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the class 
loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining 
class, org/apache/flink/runtime/util/HadoopUtils, have different Class objects 
for the type org/apache/hadoop/conf/Configuration used in the signature at 
org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)


On 2021/08/21 08:11:33, Manong Karl  wrote: 
> Iceberg v0.11 and v0.12 are not compatible with Flink v1.13.x.
> 
> On Sat, Aug 21, 2021 at 3:52 PM, L. C. Hsieh wrote:
> 
> > Hi, I'm testing using Flink to write Iceberg table. I run Flink native K8S
> > cluster locally and submit a simple Java program that writes out Iceberg
> > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > exception:
> >
> > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > at
> > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > at
> > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> >
> > The uploaded is a fat jar. I also checked the uploaded application jar. It
> > has the Configuration class. So I don't know what is wrong there. Any idea
> > or suggestion? Thanks.
> >
> 


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread Manong Karl
Iceberg v0.11 and v0.12 are not compatible with Flink v1.13.x.

On Sat, Aug 21, 2021 at 3:52 PM, L. C. Hsieh wrote:

> Hi, I'm testing using Flink to write Iceberg table. I run Flink native K8S
> cluster locally and submit a simple Java program that writes out Iceberg
> table (https://github.com/spancer/flink-iceberg-demo). But got an
> exception:
>
> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at
> org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> at
> org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> at
> com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
>
> The uploaded is a fat jar. I also checked the uploaded application jar. It
> has the Configuration class. So I don't know what is wrong there. Any idea
> or suggestion? Thanks.
>


Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh
Hi, I'm testing Flink by writing an Iceberg table. I run a Flink native K8s 
cluster locally and submit a simple Java program that writes out an Iceberg table 
(https://github.com/spancer/flink-iceberg-demo), but I got an exception:

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
    at org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
    at com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)

What I uploaded is a fat jar. I also checked the uploaded application jar, and it 
contains the Configuration class, so I don't know what is wrong there. Any ideas 
or suggestions? Thanks.


Is there any Chinese-language usage material on Flink on K8s operators?

2021-08-11 Thread casel.chen
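I recently wanted to learn how to use a Flink on K8s operator. Looking around, I found the ones from Lyft and Google in the industry; is there any Chinese-language usage material on this?
I would also like to know whether Ververica Platform is also built on a Flink on K8s operator. Is the community edition open source, so that it supports custom development?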

Re: question about flink on k8s per-job mode

2021-06-21 Thread Yang Wang
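If the names are not strictly distinguished, application mode and per-job mode are no different in a K8s environment: in both, the job is submitted on the JM.
Currently, K8s application mode can only support submitting a single job when high availability is enabled.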

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode

Best,
Yang



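On Thu, Jun 17, 2021 at 4:51 PM, at003 wrote:

> Hello experts,
>
> Why does the official Flink documentation state that neither Flink on K8s
> nor native K8s supports per-job mode, when a search turns up plenty of
> tutorials for it?
>
> Thanks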
>
>
>
>


question about flink on k8s per-job mode

2021-06-17 Thread at003
Hello experts,


Why does the official Flink documentation state that neither Flink on K8s nor native K8s supports per-job mode, when a search turns up plenty of tutorials for it?


Thanks

Re: Metric counter gets reset when leader jobmanager changes in Flink native K8s HA solution

2021-06-14 Thread Prasanna kumar
Amit,

This is the expected behaviour for a counter. To find the total count
irrespective of restarts, aggregate functions need to be applied to the
counter, for example sum(rate(counter)).
https://prometheus.io/docs/prometheus/latest/querying/functions/
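
For illustration only, here is a minimal Python sketch of reset-aware
counting, which is roughly the idea behind how rate() and increase() treat a
counter that drops back towards zero. It is a simplification (no time windows,
no extrapolation), and the function name and the sample readings are
hypothetical, not taken from any Flink or Prometheus API.

```python
def increase_with_resets(samples):
    """Total increase of a monotonic counter across resets.

    samples: raw counter readings in scrape order.
    A drop between two consecutive readings is treated as a reset to zero,
    so the whole new reading counts as increase.
    """
    total = 0.0
    for prev, cur in zip(samples, samples[1:]):
        if cur >= prev:
            total += cur - prev
        else:
            # Counter restarted, e.g. a different JobManager became leader.
            total += cur
    return total


# Hypothetical readings: the counter reaches 120, then a new leader takes
# over and the count starts again from 1.
readings = [100, 110, 120, 1, 5, 9]
print(increase_with_resets(readings))  # 29.0
```

The raw value falls from 120 to 1 at the failover, but the accumulated
increase keeps growing, which is why querying the aggregated rate or increase
is usually preferred over reading the raw counter value.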

Prasanna.

On Tue, Jun 15, 2021 at 8:25 AM Amit Bhatia 
wrote:

> Hi,
>
> We have configured jobmanager HA with flink 1.12.1 on the k8s environment.
> We have implemented a HA solution using Native K8s HA solution (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink).
> We have used deployment controller for both jobmanager & taskmanager pods.
>
> So whenever a leader jobmanager crashes and the same jobmanager becomes
> leader again then everything works fine but whenever a leader jobmanager
> crashes and some other standby jobmanager becomes leader then metric count
> gets reset and it starts the request count again from 1. Is it the expected
> behaviour ? or is there any specific configuration required so that even if
> the leader jobmanager changes then instead of resetting the metric count it
> continues the count.
>
> Regards,
> Amit
>


Metric counter gets reset when leader jobmanager changes in Flink native K8s HA solution

2021-06-14 Thread Amit Bhatia
Hi,

We have configured jobmanager HA with flink 1.12.1 on the k8s environment.
We have implemented a HA solution using Native K8s HA solution (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink).
We have used deployment controller for both jobmanager & taskmanager pods.

So whenever a leader jobmanager crashes and the same jobmanager becomes
leader again, everything works fine. But whenever a leader jobmanager
crashes and another standby jobmanager becomes leader, the metric count
gets reset and the request count starts again from 1. Is this the expected
behaviour? Or is there any specific configuration required so that the
count continues instead of being reset when the leader jobmanager
changes?

Regards,
Amit


Re: Flink in k8s operators list

2021-05-31 Thread Svend
Hi Ilya,

Thanks for the kind feed-back.

We hit the first issue you mention related to K8s 1.18+, we then updated the 
controller-gen version to 0.2.4 in the makefile as described in the ticket you 
linked, and then ran "make deploy", which worked around the issue for us.

I'm not aware of the 2nd issue you refer to related to in-progress job? In case 
that helps, we access the Flink-UI by simply opening a port-forward on port 
8081 on the job manager, which among other things shows the currently running 
jobs.

Svend


On Mon, 31 May 2021, at 12:00 PM, Ilya Karpov wrote:
> Hi Svend,
> 
> thank you so much for sharing your experience! The GCP k8s operator looks 
> promising (currently I’m trying to build it and run the helm chart. An issue 
> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/266> 
> with k8s version 1.18+ is a roadblock right now, but I see that there is a 
> solution), and it also seems like the Flink team refers 
> <http://disq.us/p/2f7goq6> to this implementation.
> 
> In your setup, did you solve the problem of visualising the list of 
> in-progress jobs?
> 
> > One worrying point though is that the maintainers of the repo seem to have 
> > become silent in March this year.
> Lyft's implementation <https://github.com/lyft/flinkk8soperator> (haven’t 
> tried it yet) seems to be even more abandoned (last release 20/04/2020).
> 
>> On 29 May 2021, at 11:23, Svend wrote:
>> 
>> Hi Ilya,
>> 
>> At my company we're currently using the GCP k8s operator (2nd on your list). 
>> Our usage is very moderate, but so far it works great for us.
>> 
>> We appreciate that when upgrading the application, it triggers automatically 
>> a savepoint during shutdown and resumes from it when restarting. It also 
>> allows to take savepoints at regular intervals (we take one per day 
>> currently).
>> 
>> We're using it with Flink 1.12.4 and AWS EKS.
>> 
>> Getting the Flink metrics and logs exported to our monitoring system worked 
>> out of the box. 
>> 
>> Configuring IAM roles and K8s service account for saving checkpoints and 
>> savepoints to S3 required a bit more fiddling although we got it working. 
>> 
>> Happy to share code snippet about any of that if that's useful :)
>> 
>> It was last updated with Flink 1.11 in mind, so there is currently no 
>> built-in support for the reactive scaling mode recently added in Flink 1.13.
>> 
>> One worrying point though is that the maintainers of the repo seem to have 
>> become silent in March this year. There is a small and active community 
>> around it though and issues and PRs keep on arriving and are waiting for 
>> feed-back. It's all free and OSS, so who are we to complain? Though it's 
>> still an important attention point.
>> 
>> Hope this helps,
>> 
>> Svend
>> 
>> 
>> 
>> 
>> 
>> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>>> Hi there,
>>> 
>>> I’m doing a little research on the easiest way to deploy a Flink job to a 
>>> k8s cluster and manage its lifecycle with a *k8s operator*. The list of 
>>> solutions is below:
>>> - https://github.com/fintechstudios/ververica-platform-k8s-operator
>>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>>> - https://kudo.dev/docs/examples/apache-flink.html
>>> - https://github.com/wangyang0918/flink-native-k8s-operator
>>> 
>>> If you are using something that is not listed above, please share! Any details 
>>> about how a specific solution works are greatly appreciated.
>>> 
>>> Thanks in advance


Re: Flink in k8s operators list

2021-05-31 Thread Ilya Karpov
Hi Svend,

thank you so much for sharing your experience! The GCP k8s operator looks promising 
(currently I’m trying to build it and run the helm chart. An issue 
<https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/266> with 
k8s version 1.18+ is a roadblock right now, but I see that there is a solution), 
and it also seems like the Flink team refers <http://disq.us/p/2f7goq6> to 
this implementation.

In your setup, did you solve the problem of visualising the list of in-progress jobs?

> One worrying point though is that the maintainers of the repo seem to have 
> become silent in March this year.
Lyft's implementation <https://github.com/lyft/flinkk8soperator> (haven’t tried 
it yet) seems to be even more abandoned (last release 20/04/2020).

> On 29 May 2021, at 11:23, Svend wrote:
> 
> Hi Ilya,
> 
> At my company we're currently using the GCP k8s operator (2nd on your list). 
> Our usage is very moderate, but so far it works great for us.
> 
> We appreciate that when upgrading the application, it triggers automatically 
> a savepoint during shutdown and resumes from it when restarting. It also 
> allows to take savepoints at regular intervals (we take one per day 
> currently).
> 
> We're using it with Flink 1.12.4 and AWS EKS.
> 
> Getting the Flink metrics and logs exported to our monitoring system worked 
> out of the box. 
> 
> Configuring IAM roles and K8s service account for saving checkpoints and 
> savepoints to S3 required a bit more fiddling although we got it working. 
> 
> Happy to share code snippet about any of that if that's useful :)
> 
> It was last updated with Flink 1.11 in mind, so there is currently no 
> built-in support for the reactive scaling mode recently added in Flink 1.13.
> 
> One worrying point though is that the maintainers of the repo seem to have 
> become silent in March this year. There is a small and active community 
> around it though and issues and PRs keep on arriving and are waiting for 
> feed-back. It's all free and OSS, so who are we to complain? Though it's 
> still an important attention point.
> 
> Hope this helps,
> 
> Svend
> 
> 
> 
> 
> 
> On Fri, 28 May 2021, at 9:09 AM, Ilya Karpov wrote:
>> Hi there,
>> 
>> I’m doing a little research on the easiest way to deploy a Flink job to a k8s 
>> cluster and manage its lifecycle with a k8s operator. The list of solutions is 
>> below:
>> - https://github.com/fintechstudios/ververica-platform-k8s-operator
>> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>> - https://kudo.dev/docs/examples/apache-flink.html
>> - https://github.com/wangyang0918/flink-native-k8s-operator
>> 
>> If you are using something that is not listed above, please share! Any details 
>> about how a specific solution works are greatly appreciated.
>> Thanks in advance


