Re: Flink k8s operator unstable deployment

2024-08-31 Thread Naci Simsek
Hi Arthur,

In your initial mail there was an explicit job id set:

$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111

This might be the reason for the DuplicateJobSubmission exception. In the job config in your last reply I could not see such a setting. You can verify from the JM logs that, when the job needs to be resubmitted due to a job failure, a randomized job-id is assigned by the operator.

About HA, you are right, it is beneficial to use it in production so that the job keeps track of the checkpoints and recovers from the last checkpoint automatically. It is maybe not clearly stated in the Flink docs for Kubernetes HA, but in the K8s HA case there is always only 1 JM pod; there are NO standby replicated JM pods created. HA works here only as a job store: the information about the latest snapshot is passed to the newly created JM pod if the existing one dies, in order to recover the job automatically.

Regards,
Naci

On 30. Aug 2024, at 15:59, Arthur Catrisse  wrote:

kubernetes.jobmanager.replicas, 1
execution.submit-failed-job-on-application-error, true
high-availability.cluster-id, my_name
kubernetes.jobmanager.cpu.limit-factor, 10
pipeline.max-parallelism, 6
kubernetes.service-account, flink
kubernetes.cluster-id, my_name
high-availability.storageDir, s3://my_bucket/recovery
taskmanager.memory.flink.size, 1024m
parallelism.default, 1
kubernetes.namespace, flink
fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-0,name:my_name,controller:false,blockOwnerDeletion:true
state.backend.type, rocksdb
kubernetes.container.image.ref, 0.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
jobmanager.memory.flink.size, 1024m
taskmanager.memory.process.size, 2048m
kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name, my_name
execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
state.backend.local-recovery, false
state.backend.rocksdb.localdir, /opt/flink/state
kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
state.backend.incremental, true
web.cancel.enable, false
execution.shutdown-on-application-finish, false
job-result-store.delete-on-commit, false
$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
taskmanager.memory.managed.fraction, 0.6
$internal.flink.version, v1_19
execution.checkpointing.max-concurrent-checkpoints, 1
kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
blob.server.port, 6102
kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
fs.allowed-fallback-filesystems, s3
high-availability.type, kubernetes
state.savepoints.dir, s3://my_bucket/savepoints
$internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
taskmanager.numberOfTaskSlots, 2
kubernetes.rest-service.exposed.type, ClusterIP
high-availability.jobmanager.port, 6101
process.working-dir, /tmp/workdir
$internal.application.main, org.apache.flink.client.python.PythonDriver
execution.target, kubernetes-application
jobmanager.memory.process.size, 2048m
taskmanager.rpc.port, 6100
internal.cluster.execution-mode, NORMAL
kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
state.checkpoints.dir, s3://my_bucket/checkpoints
jobmanager.memory.off-heap.size, 134217728b
jobmanager.memory.jvm-overhead.min, 805306368b
jobmanager.memory.jvm-metaspace.size, 268435456b
jobmanager.memory.heap.size, 939524096b
jobmanager.memory.jvm-overhead.max, 805306368b
Example of logs when failing

Last pod before error:
INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
(nothing else after, while other times, when there is no failure, we have among others terminating logs, e.g.
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
etc.)

DuplicateJobSubmission error crashloop:
2024-08-26 09:16:36,385 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
2024-08-26 09:16:36,385 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e

Re: Flink k8s operator unstable deployment

2024-08-30 Thread Arthur Catrisse via user
Hi Naci,
Thanks for your answer.

We do not explicitly define the job-id. As we are using
the flink-kubernetes-operator, I suppose it's the operator handling this ID.
The job is defined in the FlinkDeployment charts, where we have specs for
jobmanager, taskmanager and the job :
job:
jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
entryClass: "org.apache.flink.client.python.PythonDriver"
args: ["-pyclientexec", "/usr/bin/python3", "-py",
"/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
parallelism: 1
upgradeMode: savepoint
state: running

We are testing without HA right now and can't reproduce the error yet, so maybe
this could indeed be linked. But wouldn't we need to keep HA anyway in
production?
On the topic of HA, we have tried enabling it by adding
high-availability.type: kubernetes and high-availability.storageDir, but even
when we define kubernetes.jobmanager.replicas: 2, the deployment interprets it
as 1 anyway. Is there anything we could be missing?

Our charts :
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: my-job
spec:
image: {{ .Values.global.image.registry}}/{{
.Values.global.image.repository }}:{{ .Values.global.image.tag }}
flinkVersion: v1_19
ingress:
template: "{{ .Values.ingress.host }}/my-job/"
className: "traefik-traefik-ingress"
annotations:
traefik.ingress.kubernetes.io/router.entrypoints: "private"
traefik.ingress.kubernetes.io/router.middlewares:
"traefik-hstssecureheaders@kubernetescrd, flink-strip-prefix@kubernetescrd"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
kubernetes.operator.savepoint.history.max.age: "72 h"
kubernetes.operator.savepoint.history.max.count: "3"
kubernetes.operator.jm-deployment-recovery.enabled: "true"
kubernetes.operator.cluster.health-check.enabled: "true"
kubernetes.operator.job.restart.failed: "true"
kubernetes.operator.job.autoscaler.enabled: "true"
kubernetes.operator.job.autoscaler.stabilization.interval: 1m
kubernetes.operator.job.autoscaler.metrics.window: 5m
kubernetes.operator.job.autoscaler.target.utilization: "0.6"
kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
kubernetes.operator.job.autoscaler.restart.time: 2m
kubernetes.operator.job.autoscaler.catch-up.duration: 5m
pipeline.max-parallelism: "6"
kubernetes.jobmanager.replicas: "2"
kubernetes.jobmanager.tolerations:
key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
serviceAccount: flink
jobManager:
podTemplate:
spec:
terminationGracePeriodSeconds: 600
containers:
- image: {{ .Values.global.image.registry}}/{{
.Values.global.image.repository }}:{{ .Values.global.image.tag }}
name: flink-main-container
resources:
limits:
memory: "2048Mi"
requests:
memory: "2048Mi"
cpu: "1"
replicas: 2
nodeSelector:
churn-rate: low
tolerations:
- key: dedicated
value: low-churn
operator: Equal
effect: NoSchedule
envFrom:
- secretRef:
name: flink-external-secrets
taskManager:
podTemplate:
spec:
terminationGracePeriodSeconds: 600
securityContext:
runAsGroup: 
runAsUser: 
fsGroup: 
containers:
- image: {{ .Values.global.image.registry }}/{{
.Values.global.image.repository }}:{{ .Values.global.image.tag }}
name: flink-main-container
resources:
limits:
memory: "2048Mi"
requests:
memory: "2048Mi"
cpu: "0.01"
replicas: 1
nodeSelector:
churn-rate: low
tolerations:
- key: dedicated
value: low-churn
operator: Equal
effect: NoSchedule
podTemplate:
metadata:
labels:
app.kubernetes.io/part-of: flink
job:
jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
entryClass: "org.apache.flink.client.python.PythonDriver"
args: ["-pyclientexec", "/usr/bin/python3", "-py",
"/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
parallelism: 1
upgradeMode: savepoint
state: {{ .Values.jobs.my_job.state }}
and some extra variables

taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 2048m
taskmanager.memory.flink.size: 1024m
jobmanager.memory.flink.size: 1024m
taskmanager.memory.managed.fraction: 0.6
kubernetes.jobmanager.cpu.limit-factor: 10
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /opt/flink/state
state.backend.incremental: true
state.backend.local-recovery: false
execution.checkpointing.max-concurrent-checkpoints: 1
high-availability.type: kubernetes
fs.s3a.aws.credentials.provider:
com.amazonaws.auth.DefaultAWSCredentialsProviderChain
fs.allowed-fallback-filesystems: s3
high-availability.storageDir: s3://my-bucket/recovery
state.savepoints.dir: s3://my-bucket/savepoints
state.checkpoints.dir: s3://my-bucket/checkpoints
process.working-dir: /tmp/workdir

Best regards,
Arthur


On Wed, Aug 28, 2024 at 11:03 PM Naci Simsek  wrote:

> Hi Arthur,
>
> How do you submit your job? Are you explicitly setting the job id when
> submitting the job?
> Have you also tried without HA to see the behavior?
>
> It looks like the job is submitted with the same ID as the previous job, and
> the job result stored in HA does not let you submit it again with the same
> job_id.
>
> BR,
> Naci
>
> On 28. Aug 2024, at 17:32, Arthur 

Re: Flink k8s operator unstable deployment

2024-08-28 Thread Naci Simsek
Hi Arthur,

How do you submit your job? Are you explicitly setting the job id when submitting the job? Have you also tried without HA to see the behavior?

It looks like the job is submitted with the same ID as the previous job, and the job result stored in HA does not let you submit it again with the same job_id.

BR,
Naci

On 28. Aug 2024, at 17:32, Arthur Catrisse via user  wrote:

Hello,

We are running into issues when deploying Flink on Kubernetes using the flink-kubernetes-operator with a FlinkDeployment.
Occasionally, when a JobManager gets rotated out (by Karpenter in our case), the next JobManager is unable to reach a stable state and gets stuck in a crash loop with a DuplicateJobSubmissionException.
We did increase terminationGracePeriodSeconds but it doesn't seem to help. Is it expected that the operator isn't able to get JobManagers back into a stable state? Do you have an idea if we're missing something?

Here are our Flink configurations:

kubernetes.jobmanager.replicas, 1
execution.submit-failed-job-on-application-error, true
high-availability.cluster-id, my_name
kubernetes.jobmanager.cpu.limit-factor, 10
pipeline.max-parallelism, 6
kubernetes.service-account, flink
kubernetes.cluster-id, my_name
high-availability.storageDir, s3://my_bucket/recovery
taskmanager.memory.flink.size, 1024m
parallelism.default, 1
kubernetes.namespace, flink
fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-0,name:my_name,controller:false,blockOwnerDeletion:true
state.backend.type, rocksdb
kubernetes.container.image.ref, 0.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
jobmanager.memory.flink.size, 1024m
taskmanager.memory.process.size, 2048m
kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name, my_name
execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
state.backend.local-recovery, false
state.backend.rocksdb.localdir, /opt/flink/state
kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
state.backend.incremental, true
web.cancel.enable, false
execution.shutdown-on-application-finish, false
job-result-store.delete-on-commit, false
$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
taskmanager.memory.managed.fraction, 0.6
$internal.flink.version, v1_19
execution.checkpointing.max-concurrent-checkpoints, 1
kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
blob.server.port, 6102
kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
fs.allowed-fallback-filesystems, s3
high-availability.type, kubernetes
state.savepoints.dir, s3://my_bucket/savepoints
$internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
taskmanager.numberOfTaskSlots, 2
kubernetes.rest-service.exposed.type, ClusterIP
high-availability.jobmanager.port, 6101
process.working-dir, /tmp/workdir
$internal.application.main, org.apache.flink.client.python.PythonDriver
execution.target, kubernetes-application
jobmanager.memory.process.size, 2048m
taskmanager.rpc.port, 6100
internal.cluster.execution-mode, NORMAL
kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
state.checkpoints.dir, s3://my_bucket/checkpoints
jobmanager.memory.off-heap.size, 134217728b
jobmanager.memory.jvm-overhead.min, 805306368b
jobmanager.memory.jvm-metaspace.size, 268435456b
jobmanager.memory.heap.size, 939524096b
jobmanager.memory.jvm-overhead.max, 805306368b
Example of logs when failing

Last pod before error:
INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
(nothing else after, while other times, when there is no failure, we have among others terminating logs, e.g.
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
etc.)

DuplicateJobSubmission error crashloop:
2024-08-26 09:16:36,385 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
2024-08-26 09:16:36,385 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
2024-08-26 09:1

Re: Flink K8S operator does not support IPv6

2023-09-05 Thread Xiaolong Wang
FYI, adding the environment variable
`KUBERNETES_DISABLE_HOSTNAME_VERIFICATION=true` works for me.

This env variable needs to be added to both the Flink operator and the
Flink job definition.
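
For illustration, a minimal sketch of one way to set that variable on the Flink job pods through the FlinkDeployment pod template (the flink-main-container name follows the operator's pod-template support; the deployment name is an illustrative assumption, and the operator pod itself needs an equivalent env entry, e.g. via its Helm values):

# Hedged sketch, not a verified manifest: inject the env var into the Flink pods.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: ipv6-example            # illustrative name
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: KUBERNETES_DISABLE_HOSTNAME_VERIFICATION
              value: "true"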

On Tue, Aug 8, 2023 at 12:03 PM Xiaolong Wang 
wrote:

> Ok, thank you.
>
> On Tue, Aug 8, 2023 at 11:22 AM Peter Huang 
> wrote:
>
>> We will handle it asap. Please check the status of this jira
>> https://issues.apache.org/jira/browse/FLINK-32777
>>
>> On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
>> wrote:
>>
>>> Hi,
>>>
>>> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
>>> the below issues:
>>>
>>> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>>> fd70:e66a:970d::1 not verified:*
>>>
>>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>>
>>> *DN: CN=kube-apiserver*
>>>
>>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>>> ,
>>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>>> kubernetes.default, kubernetes.default.svc,
>>> kubernetes.default.svc.cluster.local]*
>>>
>>>
>>> Which seemed to be related to a known issue
>>>  of okhttp.
>>>
>>> I'm wondering if there is a plan to support IPv6 for
>>> flink-kubernetes-operator in the near future ?
>>>
>>


Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Gyula Fóra
The replicas setting has no effect in native mode. Native session
clusters are "elastic": the number of task managers is determined on the
fly based on the job requirements.

Gyula

On Thu, Aug 31, 2023 at 11:19 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thank you for the response.
>
> Yes currently in my PoC I'm using standalone integration.
> Does `spec.taskManager.replicas` have any effect when using native mode?
>
> The reason I'm asking is that I need to know what the "capacity" of a
> particular session cluster is before I submit a job into it.
> And the way I was doing this was:
>
> try (KubernetesClient kubernetesClient = new KubernetesClientBuilder().build()) {
>     MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
>             io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>> resources =
>         kubernetesClient.resources(FlinkDeployment.class);
>
>     List<FlinkDeployment> items = resources.inNamespace("default").list().getItems();
>     for (FlinkDeployment item : items) {
>         System.out.println("Flink Deployments: " + item);
>         System.out.println("Number of TM replicas: " +
>                 item.getSpec().getTaskManager().getReplicas());
>     }
> }
>
>
> Thanks,
> Krzysztof
>
> On Thu, 31 Aug 2023 at 10:44 Gyula Fóra  wrote:
>
>> I guess your question is in the context of the standalone integration
>> because native session deployments automatically add TMs on the fly as more
>> are necessary.
>>
>> For standalone mode you should be able to configure
>> `spec.taskManager.replicas` and if I understand correctly that will not
>> shut down the running jobs.
>> If you have problems please share your FlinkDeployment yaml and the
>> operator logs in a JIRA ticket.
>>
>> In any case the native mode is probably better fit for your use-case.
>>
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Just want to bring this up in case it was missed in the other
>>> messages/queries :)
>>>
>>> TL:DR
>>> How to add TM to Flink Session cluster via Java K8s client if Session
>>> Cluster has running jobs?
>>>
>>> Thanks,
>>> Krzysztof
>>>
>>> On Fri, 25 Aug 2023 at 23:48 Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>>
 Hi community,
 I have a use case where I would like to add an extra TM to a running
 Flink session cluster that has Flink jobs deployed. Session cluster
 creation, job submission and cluster patching are done using the flink k8s
 operator Java API. The details of this are presented here [1]

 I would like to ask, what is a recommended path to add a TM to existing
 Session Cluster that currently runs number of Flink jobs using Java API.
 For simplicity lets assume that I dont want to resume jobs from a
 savepoint, just redeploy them.

 When executing the steps from [1] I'm facing an issue where Session jobs
 are not redeployed on the patched Session cluster, however kubectl shows that
 there is a FlinkSessionJob submitted to the k8s.

 Additionally, when I'm trying to delete the FlinkSessionJob from kubectl,
 the Flink k8s operator throws an exception described in [1]. In fact the state
 of that Flink deployment requires a few steps to clean it up after that
 patch.


 [1]
 https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD

>>>


Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Krzysztof Chmielewski
Thank you for the response.

Yes currently in my PoC I'm using standalone integration.
Does `spec.taskManager.replicas` have any effect when using native mode?

The reason I'm asking is that I need to know what the "capacity" of a
particular session cluster is before I submit a job into it.
And the way I was doing this was:

try (KubernetesClient kubernetesClient = new KubernetesClientBuilder().build()) {
    MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
            io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>> resources =
        kubernetesClient.resources(FlinkDeployment.class);

    List<FlinkDeployment> items = resources.inNamespace("default").list().getItems();
    for (FlinkDeployment item : items) {
        System.out.println("Flink Deployments: " + item);
        System.out.println("Number of TM replicas: " +
                item.getSpec().getTaskManager().getReplicas());
    }
}


Thanks,
Krzysztof

On Thu, 31 Aug 2023 at 10:44 Gyula Fóra  wrote:

> I guess your question is in the context of the standalone integration
> because native session deployments automatically add TMs on the fly as more
> are necessary.
>
> For standalone mode you should be able to configure
> `spec.taskManager.replicas` and if I understand correctly that will not
> shut down the running jobs.
> If you have problems please share your FlinkDeployment yaml and the
> operator logs in a JIRA ticket.
>
> In any case the native mode is probably better fit for your use-case.
>
> Gyula
>
> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Just want to bring this up in case it was missed in the other
>> messages/queries :)
>>
>> TL:DR
>> How to add TM to Flink Session cluster via Java K8s client if Session
>> Cluster has running jobs?
>>
>> Thanks,
>> Krzysztof
>>
>> On Fri, 25 Aug 2023 at 23:48 Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi community,
>>> I have a use case where I would like to add an extra TM to a running
>>> Flink session cluster that has Flink jobs deployed. Session cluster
>>> creation, job submission and cluster patching are done using the flink k8s
>>> operator Java API. The details of this are presented here [1]
>>>
>>> I would like to ask, what is a recommended path to add a TM to existing
>>> Session Cluster that currently runs number of Flink jobs using Java API.
>>> For simplicity lets assume that I dont want to resume jobs from a
>>> savepoint, just redeploy them.
>>>
>>> When executing the steps from [1] I'm facing an issue where Session jobs are
>>> not redeployed on the patched Session cluster, however kubectl shows that there
>>> is a FlinkSessionJob submitted to the k8s.
>>>
>>> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl,
>>> the Flink k8s operator throws an exception described in [1]. In fact the state
>>> of that Flink deployment requires a few steps to clean it up after that
>>> patch.
>>>
>>>
>>> [1]
>>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>>
>>


Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Gyula Fóra
I guess your question is in the context of the standalone integration
because native session deployments automatically add TMs on the fly as more
are necessary.

For standalone mode you should be able to configure
`spec.taskManager.replicas` and if I understand correctly that will not
shut down the running jobs.
If you have problems please share your FlinkDeployment yaml and the
operator logs in a JIRA ticket.

In any case the native mode is probably better fit for your use-case.

Gyula
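
For illustration, a minimal sketch of a standalone-mode deployment where the replicas setting is honored (name, image, version and resource sizes are illustrative assumptions, not a verified manifest):

# Hedged sketch: standalone mode with a fixed number of TaskManagers.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster-example   # illustrative name
spec:
  image: flink:1.17               # illustrative image
  flinkVersion: v1_17
  mode: standalone                # replicas only take effect in standalone mode
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 3                   # fixed TM count; native mode allocates TMs on demand
    resource:
      memory: "2048m"
      cpu: 1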

On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Just want to bring this up in case it was missed in the other
> messages/queries :)
>
> TL:DR
> How to add TM to Flink Session cluster via Java K8s client if Session
> Cluster has running jobs?
>
> Thanks,
> Krzysztof
>
> On Fri, 25 Aug 2023 at 23:48 Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi community,
>> I have a use case where I would like to add an extra TM to a running
>> Flink session cluster that has Flink jobs deployed. Session cluster
>> creation, job submission and cluster patching are done using the flink k8s
>> operator Java API. The details of this are presented here [1]
>>
>> I would like to ask, what is a recommended path to add a TM to existing
>> Session Cluster that currently runs number of Flink jobs using Java API.
>> For simplicity lets assume that I dont want to resume jobs from a
>> savepoint, just redeploy them.
>>
>> When executing the steps from [1] I'm facing an issue where Session jobs are
>> not redeployed on the patched Session cluster, however kubectl shows that there
>> is a FlinkSessionJob submitted to the k8s.
>>
>> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl,
>> the Flink k8s operator throws an exception described in [1]. In fact the state
>> of that Flink deployment requires a few steps to clean it up after that
>> patch.
>>
>>
>> [1]
>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>
>


Re: flink k8s operator - problem with patching session cluster

2023-08-30 Thread Krzysztof Chmielewski
Just want to bring this up in case it was missed in the other
messages/queries :)

TL:DR
How to add TM to Flink Session cluster via Java K8s client if Session
Cluster has running jobs?

Thanks,
Krzysztof

On Fri, 25 Aug 2023 at 23:48 Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi community,
> I have a use case where I would like to add an extra TM to a running
> Flink session cluster that has Flink jobs deployed. Session cluster
> creation, job submission and cluster patching are done using the flink k8s
> operator Java API. The details of this are presented here [1]
>
> I would like to ask, what is a recommended path to add a TM to existing
> Session Cluster that currently runs number of Flink jobs using Java API.
> For simplicity lets assume that I dont want to resume jobs from a
> savepoint, just redeploy them.
>
> When executing the steps from [1] I'm facing an issue where Session jobs are
> not redeployed on the patched Session cluster, however kubectl shows that there
> is a FlinkSessionJob submitted to the k8s.
>
> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl, the
> Flink k8s operator throws an exception described in [1]. In fact the state of
> that Flink deployment requires a few steps to clean it up after that patch.
>
>
> [1]
> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>


Re: Flink k8s operator - managed from java microservice

2023-08-16 Thread Yaroslav Tkachenko
Hi Krzysztof,

You may want to check flink-kubernetes-operator-api (
https://mvnrepository.com/artifact/org.apache.flink/flink-kubernetes-operator-api),
here's an example for reading FlinkDeployments
https://github.com/sap1ens/heimdall/blob/main/src/main/java/com/sap1ens/heimdall/kubernetes/FlinkDeploymentClient.java

And you can use standard kubernetes client methods for creating resources,
e.g. "createOrReplace".

On Wed, Aug 16, 2023 at 5:33 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I have a use case where I would like to run Flink jobs using the Apache Flink
> k8s operator, where actions like job submission (new and from savepoint), job
> cancellation with savepoint, and cluster creation will be triggered from a
> Java-based microservice.
>
> Is there any recommended/dedicated Java API for the Flink k8s operator?
> I see that there is standard k8s java client [1] and I'm wondering if this
> would be sufficient for Flink jobs.
>
> [1] https://github.com/kubernetes-client/java
>


Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Xiaolong Wang
Ok, thank you.

On Tue, Aug 8, 2023 at 11:22 AM Peter Huang 
wrote:

> We will handle it asap. Please check the status of this jira
> https://issues.apache.org/jira/browse/FLINK-32777
>
> On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
> wrote:
>
>> Hi,
>>
>> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
>> the below issues:
>>
>> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>>> fd70:e66a:970d::1 not verified:*
>>>
>>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>>
>>> *DN: CN=kube-apiserver*
>>>
>>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>>> ,
>>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>>> kubernetes.default, kubernetes.default.svc,
>>> kubernetes.default.svc.cluster.local]*
>>>
>>
>> Which seemed to be related to a known issue
>>  of okhttp.
>>
>> I'm wondering if there is a plan to support IPv6 for
>> flink-kubernetes-operator in the near future ?
>>
>


Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Peter Huang
We will handle it asap. Please check the status of this jira
https://issues.apache.org/jira/browse/FLINK-32777

On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
wrote:

> Hi,
>
> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
> the below issues:
>
> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>> fd70:e66a:970d::1 not verified:*
>>
>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>
>> *DN: CN=kube-apiserver*
>>
>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>> ,
>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>> kubernetes.default, kubernetes.default.svc,
>> kubernetes.default.svc.cluster.local]*
>>
>
> Which seemed to be related to a known issue
>  of okhttp.
>
> I'm wondering if there is a plan to support IPv6 for
> flink-kubernetes-operator in the near future ?
>


Re: [Flink K8s Operator] Trigger nonce missing for manual savepoint info

2023-07-12 Thread Gyula Fóra
Maybe you have inconsistent operator / CRD versions? In any case I highly
recommend upgrading to the latest operator version to get all the bug /
security fixes and improvements.

Gyula

On Wed, 12 Jul 2023 at 10:58, Paul Lam  wrote:

> Hi,
>
> I'm using K8s operator 1.3.1 with Flink 1.15.2 on 2 K8s clusters. Weirdly
> enough, on one K8s cluster the flink deployments would show the savepoint
> trigger nonce, while the flink deployments on the other cluster wouldn't.
>
> The normal output is as follows:
>
> ```
> Last Savepoint:
> Format Type: CANONICAL
> Location:
> hdfs://.../savepoints/61dfcb2fd7946a0001827c55/savepoint-a81885-68d00e5130b8
> Time Stamp: 1689145852437
> Trigger Nonce: 2
> Trigger Type: MANUAL
> ```
>  And the erroneous output is like this (without trigger nonce and format
> type):
> ```
> Last Savepoint:
> Location:
> hdfs://.../savepoints/61dfcb2fd7946a0001827c55/savepoint-31c74e-3347299b84ec
> Time Stamp: 1689150184799
> Trigger Type: MANUAL
> ```
>
> Do you have any ideas about what’s causing this? Thanks!
>
> Best,
> Paul Lam
>
>


Re: [Flink K8s Operator] Automatic cleanup of terminated deployments

2023-05-21 Thread Paul Lam
Hi Gyula,

Thank you and sorry for the late response. 

My use case is that users may run finite jobs (either batch jobs or finite 
stream jobs), leaving a lot of deprecated flink deployments around. I’ve filed 
a ticket[1].

[1] https://issues.apache.org/jira/browse/FLINK-32143

Best,
Paul Lam

> On 15 May 2023, at 00:14, Gyula Fóra  wrote:
> 
> There is no such feature currently, Kubernetes resources usually do not 
> delete themselves :) 
> The problem I see here is by deleting the resource you lose all information 
> about what happened, you won't know if it failed or completed etc.
> What is the use-case you are thinking about?
> 
> If this is something you think would be good to add, please open a JIRA 
> ticket for it. But in any case this will probably merit a dev list discussion.
> 
> Gyula
> 
> On Sun, May 14, 2023 at 11:54 AM Paul Lam  > wrote:
> Hi all,
> 
> Currently, if a job turns into terminated status (e.g. FINISHED or FAILED), 
> the flinkdeployment remains until a manual cleanup is performed. I went 
> through the docs but did not find any way to clean them up automatically. Am 
> I missing something? Thanks!
> 
> Best,
> Paul Lam



Re: [Flink K8s Operator] Automatic cleanup of terminated deployments

2023-05-14 Thread Gyula Fóra
There is no such feature currently, Kubernetes resources usually do not
delete themselves :)
The problem I see here is by deleting the resource you lose all information
about what happened, you won't know if it failed or completed etc.
What is the use-case you are thinking about?

If this is something you think would be good to add, please open a JIRA
ticket for it. But in any case this will probably merit a dev list
discussion.

Gyula

On Sun, May 14, 2023 at 11:54 AM Paul Lam  wrote:

> Hi all,
>
> Currently, if a job turns into terminated status (e.g. FINISHED or
> FAILED), the flinkdeployment remains until a manual cleanup is performed. I
> went through the docs but did not find any way to clean them up
> automatically. Am I missing something? Thanks!
>
> Best,
> Paul Lam
>


Re: Flink K8s operator pod section of CRD

2023-02-24 Thread Őrhidi Mátyás
Yep!

Simple oversight, it was :/

Cheers,
Matyas

On Thu, Feb 23, 2023 at 10:54 PM Gyula Fóra  wrote:

> Hey!
> You are right, these fields could have been of the PodTemplate /
> PodTemplateSpec type (probably PodTemplateSpec is actually better).
>
> I think the reason why we used it is two fold:
>  - Simple oversight :)
>  - Flink itself "expects" the podtemplate in this form for the native
> integration as you can see here:
> https://github.com/apache/flink/blob/master/flink-kubernetes/src/test/resources/testing-pod-template.yaml
>
> I think we could actually change Pod -> PodTemplateSpec without breaking
> the api. Let me think about this and see.
>
> Cheers,
> Gyula
>
> On Fri, Feb 24, 2023 at 1:47 AM Mason Chen  wrote:
>
>> Hi all,
>>
>> Why does the FlinkDeployment CRD refer to the Pod class instead of the
>> PodTemplate class from the fabric8 library? As far as I can tell, the only
>> difference is that the Pod class exposes the PodStatus, which doesn't seem
>> mutable. Thanks in advance!
>>
>> Best,
>> Mason
>>
>


Re: Flink K8s operator pod section of CRD

2023-02-23 Thread Gyula Fóra
Hey!
You are right, these fields could have been of the PodTemplate /
PodTemplateSpec type (probably PodTemplateSpec is actually better).

I think the reason why we used it is two fold:
 - Simple oversight :)
 - Flink itself "expects" the podtemplate in this form for the native
integration as you can see here:
https://github.com/apache/flink/blob/master/flink-kubernetes/src/test/resources/testing-pod-template.yaml

I think we could actually change Pod -> PodTemplateSpec without breaking
the api. Let me think about this and see.

Cheers,
Gyula
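
For illustration, the shape such a pod template takes today: a full Pod object (hence the Pod class in the CRD) whose main container is named flink-main-container; the env entry below is an illustrative assumption:

# Hedged sketch of a podTemplate value on a FlinkDeployment.
apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  containers:
    - name: flink-main-container
      env:
        - name: EXAMPLE_ENV       # illustrative assumption
          value: "example"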

On Fri, Feb 24, 2023 at 1:47 AM Mason Chen  wrote:

> Hi all,
>
> Why does the FlinkDeployment CRD refer to the Pod class instead of the
> PodTemplate class from the fabric8 library? As far as I can tell, the only
> difference is that the Pod class exposes the PodStatus, which doesn't seem
> mutable. Thanks in advance!
>
> Best,
> Mason
>


Re: [Flink K8s Operator] flinkdep stays in DEPLOYED and never turns STABLE

2022-12-06 Thread Paul Lam
Thanks a lot for your input, Gyula!

Best,
Paul Lam

> On 6 Dec 2022, at 18:38, Gyula Fóra  wrote:
> 
> Hi!
> 
> The stable state is not marked in the reconciliation state field but instead 
> using the last stable spec field. Deployed simply means that something is 
> running :)
> 
> The structure of the status is a bit complex to avoid too much redundancy and 
> limit the size and is mostly considered to be internal for the operator.
> 
> For a user facing view of the resource state you can check:  
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#flink-resource-lifecycle
>  
> 
> 
> The status class contains a nice helper method to get the 
> ResourceLifecycleState enum if you want a single condensed status view .
> 
> Cheers 
> Gyula
> 
> On Tue, 6 Dec 2022 at 04:12, Paul Lam  > wrote:
> Hi all,
> 
> I’m trying out Flink K8s operator 1.2 with K8s 1.25 and Flink 1.15. 
> 
> I found that kubectl shows the flinkdeployments staying in DEPLOYED seemingly 
> forever (the Flink job statuses are RUNNING), but the operator logs show that 
> the flinkdeployments already turned into STABLE. 
> 
> Is that a known bug or I missed something? Thanks a lot!
> 
> Best,
> Paul Lam
> 
> 



Re: [Flink K8s Operator] flinkdep stays in DEPLOYED and never turns STABLE

2022-12-06 Thread Gyula Fóra
Hi!

The stable state is not marked in the reconciliation state field but
instead using the last stable spec field. Deployed simply means that
something is running :)

The structure of the status is a bit complex to avoid too much redundancy
and limit the size and is mostly considered to be internal for the operator.

For a user facing view of the resource state you can check:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#flink-resource-lifecycle

The status class contains a nice helper method to get the
ResourceLifecycleState enum if you want a single condensed status view .

Cheers
Gyula

On Tue, 6 Dec 2022 at 04:12, Paul Lam  wrote:

> Hi all,
>
> I’m trying out Flink K8s operator 1.2 with K8s 1.25 and Flink 1.15.
>
> I found that kubectl shows the flinkdeployments staying in DEPLOYED seemingly
> forever (the Flink job statuses are RUNNING), but the operator logs show that
> the flinkdeployments already turned into STABLE.
>
> Is that a known bug or I missed something? Thanks a lot!
>
> Best,
> Paul Lam
>
>
>


Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Dongwon Kim
Hi Gyula :-)

Okay, we're gonna upgrade to 1.15 and see what happens.

Thanks a lot for the quick feedback and the detailed explanation!

Best,

Dongwon


On Tue, Nov 22, 2022 at 5:57 PM Gyula Fóra  wrote:

> Hi Dongwon!
>
> This error mostly occurs when using Flink 1.14 and the Flink cluster goes
> into a terminal state. If a Flink job is FAILED/FINISHED (such as it
> exhausted the retry strategy), in Flink 1.14 the cluster shuts itself down
> and removes the HA metadata.
>
> In these cases the operator will only see that the cluster completely
> disappeared and there is no HA metadata and it will throw the error you
> mentioned. It does not know what happened and doesn't have any way to
> recover checkpoint information.
>
> This is fixed in Flink 1.15 where even after terminal FAILED/FINISHED
> states, the jobmanager would not shut down. This allows the operator to
> observe this terminal state and actually recover the job even if the HA
> metadata was removed.
>
> To summarize, this is mostly caused by Flink 1.14 behaviour that the
> operator cannot control. Upgrading to 1.15 allows much more robustness and
> should eliminate most of these cases.
>
> Cheers,
> Gyula
>
> On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim  wrote:
>
>> Hi,
>>
>> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
>> flink-1.14.3, we're occasionally facing the following error:
>>
>> Status:
>>>   Cluster Info:
>>> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
>>> Flink - Version:  1.14.3
>>>   Error:  HA metadata not available to restore
>>> from last state. It is possible that the job has finished or terminally
>>> failed, or the configmaps have been deleted. Manual restore required.
>>>   Job Manager Deployment Status:  ERROR
>>>   Job Status:
>>> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
>>> Job Name:  flinktest
>>> Savepoint Info:
>>>   Last Periodic Savepoint Timestamp:  0
>>>   Savepoint History:
>>>   Trigger Id:
>>>   Trigger Timestamp:  0
>>>   Trigger Type:   UNKNOWN
>>> Start Time:   1668660381400
>>> State:RECONCILING
>>> Update Time:  1668994910151
>>>   Reconciliation Status:
>>> Last Reconciled Spec:  ...
>>> Reconciliation Timestamp:  1668660371853
>>> State: DEPLOYED
>>>   Task Manager:
>>> Label Selector:  component=taskmanager,app=flinktest
>>> Replicas:1
>>> Events:
>>>   Type ReasonAge From
>>> Message
>>>    --
>>> ---
>>>   Normal   JobStatusChanged  30m Job
>>> Job status changed from RUNNING to RESTARTING
>>>   Normal   JobStatusChanged  29m Job
>>> Job status changed from RESTARTING to CREATED
>>>   Normal   JobStatusChanged  28m Job
>>> Job status changed from CREATED to RESTARTING
>>>   Warning  Missing   26m JobManagerDeployment
>>> Missing JobManager deployment
>>>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment
>>> HA metadata not available to restore from last state. It is possible that
>>> the job has finished or terminally failed, or the configmaps have been
>>> deleted. Manual restore required.
>>>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
>>> Starting deployment
>>
>>
>> We're happy with the last state mode most of the time, but we face it
>> occasionally.
>>
>> We found that it's not easy to reproduce the problem; we tried to kill
>> JMs and TMs and even shutdown the nodes on which JMs and TMs are running.
>>
>> We also checked that the file size is not zero.
>>
>> Thanks,
>>
>> Dongwon
>>
>>
>>


Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Gyula Fóra
Hi Dongwon!

This error mostly occurs when using Flink 1.14 and the Flink cluster goes
into a terminal state. If a Flink job is FAILED/FINISHED (such as it
exhausted the retry strategy), in Flink 1.14 the cluster shuts itself down
and removes the HA metadata.

In these cases the operator will only see that the cluster completely
disappeared and there is no HA metadata and it will throw the error you
mentioned. It does not know what happened and doesn't have any way to
recover checkpoint information.

This is fixed in Flink 1.15 where even after terminal FAILED/FINISHED
states, the jobmanager would not shut down. This allows the operator to
observe this terminal state and actually recover the job even if the HA
metadata was removed.

To summarize, this is mostly caused by Flink 1.14 behaviour that the
operator cannot control. Upgrading to 1.15 allows much more robustness and
should eliminate most of these cases.

Cheers,
Gyula

On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim  wrote:

> Hi,
>
> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
> flink-1.14.3, we're occasionally facing the following error:
>
> Status:
>>   Cluster Info:
>> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
>> Flink - Version:  1.14.3
>>   Error:  HA metadata not available to restore
>> from last state. It is possible that the job has finished or terminally
>> failed, or the configmaps have been deleted. Manual restore required.
>>   Job Manager Deployment Status:  ERROR
>>   Job Status:
>> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
>> Job Name:  flinktest
>> Savepoint Info:
>>   Last Periodic Savepoint Timestamp:  0
>>   Savepoint History:
>>   Trigger Id:
>>   Trigger Timestamp:  0
>>   Trigger Type:   UNKNOWN
>> Start Time:   1668660381400
>> State:RECONCILING
>> Update Time:  1668994910151
>>   Reconciliation Status:
>> Last Reconciled Spec:  ...
>> Reconciliation Timestamp:  1668660371853
>> State: DEPLOYED
>>   Task Manager:
>> Label Selector:  component=taskmanager,app=flinktest
>> Replicas:1
>> Events:
>>   Type ReasonAge From
>> Message
>>    --
>> ---
>>   Normal   JobStatusChanged  30m Job
>> Job status changed from RUNNING to RESTARTING
>>   Normal   JobStatusChanged  29m Job
>> Job status changed from RESTARTING to CREATED
>>   Normal   JobStatusChanged  28m Job
>> Job status changed from CREATED to RESTARTING
>>   Warning  Missing   26m JobManagerDeployment
>> Missing JobManager deployment
>>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment
>> HA metadata not available to restore from last state. It is possible that
>> the job has finished or terminally failed, or the configmaps have been
>> deleted. Manual restore required.
>>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
>> Starting deployment
>
>
> We're happy with the last state mode most of the time, but we face it
> occasionally.
>
> We found that it's not easy to reproduce the problem; we tried to kill JMs
> and TMs and even shutdown the nodes on which JMs and TMs are running.
>
> We also checked that the file size is not zero.
>
> Thanks,
>
> Dongwon
>
>
>


Re: Flink k8s Operator on AWS?

2022-06-27 Thread Matt Casters
The problem was a misconfiguration of the initContainer which would copy my
artifacts from s3 to an ephemeral volume.  This caused the task manager to
get started for a bit and then to be shut down.  It was hard to get logging
about this since the pods were gone before I could get logging from it.  I
chalk all that up to just me lacking a bit of experience with k8s.

That being said... It's all working now and I documented the deployment
over here:

https://hop.apache.org/manual/next/pipeline/beam/flink-k8s-operator-running-hop-pipeline.html
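
For reference, a rough sketch of the kind of init-container setup described above (the fetcher image, the S3 credentials wiring, and the mount paths are illustrative assumptions; the fetched jar is then referenced with a local:// URI):

# Hedged sketch: fetch the fat jar from S3 into an ephemeral volume before the JM starts.
spec:
  podTemplate:
    spec:
      initContainers:
        - name: fetch-artifact
          image: amazon/aws-cli            # illustrative image; S3 access via IRSA/env assumed
          command: ["aws", "s3", "cp", "s3://hop-eks/hop/hop-2.1.0-fat.jar", "/flink-usrlib/"]
          volumeMounts:
            - name: usrlib
              mountPath: /flink-usrlib
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: usrlib
              mountPath: /opt/flink/usrlib
      volumes:
        - name: usrlib
          emptyDir: {}
  job:
    jarURI: local:///opt/flink/usrlib/hop-2.1.0-fat.jar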

A big thank you to everyone that helped me out!

Cheers,
Matt

On Mon, Jun 27, 2022 at 4:59 AM Yang Wang  wrote:

> Could you please share the JobManager logs of failed deployment? It will
> also help a lot if you could show the pending pod status via "kubectl
> describe ".
>
> Given that the current Flink Kubernetes Operator is built on top of native
> K8s integration[1], the Flink ResourceManager should allocate enough
> TaskManager pods automatically.
> We need to find out what is wrong via the logs. Maybe the service account
> or taint or something else.
>
>
> [1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
>
>
> Best,
> Yang
>
> Matt Casters  wrote on Fri, 24 Jun 2022 at 23:48:
>
>> Yes of-course.  I already feel a bit less intelligent for having asked
>> the question ;-)
>>
>> The status now is that I managed to have it all puzzled together.
>> Copying the files from s3 to an ephemeral volume takes all of 2 seconds so
>> it's really not an issue.  The cluster starts and our fat jar and Apache
>> Hop MainBeam class is found and started.
>>
>> The only thing that remains is figuring out how to configure the Flink
>> cluster itself.  I have a couple of m5.large ec2 instances in a node group
>> on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
>> in the pipeline can't seem to find resources to start.
>>
>> Caused by:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Slot request bulk is not fulfillable! Could not allocate the required slot
>> within slot request timeout
>>
>> Parallelism was set to 1 for the runner and there are only 2 tasks in my
>> first Beam pipeline so it should be simple enough but it just times out.
>>
>> Next step for me is to document the result which will end up on
>> hop.apache.org.   I'll probably also want to demo this in Austin at the
>> upcoming Beam summit.
>>
>> Thanks a lot for your time and help so far!
>>
>> Cheers,
>> Matt
>>
>>


Re: Flink k8s Operator on AWS?

2022-06-26 Thread Yang Wang
Could you please share the JobManager logs of failed deployment? It will
also help a lot if you could show the pending pod status via "kubectl
describe ".

Given that the current Flink Kubernetes Operator is built on top of native
K8s integration[1], the Flink ResourceManager should allocate enough
TaskManager pods automatically.
We need to find out what is wrong via the logs. Maybe the service account
or taint or something else.


[1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html


Best,
Yang

Matt Casters  wrote on Fri, 24 Jun 2022 at 23:48:

> Yes of-course.  I already feel a bit less intelligent for having asked the
> question ;-)
>
> The status now is that I managed to have it all puzzled together.  Copying
> the files from s3 to an ephemeral volume takes all of 2 seconds so it's
> really not an issue.  The cluster starts and our fat jar and Apache Hop
> MainBeam class is found and started.
>
> The only thing that remains is figuring out how to configure the Flink
> cluster itself.  I have a couple of m5.large ec2 instances in a node group
> on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
> in the pipeline can't seem to find resources to start.
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
>
> Parallelism was set to 1 for the runner and there are only 2 tasks in my
> first Beam pipeline so it should be simple enough but it just times out.
>
> Next step for me is to document the result which will end up on
> hop.apache.org.   I'll probably also want to demo this in Austin at the
> upcoming Beam summit.
>
> Thanks a lot for your time and help so far!
>
> Cheers,
> Matt
>
>


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Yes of-course.  I already feel a bit less intelligent for having asked the
question ;-)

The status now is that I managed to have it all puzzled together.  Copying
the files from s3 to an ephemeral volume takes all of 2 seconds so it's
really not an issue.  The cluster starts and our fat jar and Apache Hop
MainBeam class is found and started.

The only thing that remains is figuring out how to configure the Flink
cluster itself.  I have a couple of m5.large ec2 instances in a node group
on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
in the pipeline can't seem to find resources to start.

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

Parallelism was set to 1 for the runner and there are only 2 tasks in my
first Beam pipeline so it should be simple enough but it just times out.

Next step for me is to document the result which will end up on
hop.apache.org.   I'll probably also want to demo this in Austin at the
upcoming Beam summit.

Thanks a lot for your time and help so far!

Cheers,
Matt


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Őrhidi Mátyás
Hi Matt,

Yes. There are several official Flink images with various JVMs including
Java 11.

https://hub.docker.com/_/flink

Cheers,
Matyas

On Fri, Jun 24, 2022 at 2:06 PM Matt Casters 
wrote:

> Hi Mátyás & all,
>
> Thanks again for the advice so far. On a related note I noticed Java 8
> being used, indicated in the log.
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  JAVA_HOME: /usr/local/openjdk-8
>
> Is there a way to use Java 11 to start Flink with?
>
> Kind regards,
>
> Matt
>
> On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
> wrote:
>
>> Hi Matt,
>>
>> I believe an artifact fetcher (e.g
>> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
>> template (
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
>> is an elegant way to solve your problem.
>>
>> The operator uses K8s native integration under the hood:
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>>  In
>> application mode,  the main() method of the application is executed on the
>> JobManager, hence we need the jar locally.
>>
>> You can launch a session cluster (without job spec) on the operator that
>> allows submitting jars if you would like to avoid dealing with
>> authentication, but the recommended and safe approach is to use
>> sessionjobs for this purpose.
>>
>>
>> Cheers,
>> Matyas
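
For illustration, a minimal sketch of the session-job route mentioned above (the cluster name and the artifact URL are illustrative assumptions; the remote jar is pulled by the operator's artifact fetcher):

# Hedged sketch: submit a remote jar to an existing session cluster.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: example-session-job           # illustrative name
spec:
  deploymentName: my-session-cluster  # an existing session-mode FlinkDeployment
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 2
    upgradeMode: stateless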
>>
>> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
>> matt.cast...@neotechnology.com> wrote:
>>
>>> Thank you very much for the help Matyas and Gyula!
>>>
>>> I just saw a video today where you were presenting the FKO.  Really nice
>>> stuff!
>>>
>>> So I'm guessing we're executing "flink run" at some point on the master
>>> and that this is when we need the jar file to be local?
>>> Am I right in assuming that this happens after the flink cluster in
>>> question was started, as part of the job execution?
>>>
>>> On the one hand I agree with the underlying idea that authentication and
>>> security should not be a responsibility of the operator.   On the other
>>> hand I could add a flink-s3 driver but then I'd also have to configure it
>>> and so on and it's just hard to get that configuration to be really clean.
>>>
>>> Do we have some service running on the flink cluster which would allow
>>> us to post/copy files from the client (running kubectl) to the master?  If
>>> so, could we add an option to the job specification to that effect?  Just
>>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>>
>>> All the best,
>>> Matt
>>>
>>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>>> wrote:
>>>
 Hi Matt,

 - In FlinkDeployments you can utilize an init container to download
 your artifact onto a shared volume, then you can refer to it as local:/..
 from the main container. FlinkDeployments comes with pod template support
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template

 - FlinkSessionJobs comes with an artifact fetcher, but it may need some
 tweaking to make it work on your environment:

 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview

 I hope it helps, let us know if you have further questions.

 Cheers,
 Matyas



 On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
 matt.cast...@neotechnology.com> wrote:

> Hi Flink team!
>
> I'm interested in getting the new Flink Kubernetes Operator to work on
> AWS EKS.  Following the documentation I got pretty far.  However, when
> trying to run a job I got the following error:
>
> Only "local" is supported as schema for application mode. This assumes
>> t
>> hat the jar is located in the image, not the Flink client. An example
>> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>
>
>  I have an Apache Hop/Beam fat jar capable of running the Flink
> pipeline in my yml file:
>
> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>
> So how could I go about getting the fat jar in a desired location for
> the operator?
>
> Getting this to work would be really cool for both short and
> long-lived pipelines in the service of all sorts of data integration work.
> It would do away with the complexity of setting up and maintaining your 
> own
> Flink cluster.
>
> Thanks in advance!
>
> All the best,
>
> Matt (mcasters, Apache Hop PMC)
>
>


Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Hi Mátyás & all,

Thanks again for the advice so far. On a related note I noticed Java 8
being used, indicated in the log.

org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 JAVA_HOME: /usr/local/openjdk-8

Is there a way to use Java 11 to start Flink with?

Kind regards,

Matt

On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> I believe an artifact fetcher (e.g
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>  In
> application mode,  the main() method of the application is executed on the
> JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without job spec) on the operator that
> allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use
> sessionjobs for this purpose.
>
>
> Cheers,
> Matyas
>
> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Thank you very much for the help Matyas and Gyula!
>>
>> I just saw a video today where you were presenting the FKO.  Really nice
>> stuff!
>>
>> So I'm guessing we're executing "flink run" at some point on the master
>> and that this is when we need the jar file to be local?
>> Am I right in assuming that this happens after the flink cluster in
>> question was started, as part of the job execution?
>>
>> On the one hand I agree with the underlying idea that authentication and
>> security should not be a responsibility of the operator.   On the other
>> hand I could add a flink-s3 driver but then I'd also have to configure it
>> and so on and it's just hard to get that configuration to be really clean.
>>
>> Do we have some service running on the flink cluster which would allow us
>> to post/copy files from the client (running kubectl) to the master?  If so,
>> could we add an option to the job specification to that effect?  Just
>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>
>> All the best,
>> Matt
>>
>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> - In FlinkDeployments you can utilize an init container to download your
>>> artifact onto a shared volume, then you can refer to it as local:/.. from
>>> the main container. FlinkDeployments comes with pod template support
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>>
>>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>>> tweaking to make it work on your environment:
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>>
>>> I hope it helps, let us know if you have further questions.
>>>
>>> Cheers,
>>> Matyas
>>>
>>>
>>>
>>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
 Hi Flink team!

 I'm interested in getting the new Flink Kubernetes Operator to work on
 AWS EKS.  Following the documentation I got pretty far.  However, when
 trying to run a job I got the following error:

 Only "local" is supported as schema for application mode. This assumes t
> hat the jar is located in the image, not the Flink client. An example
> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


  I have an Apache Hop/Beam fat jar capable of running the Flink
 pipeline in my yml file:

 jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar

 So how could I go about getting the fat jar in a desired location for
 the operator?

 Getting this to work would be really cool for both short and long-lived
 pipelines in the service of all sorts of data integration work.  It would
 do away with the complexity of setting up and maintaining your own Flink
 cluster.

 Thanks in advance!

 All the best,

 Matt (mcasters, Apache Hop PMC)




Re: Flink k8s Operator on AWS?

2022-06-23 Thread Yang Wang
Thanks for your valuable input.
Making Flink on K8s as easy to deploy as a normal Java application
is certainly the mission of the Flink Kubernetes Operator. Obviously, we are
still some way from that goal.

Back to downloading the user jars: I think it makes sense to introduce the
artifact fetcher in the Flink Kubernetes Operator, or directly in Flink itself.
Then things become much easier;
users just need to configure the S3 access key and secret.
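As a rough illustration of what that configuration could look like (the keys are standard Flink S3 filesystem options, the values are placeholders, and on EKS an IAM role is usually preferable to static keys):

    flinkConfiguration:
      s3.access-key: "<ACCESS_KEY>"
      s3.secret-key: "<SECRET_KEY>"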


Best,
Yang




Matt Casters  wrote on Wed, Jun 22, 2022 at 15:52:

> Hi Yang,
>
> Thanks for the suggestion!  I looked into this volume sharing on EKS
> yesterday but I couldn't figure it out right away.
> The way that people come into the Apache Hop project is often with very
> little technical knowledge since that's sort of the goal of the project:
> make things easy.  Following page after page of complicated instructions
> just to get a few files into a pod container... I feel it's just a bit
> much.
> But again, this is my frustration with k8s, not with Flink ;-)
>
> Cheers,
> Matt
>
> On Wed, Jun 22, 2022 at 5:32 AM Yang Wang  wrote:
>
>> Matyas and Gyula have shared a lot of great information about how to make
>> the Flink Kubernetes Operator work on EKS.
>>
>> One more input about how to prepare the user jars. If you are more
>> familiar with K8s, you could use a persistent volume to provide the user jars
>> and then mount the volume into the JobManager and TaskManager.
>> I think EKS supports EBS, NFS and several other PV types.
>>
>> Best,
>> Yang
>>
>> Őrhidi Mátyás  wrote on Tue, Jun 21, 2022 at 23:00:
>>
>>> Hi Matt,
>>>
>>> I believe an artifact fetcher (e.g
>>> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
>>> template (
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
>>> is an elegant way to solve your problem.
>>>
>>> The operator uses K8s native integration under the hood:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>>>  In
>>> application mode,  the main() method of the application is executed on the
>>> JobManager, hence we need the jar locally.
>>>
>>> You can launch a session cluster (without job spec) on the operator that
>>> allows submitting jars if you would like to avoid dealing with
>>> authentication, but the recommended and safe approach is to use
>>> sessionjobs for this purpose.
>>>
>>>
>>> Cheers,
>>> Matyas
>>>
>>> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
 Thank you very much for the help Matyas and Gyula!

 I just saw a video today where you were presenting the FKO.  Really
 nice stuff!

 So I'm guessing we're executing "flink run" at some point on the master
 and that this is when we need the jar file to be local?
 Am I right in assuming that this happens after the flink cluster in
 question was started, as part of the job execution?

 On the one hand I agree with the underlying idea that authentication
 and security should not be a responsibility of the operator.   On the other
 hand I could add a flink-s3 driver but then I'd also have to configure it
 and so on and it's just hard to get that configuration to be really clean.

 Do we have some service running on the flink cluster which would allow
 us to post/copy files from the client (running kubectl) to the master?  If
 so, could we add an option to the job specification to that effect?  Just
 brainstorming ;-) (and forking apache/flink-kubernetes-operator)

 All the best,
 Matt

 On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
 wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download
> your artifact onto a shared volume, then you can refer to it as local:/..
> from the main container. FlinkDeployments comes with pod template support
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJobs comes with an artifact fetcher, but it may need
> some tweaking to make it work on your environment:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas
>
>
>
> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Hi Flink team!
>>
>> I'm interested in getting the new Flink Kubernetes Operator to work
>> on AWS EKS.  Following the documentation I got pretty far.  However, when
>> trying to run a job I got the following error:
>>
>> Only "local" is supported as schema for application mode. This
>>> assumes t
>>> hat the jar is located in t

Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Yang,

Thanks for the suggestion!  I looked into this volume sharing on EKS
yesterday but I couldn't figure it out right away.
The way that people come into the Apache Hop project is often with very
little technical knowledge since that's sort of the goal of the project:
make things easy.  Following page after page of complicated instructions
just to get a few files into a pod container... I feel it's just a bit
much.
But again, this is my frustration with k8s, not with Flink ;-)

Cheers,
Matt

On Wed, Jun 22, 2022 at 5:32 AM Yang Wang  wrote:

> Matyas and Gyula have shared a lot of great information about how to make the
> Flink Kubernetes Operator work on EKS.
>
> One more input about how to prepare the user jars. If you are more
> familiar with K8s, you could use a persistent volume to provide the user jars
> and then mount the volume into the JobManager and TaskManager.
> I think EKS supports EBS, NFS and several other PV types.
>
> Best,
> Yang
>
> Őrhidi Mátyás  wrote on Tue, Jun 21, 2022 at 23:00:
>
>> Hi Matt,
>>
>> I believe an artifact fetcher (e.g
>> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
>> template (
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
>> is an elegant way to solve your problem.
>>
>> The operator uses K8s native integration under the hood:
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>>  In
>> application mode,  the main() method of the application is executed on the
>> JobManager, hence we need the jar locally.
>>
>> You can launch a session cluster (without job spec) on the operator that
>> allows submitting jars if you would like to avoid dealing with
>> authentication, but the recommended and safe approach is to use
>> sessionjobs for this purpose.
>>
>>
>> Cheers,
>> Matyas
>>
>> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
>> matt.cast...@neotechnology.com> wrote:
>>
>>> Thank you very much for the help Matyas and Gyula!
>>>
>>> I just saw a video today where you were presenting the FKO.  Really nice
>>> stuff!
>>>
>>> So I'm guessing we're executing "flink run" at some point on the master
>>> and that this is when we need the jar file to be local?
>>> Am I right in assuming that this happens after the flink cluster in
>>> question was started, as part of the job execution?
>>>
>>> On the one hand I agree with the underlying idea that authentication and
>>> security should not be a responsibility of the operator.   On the other
>>> hand I could add a flink-s3 driver but then I'd also have to configure it
>>> and so on and it's just hard to get that configuration to be really clean.
>>>
>>> Do we have some service running on the flink cluster which would allow
>>> us to post/copy files from the client (running kubectl) to the master?  If
>>> so, could we add an option to the job specification to that effect?  Just
>>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>>
>>> All the best,
>>> Matt
>>>
>>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>>> wrote:
>>>
 Hi Matt,

 - In FlinkDeployments you can utilize an init container to download
 your artifact onto a shared volume, then you can refer to it as local:/..
 from the main container. FlinkDeployments comes with pod template support
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template

 - FlinkSessionJobs comes with an artifact fetcher, but it may need some
 tweaking to make it work on your environment:

 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview

 I hope it helps, let us know if you have further questions.

 Cheers,
 Matyas



 On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
 matt.cast...@neotechnology.com> wrote:

> Hi Flink team!
>
> I'm interested in getting the new Flink Kubernetes Operator to work on
> AWS EKS.  Following the documentation I got pretty far.  However, when
> trying to run a job I got the following error:
>
> Only "local" is supported as schema for application mode. This assumes
>> t
>> hat the jar is located in the image, not the Flink client. An example
>> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>
>
>  I have an Apache Hop/Beam fat jar capable of running the Flink
> pipeline in my yml file:
>
> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>
> So how could I go about getting the fat jar in a desired location for
> the operator?
>
> Getting this to work would be really cool for both short and
> long-lived pipelines in the service of all sorts of data integration work.
> It would do away with the complexity of setting up and maintaining your own
> Flink cluster.

Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Matyas,

Again, thank you very much for the information.  I'm a beginner and all
the help is really appreciated.  After some diving into the script
behind s3-artifact-fetcher I kind of figured it out: have a folder
synced into the pod container of the task manager.  Then I guess we should
be able to find the files locally.

At its core, what we're trying to do with a project like Apache Hop is sit
on the side of the organizations that use the software: we want to
lower complexity, maintenance costs, learning curves and so on.  Every time
I see a cryptic, scarcely documented YAML file or a complicated k8s setup, I
need to ask myself whether I'm sending our users on a week-long
mission.

In a way it makes me appreciate the work Google did with Dataflow a bit
more because they looked at this problem in a holistic way and considered
the platform (GCP), the engine (Dataflow cluster on GCP k8s) and the
executing pipeline (Beam API Jar files) to be different facets of the same
problem.  Jar files get uploaded automatically, the cluster automatically
instantiated, the pipeline run, monitored and scaled automatically and at
the end shut down properly.

I want to figure out a way to do this with Flink as well, since I believe
that, especially on AWS (even with Spark-centric options like EMR and EMR
Serverless), running a pipeline is just too complicated.  Your work really helps!

All the best,
Matt

On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> I believe an artifact fetcher (e.g
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>  In
> application mode,  the main() method of the application is executed on the
> JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without job spec) on the operator that
> allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use
> sessionjobs for this purpose.
>
>
> Cheers,
> Matyas
>
> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Thank you very much for the help Matyas and Gyula!
>>
>> I just saw a video today where you were presenting the FKO.  Really nice
>> stuff!
>>
>> So I'm guessing we're executing "flink run" at some point on the master
>> and that this is when we need the jar file to be local?
>> Am I right in assuming that this happens after the flink cluster in
>> question was started, as part of the job execution?
>>
>> On the one hand I agree with the underlying idea that authentication and
>> security should not be a responsibility of the operator.   On the other
>> hand I could add a flink-s3 driver but then I'd also have to configure it
>> and so on and it's just hard to get that configuration to be really clean.
>>
>> Do we have some service running on the flink cluster which would allow us
>> to post/copy files from the client (running kubectl) to the master?  If so,
>> could we add an option to the job specification to that effect?  Just
>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>
>> All the best,
>> Matt
>>
>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> - In FlinkDeployments you can utilize an init container to download your
>>> artifact onto a shared volume, then you can refer to it as local:/.. from
>>> the main container. FlinkDeployments comes with pod template support
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>>
>>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>>> tweaking to make it work on your environment:
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>>
>>> I hope it helps, let us know if you have further questions.
>>>
>>> Cheers,
>>> Matyas
>>>
>>>
>>>
>>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
 Hi Flink team!

 I'm interested in getting the new Flink Kubernetes Operator to work on
 AWS EKS.  Following the documentation I got pretty far.  However, when
 trying to run a job I got the following error:

 Only "local" is supported as schema for application mode. This assumes t
> hat the jar is located in the image, not the Flink client. An example
> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


  I have an Apache Hop/Beam fat jar capable of running the Flink
 pipeline in my yml file:

 jarURI: s3://

Re: Flink k8s Operator on AWS?

2022-06-21 Thread Yang Wang
Matyas and Gyula have shared a lot of great information about how to make the
Flink Kubernetes Operator work on EKS.

One more input about how to prepare the user jars. If you are more familiar
with K8s, you could use a persistent volume to provide the user jars and then
mount the volume into the JobManager and TaskManager.
I think EKS supports EBS, NFS and several other PV types.
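A rough sketch of that idea, assuming a PersistentVolumeClaim named user-jars that already contains the fat jar; note that on EKS a ReadWriteMany-capable volume (e.g. EFS/NFS) is needed if the JobManager and TaskManagers run on different nodes:

    podTemplate:
      spec:
        containers:
          - name: flink-main-container   # the operator merges this into the Flink pods
            volumeMounts:
              - name: user-jars
                mountPath: /opt/flink/usrlib
        volumes:
          - name: user-jars
            persistentVolumeClaim:
              claimName: user-jars       # placeholder PVC name
    job:
      jarURI: local:///opt/flink/usrlib/hop-2.1.0-fat.jar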

Best,
Yang

Őrhidi Mátyás  wrote on Tue, Jun 21, 2022 at 23:00:

> Hi Matt,
>
> I believe an artifact fetcher (e.g
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>  In
> application mode,  the main() method of the application is executed on the
> JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without job spec) on the operator that
> allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use
> sessionjobs for this purpose.
>
>
> Cheers,
> Matyas
>
> On Tue, Jun 21, 2022 at 4:03 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Thank you very much for the help Matyas and Gyula!
>>
>> I just saw a video today where you were presenting the FKO.  Really nice
>> stuff!
>>
>> So I'm guessing we're executing "flink run" at some point on the master
>> and that this is when we need the jar file to be local?
>> Am I right in assuming that this happens after the flink cluster in
>> question was started, as part of the job execution?
>>
>> On the one hand I agree with the underlying idea that authentication and
>> security should not be a responsibility of the operator.   On the other
>> hand I could add a flink-s3 driver but then I'd also have to configure it
>> and so on and it's just hard to get that configuration to be really clean.
>>
>> Do we have some service running on the flink cluster which would allow us
>> to post/copy files from the client (running kubectl) to the master?  If so,
>> could we add an option to the job specification to that effect?  Just
>> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>>
>> All the best,
>> Matt
>>
>> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> - In FlinkDeployments you can utilize an init container to download your
>>> artifact onto a shared volume, then you can refer to it as local:/.. from
>>> the main container. FlinkDeployments comes with pod template support
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>>
>>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>>> tweaking to make it work on your environment:
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>>
>>> I hope it helps, let us know if you have further questions.
>>>
>>> Cheers,
>>> Matyas
>>>
>>>
>>>
>>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>>> matt.cast...@neotechnology.com> wrote:
>>>
 Hi Flink team!

 I'm interested in getting the new Flink Kubernetes Operator to work on
 AWS EKS.  Following the documentation I got pretty far.  However, when
 trying to run a job I got the following error:

 Only "local" is supported as schema for application mode. This assumes t
> hat the jar is located in the image, not the Flink client. An example
> of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


  I have an Apache Hop/Beam fat jar capable of running the Flink
 pipeline in my yml file:

 jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar

 So how could I go about getting the fat jar in a desired location for
 the operator?

 Getting this to work would be really cool for both short and long-lived
 pipelines in the service of all sorts of data integration work.  It would
 do away with the complexity of setting up and maintaining your own Flink
 cluster.

 Thanks in advance!

 All the best,

 Matt (mcasters, Apache Hop PMC)




Re: Flink k8s Operator on AWS?

2022-06-21 Thread Őrhidi Mátyás
Hi Matt,

I believe an artifact fetcher (e.g
https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
template (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
is an elegant way to solve your problem.

The operator uses K8s native integration under the hood:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
In application mode, the main() method of the application is executed on the
JobManager, hence we need the jar locally.

You can launch a session cluster (without job spec) on the operator that
allows submitting jars if you would like to avoid dealing with
authentication, but the recommended and safe approach is to use
sessionjobs for this purpose.
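For the sessionjob route, a rough sketch under the assumption that a session-mode FlinkDeployment named my-session-cluster already exists and that the artifact URL is reachable by the operator (all names and the URL below are placeholders):

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: hop-session-job               # placeholder name
    spec:
      deploymentName: my-session-cluster  # the session cluster to submit the job to
      job:
        jarURI: https://example.com/artifacts/hop-2.1.0-fat.jar   # remote jar, fetched by the operator
        parallelism: 2
        upgradeMode: stateless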


Cheers,
Matyas

On Tue, Jun 21, 2022 at 4:03 PM Matt Casters 
wrote:

> Thank you very much for the help Matyas and Gyula!
>
> I just saw a video today where you were presenting the FKO.  Really nice
> stuff!
>
> So I'm guessing we're executing "flink run" at some point on the master
> and that this is when we need the jar file to be local?
> Am I right in assuming that this happens after the flink cluster in
> question was started, as part of the job execution?
>
> On the one hand I agree with the underlying idea that authentication and
> security should not be a responsibility of the operator.   On the other
> hand I could add a flink-s3 driver but then I'd also have to configure it
> and so on and it's just hard to get that configuration to be really clean.
>
> Do we have some service running on the flink cluster which would allow us
> to post/copy files from the client (running kubectl) to the master?  If so,
> could we add an option to the job specification to that effect?  Just
> brainstorming ;-) (and forking apache/flink-kubernetes-operator)
>
> All the best,
> Matt
>
> On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
> wrote:
>
>> Hi Matt,
>>
>> - In FlinkDeployments you can utilize an init container to download your
>> artifact onto a shared volume, then you can refer to it as local:/.. from
>> the main container. FlinkDeployments comes with pod template support
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>>
>> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
>> tweaking to make it work on your environment:
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>>
>> I hope it helps, let us know if you have further questions.
>>
>> Cheers,
>> Matyas
>>
>>
>>
>> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
>> matt.cast...@neotechnology.com> wrote:
>>
>>> Hi Flink team!
>>>
>>> I'm interested in getting the new Flink Kubernetes Operator to work on
>>> AWS EKS.  Following the documentation I got pretty far.  However, when
>>> trying to run a job I got the following error:
>>>
>>> Only "local" is supported as schema for application mode. This assumes t
 hat the jar is located in the image, not the Flink client. An example
 of such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>>
>>>
>>>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
>>> in my yml file:
>>>
>>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>>
>>> So how could I go about getting the fat jar in a desired location for
>>> the operator?
>>>
>>> Getting this to work would be really cool for both short and long-lived
>>> pipelines in the service of all sorts of data integration work.  It would
>>> do away with the complexity of setting up and maintaining your own Flink
>>> cluster.
>>>
>>> Thanks in advance!
>>>
>>> All the best,
>>>
>>> Matt (mcasters, Apache Hop PMC)
>>>
>>>


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Matt Casters
Thank you very much for the help Matyas and Gyula!

I just saw a video today where you were presenting the FKO.  Really nice
stuff!

So I'm guessing we're executing "flink run" at some point on the master and
that this is when we need the jar file to be local?
Am I right in assuming that this happens after the flink cluster in
question was started, as part of the job execution?

On the one hand I agree with the underlying idea that authentication and
security should not be a responsibility of the operator.   On the other
hand I could add a flink-s3 driver but then I'd also have to configure it
and so on and it's just hard to get that configuration to be really clean.

Do we have some service running on the flink cluster which would allow us
to post/copy files from the client (running kubectl) to the master?  If so,
could we add an option to the job specification to that effect?  Just
brainstorming ;-) (and forking apache/flink-kubernetes-operator)

All the best,
Matt

On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download your
> artifact onto a shared volume, then you can refer to it as local:/.. from
> the main container. FlinkDeployments comes with pod template support
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
> tweaking to make it work on your environment:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas
>
>
>
> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Hi Flink team!
>>
>> I'm interested in getting the new Flink Kubernetes Operator to work on
>> AWS EKS.  Following the documentation I got pretty far.  However, when
>> trying to run a job I got the following error:
>>
>> Only "local" is supported as schema for application mode. This assumes t
>>> hat the jar is located in the image, not the Flink client. An example of
>>> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>
>>
>>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
>> in my yml file:
>>
>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>
>> So how could I go about getting the fat jar in a desired location for the
>> operator?
>>
>> Getting this to work would be really cool for both short and long-lived
>> pipelines in the service of all sorts of data integration work.  It would
>> do away with the complexity of setting up and maintaining your own Flink
>> cluster.
>>
>> Thanks in advance!
>>
>> All the best,
>>
>> Matt (mcasters, Apache Hop PMC)
>>
>>


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Gyula Fóra
A small addition to what Matyas has said:

The limitation of only supporting the local scheme comes directly from Flink's
Kubernetes application mode and is not related to the operator
itself.
Once this feature is added to Flink itself, the operator can also support it
for newer Flink versions.
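In other words, until remote schemes are supported, the job section of a FlinkDeployment has to point at a path that already exists inside the image or on a mounted volume, roughly like this (the path is a placeholder):

    job:
      jarURI: local:///opt/flink/usrlib/hop-2.1.0-fat.jar
      parallelism: 1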

Gyula

On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download your
> artifact onto a shared volume, then you can refer to it as local:/.. from
> the main container. FlinkDeployments comes with pod template support
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
> tweaking to make it work on your environment:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas
>
>
>
> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Hi Flink team!
>>
>> I'm interested in getting the new Flink Kubernetes Operator to work on
>> AWS EKS.  Following the documentation I got pretty far.  However, when
>> trying to run a job I got the following error:
>>
>> Only "local" is supported as schema for application mode. This assumes t
>>> hat the jar is located in the image, not the Flink client. An example of
>>> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>
>>
>>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
>> in my yml file:
>>
>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>
>> So how could I go about getting the fat jar in a desired location for the
>> operator?
>>
>> Getting this to work would be really cool for both short and long-lived
>> pipelines in the service of all sorts of data integration work.  It would
>> do away with the complexity of setting up and maintaining your own Flink
>> cluster.
>>
>> Thanks in advance!
>>
>> All the best,
>>
>> Matt (mcasters, Apache Hop PMC)
>>
>>


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Őrhidi Mátyás
Hi Matt,

- In FlinkDeployments you can utilize an init container to download your
artifact onto a shared volume, then you can refer to it as local:/.. from
the main container. FlinkDeployments comes with pod template support
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template

- FlinkSessionJobs comes with an artifact fetcher, but it may need some
tweaking to make it work on your environment:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
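A minimal sketch of the first suggestion above (init container + pod template), assuming AWS credentials are available to the pods (e.g. via an IAM role) and using placeholder names, image tags and paths; the init container copies the fat jar onto an emptyDir volume, and the job then references it with the local:// scheme:

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: hop-pipeline                  # placeholder name
    spec:
      image: flink:1.15                   # placeholder Flink image
      flinkVersion: v1_15
      serviceAccount: flink
      podTemplate:
        apiVersion: v1
        kind: Pod
        spec:
          initContainers:
            - name: fetch-jar             # runs before the Flink container starts
              image: amazon/aws-cli       # any image with the aws CLI works
              command: ["aws", "s3", "cp", "s3://hop-eks/hop/hop-2.1.0-fat.jar", "/opt/flink/usrlib/hop-2.1.0-fat.jar"]
              volumeMounts:
                - name: usrlib
                  mountPath: /opt/flink/usrlib
          containers:
            - name: flink-main-container  # this name is required so the operator merges the template
              volumeMounts:
                - name: usrlib
                  mountPath: /opt/flink/usrlib
          volumes:
            - name: usrlib
              emptyDir: {}
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/usrlib/hop-2.1.0-fat.jar   # a local path, as application mode requires
        parallelism: 1

The top-level podTemplate applies to both JobManager and TaskManager pods; it can also be set per component if the jar is only needed on one of them.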

I hope it helps, let us know if you have further questions.

Cheers,
Matyas



On Tue, Jun 21, 2022 at 2:35 PM Matt Casters 
wrote:

> Hi Flink team!
>
> I'm interested in getting the new Flink Kubernetes Operator to work on AWS
> EKS.  Following the documentation I got pretty far.  However, when trying
> to run a job I got the following error:
>
> Only "local" is supported as schema for application mode. This assumes t
>> hat the jar is located in the image, not the Flink client. An example of
>> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>
>
>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
> in my yml file:
>
> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>
> So how could I go about getting the fat jar in a desired location for the
> operator?
>
> Getting this to work would be really cool for both short and long-lived
> pipelines in the service of all sorts of data integration work.  It would
> do away with the complexity of setting up and maintaining your own Flink
> cluster.
>
> Thanks in advance!
>
> All the best,
>
> Matt (mcasters, Apache Hop PMC)
>
>


Re: Flink + K8s

2021-11-02 Thread Austin Cawley-Edwards
Hi Rommel,

That’s correct that K8s will restart the JM pod (assuming it’s been created
by a K8s Job or Deployment), and it will pick up the HA data and resume
work. The only use case for having multiple replicas is faster failover, so
you don’t have to wait for K8s to provision that new pod (which can
potentially take a decent amount of time, e.g. if the cluster is scaling up).

Hope that helps,
Austin
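For reference, a minimal flink-conf.yaml sketch of the Kubernetes HA settings being discussed (bucket and cluster-id are placeholders; key names follow the current Flink docs). A single JobManager plus these settings is enough for recovery; the replica count is only raised for the faster-failover case mentioned above:

    high-availability.type: kubernetes
    high-availability.storageDir: s3://my-bucket/flink-ha
    kubernetes.cluster-id: my-flink-cluster
    kubernetes.jobmanager.replicas: 1   # set to 2 for a warm standby JobManager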

On Tue, Nov 2, 2021 at 4:36 PM Rommel Holmes  wrote:

> Hi,
>
> From my understanding, when I set up Flink in HA mode in K8s, I don't need to
> set up more than 1 job manager, because once the job manager dies, K8s will
> restart it for me. Is that the correct understanding, or do I still need to
> set up more than 1 job manager for HA purposes?
>
> Thanks.
>
> Rommel
>
> --
>  Yours
>  Rommel
>
>