Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-11 Thread Fuyao Li
Hello All,

I have solved the problem. In case someone needs a reference in the future, 
I will share my solution here.

Problem 1: Log issue
Flink 1.12 changed the default log4j configuration file name from 
log4j.properties to log4j-console.properties. From the operator’s perspective, 
the operator pod’s /opt/flink/conf directory must contain 
log4j-console.properties for logging to function properly. 
log4j.properties won’t be recognized.
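
A minimal sketch of a check for the situation described above, assuming the two 
logging files that appear in the `ls` output of a working native k8s pod 
elsewhere in this thread are the ones to look for. The function name 
missing_log_configs is hypothetical, not part of Flink or the operator.

```python
from pathlib import Path

# File names taken from the `ls` output of a working pod quoted in this thread.
EXPECTED_LOG_CONFIGS = ("log4j-console.properties", "logback-console.xml")


def missing_log_configs(conf_dir):
    """Return the expected logging config files that are absent from conf_dir."""
    conf = Path(conf_dir)
    return [name for name in EXPECTED_LOG_CONFIGS if not (conf / name).is_file()]


if __name__ == "__main__":
    # /opt/flink/conf is the conf directory named in the message above.
    print(missing_log_configs("/opt/flink/conf"))
```

Running this inside the operator pod would list which files are missing before 
a Flink application is submitted.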

Problem 2:
stopWithSavepoint doesn’t work, and the Flink CLI stop/cancel/savepoint 
commands don’t work with native Kubernetes.

For the CLI commands, I need to add --target=kubernetes-application 
-Dkubernetes.cluster-id= to all application mode Flink CLI commands 
to make them work. For the stop/cancel/savepoint commands, I was directly 
following the doc here [1] without adding those configuration parameters. The 
Flink documentation doesn’t point this out explicitly, and I was confused about 
it earlier.

Maybe the doc can add a note here to be more informative?
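
As a rough sketch of what the extra parameters look like in practice, the 
helper below assembles the argument list for a stop/cancel/savepoint call 
against an application-mode cluster. The helper name flink_cli_args and the 
example cluster id, job id and savepoint path are hypothetical; the flags 
themselves are the ones named above plus the CLI's --savepointPath option for 
stop.

```python
def flink_cli_args(action, job_id, cluster_id, savepoint_path=None):
    """Build argv for a Flink CLI call against a native k8s application cluster."""
    if action not in ("stop", "cancel", "savepoint"):
        raise ValueError(f"unsupported action: {action}")
    args = [
        "./bin/flink", action,
        # The two settings that were missing in the failing commands.
        "--target", "kubernetes-application",
        f"-Dkubernetes.cluster-id={cluster_id}",
    ]
    if action == "stop" and savepoint_path:
        args += ["--savepointPath", savepoint_path]
    args.append(job_id)
    return args


if __name__ == "__main__":
    # All values below are placeholders for illustration.
    print(" ".join(flink_cli_args("stop", "<jobId>", "my-cluster", "s3://bucket/savepoints")))
```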

As for the stop command not working with my code, the cause was that my Kafka 
client version was too low. I was using kafka-clients 1.1.0 (a very old 
version), which otherwise worked fine with my Flink application. Because of the 
log issue, I didn’t notice the error earlier. I actually got the following 
error when executing the stop command.

java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'void 
org.apache.kafka.clients.producer.KafkaProducer.close(java.time.Duration)'

The stop command introduces better semantics for restarting a job, and it calls 
this method in the Flink application. A low Kafka client version will run into 
this failure. The cancel command does not have this issue. I didn’t look deeply 
into the source code implementation for this; maybe you can share more insights 
about this.
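
One way to spot an outdated client before hitting the NoSuchMethodError is to 
look at the jar names on the classpath. This is only a sketch: it assumes the 
client ships as a separate kafka-clients-<version>.jar (it will find nothing in 
a shaded fat jar), the function names are hypothetical, and the minimum version 
is left to the caller because the thread does not state which release added 
close(java.time.Duration).

```python
import re
from pathlib import Path

JAR_PATTERN = re.compile(r"^kafka-clients-(\d+(?:\.\d+)*)\.jar$")


def kafka_client_versions(lib_dir):
    """Return version tuples for every kafka-clients jar found in lib_dir."""
    versions = []
    for jar in sorted(Path(lib_dir).glob("kafka-clients-*.jar")):
        match = JAR_PATTERN.match(jar.name)
        if match:
            versions.append(tuple(int(part) for part in match.group(1).split(".")))
    return versions


def older_than(version, minimum):
    """True if the version tuple sorts before the caller-supplied minimum."""
    return version < minimum
```

For the client mentioned above, kafka_client_versions would yield (1, 1, 0), 
which can then be compared against whatever minimum the Flink connector in use 
requires.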

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/cli/

Thanks,
Fuyao

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-07 Thread Yang Wang
Since your problem is about the flink-native-k8s-operator, let's move the
discussion there.

Best,
Yang

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-07 Thread Fuyao Li
Hi Austin, Yang, Matthias,

I am following up to see if you guys have any idea regarding this problem.

I also created an issue here to describe the problem. [1]

After looking into the source code [2], I believe that for native k8s, three 
configuration files should be added to the flink-config- configmap 
automatically. However, the operator-created Flink application has only 
flink-conf.yaml. That is also what causes the difference in the start command 
mentioned in the issue.


Native k8s using Flink CLI: Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
-Xms1073741824 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/opt/flink/log/jobmanager.log 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=201326592b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=1073741824b -D 
jobmanager.memory.jvm-overhead.max=201326592b


Operator flink app:
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 
-Xms3462817376 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=429496736b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=3462817376b -D 
jobmanager.memory.jvm-overhead.max=429496736b
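
The difference between the two start commands can be made explicit by comparing 
their JVM system properties. The sketch below is an illustration only: the 
function names are hypothetical, and it assumes that everything after the 
org.apache.flink entrypoint class is a program argument rather than a JVM 
option.

```python
def jvm_system_properties(command):
    """Collect -Dkey=value JVM options that appear before the entrypoint class."""
    props = {}
    for token in command.split():
        if token.startswith("org.apache.flink."):
            break  # tokens after the main class are program arguments
        if token.startswith("-D") and "=" in token:
            key, value = token[2:].split("=", 1)
            props[key] = value
    return props


def missing_properties(reference_cmd, other_cmd):
    """Names of JVM properties set in reference_cmd but absent from other_cmd."""
    reference = jvm_system_properties(reference_cmd)
    other = jvm_system_properties(other_cmd)
    return sorted(key for key in reference if key not in other)
```

Applied to the two commands above, with the CLI command as the reference, this 
reports the log.file, logback.configurationFile, log4j.configuration and 
log4j.configurationFile properties as missing from the operator-created 
application.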

Please share your opinion on this. Thanks!

[1] https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
[2] 
https://github.com/apache/flink/blob/release-1.12/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

Have a good weekend!
Best,
Fuyao


Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

I also checked the native-k8s’s automatically generated configmap. It only 
contains the flink-conf.yaml, but no log4j.properties. I am not very familiar 
with the implementation details behind native k8s.

That should be the root cause. Could you check the implementation and help me 
locate the potential problem?
Yang’s initial code: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
My modified version: 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java

Thank you so much.

Best,
Fuyao

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello Austin, Yang,

For the logging issue, I think I have found something worth noting.

All images are based on the base image flink:1.12.1-scala_2.11-java11.

Dockerfile: https://pastebin.ubuntu.com/p/JTsHygsTP6/

In the JM and TM provisioned by the k8s operator, there is only flink-conf.yaml 
in the $FLINK_HOME/conf directory. Even when I add these configurations to the 
image in advance, the operator seems to override them and remove all other 
log4j configurations. As a result, the logs can’t be printed correctly.
root@flink-demo-5fc78c8cf-hgvcj:/opt/flink/conf# ls
flink-conf.yaml


However, for the pods provisioned by the Flink native k8s CLI, some 
log4j-related configurations exist.

root@test-application-cluster-79c7f9dcf7-44bq8:/opt/flink/conf# ls
flink-conf.yaml  log4j-console.properties  logback-console.xml


The native Kubernetes operator pod can print logs correctly because it has the 
log4j.properties file mounted into the /opt/flink/conf/ directory. [1]
For the Flink pods, it seems that only flink-conf.yaml is injected there. 
[2][3] No log4j-related configmap is configured, which makes the logs in those 
pods unavailable.

I am not sure how to inject something similar into the Flink pods. Maybe by 
adding a structure similar to the one in [1] to the cr.yaml, so that such a 
configmap makes log4j.properties available for the Flink CRD?

I am confused about how to implement this. The deployment is a one-step 
operation in [4], and I don’t know how to make a configmap available to it. 
Maybe I can only use the new pod template feature in Flink 1.13 to do this?
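
For reference, a ConfigMap that carries a logging configuration file has the 
shape sketched below. This only illustrates the Kubernetes object itself: the 
name flink-log-config and the helper log_config_map are hypothetical, and 
whether the operator or a pod template would actually mount such a ConfigMap 
into the Flink pods is not established in this thread.

```python
import json


def log_config_map(name, files):
    """Build a Kubernetes ConfigMap manifest (as a dict) from {file name: content}."""
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name},
        "data": dict(files),
    }


if __name__ == "__main__":
    manifest = log_config_map(
        "flink-log-config",  # hypothetical name
        {"log4j-console.properties": "rootLogger.level = INFO\n"},
    )
    # JSON is accepted wherever a YAML manifest is, so this can be piped to kubectl.
    print(json.dumps(manifest, indent=2))
```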



[1] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml#L58
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/cr.yaml#L21
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java#L83
[4] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L176

Best,
Fuyao

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

For the logging issue:
Hi Austin,

This is the link to my full code implementation [1]; I only removed the 
credential-related parts. The operator definition can be found here [2]. You 
can also check the other parts if you spot any problem.
The operator uses log4j2, and you can see there is a log appender for the 
operator [3]; the operator pod can indeed print logs. But for the Flink 
application JM and TM pods, I see the errors mentioned earlier: the sed error 
and "ERROR StatusLogger No Log4j 2 configuration file found."

I used to use log4j for the Flink application. To avoid potential 
incompatibility issues, I have already upgraded the POM of the Flink 
application to use log4j2, but the logging problem still exists.

This is my log4j2.properties file in the Flink application [6]. These are the 
logging-related POM dependencies for the Flink application [7].

The logs are printed during a normal native k8s deployment and during IDE 
debugging. With the operator, logging does not seem to work. Could this be 
caused by a class namespace conflict, since I introduced the presto jar into 
the Flink distribution? This is my Dockerfile to build the Flink application 
jar [5].

Please share your idea on this.

For the stopWithSavepoint issue:
Just to note, my understanding is that the cancel command 
(cancelWithSavepoint()) is a deprecated feature, and that it may not guarantee 
exactly-once semantics and may produce inconsistent results, for example with 
timer-related things? Please correct me if I am wrong. The code that works with 
cancelWithSavepoint() is shared in [4] below.


[1] https://github.com/FuyaoLi2017/flink-native-kubernetes-operator
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/resources/log4j2.properties
[4] https://pastebin.ubuntu.com/p/tcxT2FwPRS/
[5] https://pastebin.ubuntu.com/p/JTsHygsTP6/
[6] https://pastebin.ubuntu.com/p/2wgdcxVfSy/
[7] https://pastebin.ubuntu.com/p/Sq8xRjQyVY/


Best,
Fuyao

From: Austin Cawley-Edwards 
Date: Tuesday, May 4, 2021 at 14:47
To: matth...@ververica.com 
Cc: Fuyao Li , user , Yang Wang 
, Austin Cawley-Edwards 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hey all,

Thanks for the ping, Matthias. I'm not super familiar with the details of @Yang 
Wang's operator, to be honest :(. Can you share some of your FlinkApplication 
specs?

For the `kubectl logs` command, I believe that just reads stdout from the 
container. Which logging framework are you using in your application and how 
have you configured it? There's a good guide for configuring the popular ones 
in the Flink docs[1]. For instance, if you're using the default Log4j 2 
framework you should configure a ConsoleAppender[2].
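
A minimal sketch of such a console configuration, written out as a Log4j 2 
properties file. The helper write_console_config is hypothetical, and the 
pattern layout is just one reasonable choice; the file name follows the 
log4j-console.properties convention discussed in this thread.

```python
from pathlib import Path

# Minimal Log4j 2 properties: one Console appender attached to the root logger.
CONSOLE_LOG4J2 = """\
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
"""


def write_console_config(conf_dir, file_name="log4j-console.properties"):
    """Write the console-only logging config into conf_dir and return its path."""
    target = Path(conf_dir) / file_name
    target.write_text(CONSOLE_LOG4J2)
    return target
```

With output going to the console, `kubectl logs` can pick it up from the 
container's stdout.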

Hope that helps a bit,
Austin

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/logging/
[2]: 
https://logging.apache.org/log4j/2.x/manual/appenders.html#ConsoleAppender

On Tue, May 4, 2021 at 1:59 AM Matthias Pohl <matth...@ververica.com> wrote:
Hi Fuyao,
sorry for not replying earlier. The stop-with-savepoint operation shouldn't 
only suspend but terminate the job. Could it be that you have a larger state, 
which makes creating the savepoint take longer? That said, considering that 
you don't experience this behavior with your 2nd solution, I'd assume that we 
can rule out this possibility.

I'm gonna add Austin to the conversation as he worked with k8s operators as 
well already. Maybe, he can also give you more insights on the logging issue 
which would enable us to dig deeper into what's going on with 
stop-with-savepoint.

Best,
Matthias

On Tue, May 4, 2021 at 4:33 AM Fuyao Li <fuyao...@oracle.com> wrote:
Hello,

Update:
I think stopWithSavepoint() only suspends the job. It doesn’t actually 
terminate the job the way ./bin/flink cancel does. I switched to 
cancelWithSavepoint() and it works here.

Maybe stopWithSavepoint() should only be used to update configurations like 
parallelism? For updating the image, it does not seem suitable. Please correct 
me if I am wrong.

For the log issue, I am still a bit confused. Why is the log not available in 
kubectl logs? How should I get access to it?

Thanks.
Best,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Sunday, May 2, 2021 at 00:36
To: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>
Subject: [External] : Re: StopW