[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-26 Thread GitBox


darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1292538966

   > > Hi @darenwkt ideally we should be able to control the stack-trace per CR:
   > > ```
   > >   flinkConfiguration:
   > >     taskmanager.numberOfTaskSlots: "2"
   > >     kubernetes.operator.exception.stacktrace.enabled: "true"
   > > ```
   > > With the current approach this is an operator wide config only. @gyfora 
wdyt?
   > 
   > I am not completely sure about this. It might be better to let operator 
admins decide on this setting, at least the size limits for sure.
   > 
   > In any case there is some inconsistency in the current implementation: if 
something is put in `KubernetesOperatorConfigurations`, the ConfigOption should 
have the annotation `SECTION_SYSTEM`, not dynamic (those are for options that 
can be dynamically overridden from the user Flink config, as Matyas suggested).
   
   @gyfora @morhidi 
   
   Thanks for confirming, I will push a new revision with the config placed in 
SYSTEM section.
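   For reference, a SYSTEM-section option could be declared roughly like this 
(a sketch based on Flink's `ConfigOptions` builder; the exact section constant, 
option name, and default in the PR may differ):
   
   ```
   @Documentation.Section(SECTION_SYSTEM)
   public static final ConfigOption<Boolean> EXCEPTION_STACKTRACE_ENABLED =
           ConfigOptions.key("kubernetes.operator.exception.stacktrace.enabled")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "Whether to include the stack trace in the CR error field.");
   ```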


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-26 Thread GitBox


darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1292536844

   
   
   
   > > Hi @morhidi,
   > > I have updated the stackTrace to be a string and added a ConfigOption to 
limit the length of the stackTrace string.
   > > Regarding the concern on deserializing the error status into a valid 
json field, I have tested that deserialization back into the 
FlinkResourceException class works. My testing was done as follows:
   > > 
   > > 1. `kubectl get flinkdeployment basic-example -o yaml > test.yaml`
   > > 2. Tested deserializing the `test.yaml` back into FlinkResourceException 
class using the following code:
   > > 
   > > ```
   > > @Test
   > > public void testYamlDeserialization() throws IOException {
   > >     Yaml yaml = new Yaml();
   > >     InputStream inputStream =
   > >             this.getClass().getClassLoader().getResourceAsStream("test.yaml");
   > >     Map<String, Object> obj = yaml.load(inputStream);
   > >     System.out.println("deserialized yaml: " + obj);
   > > 
   > >     ObjectMapper mapper = new ObjectMapper();
   > >     FlinkResourceException ex =
   > >             mapper.readValue(
   > >                     (String) ((Map<?, ?>) obj.get("status")).get("error"),
   > >                     FlinkResourceException.class);
   > >     System.out.println("deserialized json: " + ex);
   > > }
   > > ```
   > > 
   > > 3. Results of System.out.println are:
   > > 
   > > ```
   > > deserialized yaml: {apiVersion=flink.apache.org/v1beta1, 
kind=FlinkDeployment, 
metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"
   > > ```
   > > ```
   > > deserialized json: 
FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException,
 message=org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not create Kubernetes cluster "basic-example"., stackTraceElements=null, 
additionalMetadata=null, 
throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException,
 message=Could not create Kubernetes cluster "basic-example"., 
stackTraceElements=null, additionalMetadata=null, throwableList=null), 
FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException,
 message=Failure executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"]. Received status: Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: "flink-logs", reason=FieldValueNotFound, 
additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: "downloads", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={})., stackTraceElements=null, 
additionalMetadata=null, throwableList=null)])
   > > ```
   > > As a result, I can confirm deserialization of the json works. The 
question now is whether we are ok with the current format the error field is 
shown in CR yaml, which includes the escape field. I tried to search th

[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-26 Thread GitBox


darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1291703643

   Hi @morhidi,
   
   I have updated the stackTrace to be a string and added a ConfigOption to 
limit the length of the stackTrace string.
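   
   As a rough illustration of the length limit (the helper name and behavior 
here are my assumptions for the sketch, not the PR's actual code):
   
   ```java
   public class StackTraceLimiter {
       /** Hypothetical helper: trims a stack trace string to a configured maximum length. */
       public static String truncate(String stackTrace, int maxLength) {
           if (stackTrace == null || stackTrace.length() <= maxLength) {
               return stackTrace;
           }
           return stackTrace.substring(0, maxLength);
       }
   
       public static void main(String[] args) {
           String trace =
                   "java.lang.RuntimeException: boom\n\tat Example.main(Example.java:1)";
           // Keep only the first 32 characters of the rendered trace.
           System.out.println(truncate(trace, 32));
       }
   }
   ```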
   
   Regarding the concern on deserializing the error status into a valid json 
field, I have tested that deserialization back into the FlinkResourceException 
class works. My testing was done as follows:
   
   1. `kubectl get flinkdeployment basic-example -o yaml > test.yaml`
   2. Tested deserializing the `test.yaml` back into FlinkResourceException 
class using the following code:
   
   ```
   @Test
   public void testYamlDeserialization() throws IOException {
       Yaml yaml = new Yaml();
       InputStream inputStream =
               this.getClass().getClassLoader().getResourceAsStream("test.yaml");
       Map<String, Object> obj = yaml.load(inputStream);
       System.out.println("deserialized yaml: " + obj);
   
       ObjectMapper mapper = new ObjectMapper();
       FlinkResourceException ex =
               mapper.readValue(
                       (String) ((Map<?, ?>) obj.get("status")).get("error"),
                       FlinkResourceException.class);
       System.out.println("deserialized json: " + ex);
   }
   ```
   3. Results of System.out.println are:
   ```
   deserialized yaml: {apiVersion=flink.apache.org/v1beta1, 
kind=FlinkDeployment, 
metadata={annotations={kubectl.kubernetes.io/last-applied-configuration={"apiVersion":"flink.apache.org/v1beta1","kind":"FlinkDeployment","metadata":{"annotations":{},"name":"basic-example","namespace":"default"},"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"flinkVersion":"v1_15","image":"flink:1.15","job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"upgradeMode":"stateless"},"jobManager":{"resource":{"cpu":1,"memory":"2048m"}},"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod-template"},"spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/opt/flink/log","name":"flink-logs"},{"mountPath":"/opt/flink/downloads","name":"downloads"}]}]}},"serviceAccount":"flink","taskManager":{"resource":{"cpu":1,"memory":"2048m"
   ```
   ```
   deserialized json: 
FlinkResourceException(type=org.apache.flink.kubernetes.operator.exception.ReconciliationException,
 message=org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not create Kubernetes cluster "basic-example"., stackTraceElements=null, 
additionalMetadata=null, 
throwableList=[FlinkResourceException(type=org.apache.flink.client.deployment.ClusterDeploymentException,
 message=Could not create Kubernetes cluster "basic-example"., 
stackTraceElements=null, additionalMetadata=null, throwableList=null), 
FlinkResourceException(type=org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException,
 message=Failure executing: POST at: 
https://10.96.0.1/apis/apps/v1/namespaces/default/deployments. Message: 
Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"]. Received status: Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: "flink-logs", reason=FieldValueNotFound, 
additionalProperties={}), 
StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not found: "downloads", reason=FieldValueNotFound, 
additionalProperties={})], group=apps, kind=Deployment, name=basic-example, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Deployment.apps "basic-example" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name: Not found: 
"flink-logs", spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"downloads"], metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={})., stackTraceElements=null, 
additionalMetadata=null, throwableList=null)])
   
   ```
   
   As a result, I can confirm deserialization of the JSON works. The question 
now is whether we are OK with the current format of the error field in the CR 
yaml, which includes escaped characters. I looked into this, and the cause is 
that the string is stored with single quotes '' instead of double quotes "" in 
the yaml (see https://www.baeldung.com/yaml-multi-line#quoting). I have tried 
looking at openAPIV3Schema, but I don't see a straightforward way to change it 
to double quotes. I also suspect this is related to how the K8s API serializes 
the CR into yaml.
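   
   To illustrate the difference, here is the same JSON payload rendered in the 
two YAML scalar styles (a minimal sketch with a made-up payload, not the actual 
CR output):
   
   ```
   # Single-quoted style: inner double quotes need no escaping; a literal
   # single quote would be written as '' inside the scalar.
   error: '{"type":"ReconciliationException","message":"Could not create cluster"}'
   ```
   ```
   # Double-quoted style: every inner double quote must be backslash-escaped.
   error: "{\"type\":\"ReconciliationException\",\"message\":\"Could not create cluster\"}"
   ```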



[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289640460

   @morhidi 
   
   Thanks for the feedback and sharing your test result:
   
   >It resulted in a string that might not be valid JSON:
   
   I will do some testing on this and validate the final JSON that's included 
in the CR field.
   
   >Another thing that I found is that the structure of the stacktrace is 
probably too verbose:
   
   Yup, it does contain several metadata fields we might not need, like 
"classLoaderName" and "nativeMethod". An alternative I am considering is to 
store the stackTrace as a more compact List<String> generated from 
ExceptionUtils.getStackFrames
   
(https://commons.apache.org/proper/commons-lang/javadocs/api-3.1/org/apache/commons/lang3/exception/ExceptionUtils.html#getStackFrames(java.lang.Throwable))
   
   However, the downside is that each stackFrame is a plain String, so it takes 
a bit more work to figure out which method/file a frame came from. Any thoughts 
on which one is better suited for our needs?
   

