Re: Job graph
Hi Nikolaos,

As Ron said, the JobGraph is a low-level structure in Flink and is not currently exposed to users. For now, you can get job details from `RestClusterClient` via the method `getJobDetails(JobID jobId)`; the resulting `JobDetailsInfo` contains all vertices in the job and the job plan in JSON format.

Best,
Shammon FY

On Sat, Sep 2, 2023 at 6:26 AM David Anderson wrote:

> This may or may not help, but you can get the execution plan from
> inside the client, by doing something like this (I printed the plan to
> stderr):
>
>     ...
>     System.err.println(env.getExecutionPlan());
>     env.execute("my job");
>
> The result is a JSON-encoded representation of the job graph, which
> for the simple example I just tried it with, produced this output:
>
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 10
>   }, {
>     "id" : 3,
>     "type" : "Sink: Writer",
>     "pact" : "Operator",
>     "contents" : "Sink: Writer",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 5,
>     "type" : "Sink: Committer",
>     "pact" : "Operator",
>     "contents" : "Sink: Committer",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 3,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
>
> On Wed, Aug 30, 2023 at 7:01 AM Nikolaos Paraskakis wrote:
> >
> > Hello folks,
> >
> > I am trying to get the job graph of a running Flink job. I want to use
> > Flink libraries. For now, I have the RestClusterClient and the job IDs.
> > Please tell me how to get the job graph.
> >
> > Thank you.
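For readers following this thread: the JSON job plan mentioned in the replies above should also be retrievable over the JobManager's REST API, without the Flink client libraries. The following is only a minimal sketch using the JDK's built-in HTTP client. It assumes the REST endpoint is reachable at a base URL such as http://localhost:8081 (an assumption, adjust to your setup) and uses the `/jobs/<jobId>/plan` path. The class name `JobPlanFetcher` and its helper methods are hypothetical names chosen for illustration, not part of Flink.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of Flink: fetches the JSON plan of a job
// from the JobManager REST API using only JDK classes (Java 11+).
class JobPlanFetcher {

    // Builds the URI for GET /jobs/<jobId>/plan, tolerating a trailing slash
    // on the base URL (e.g. "http://localhost:8081/", an assumed address).
    static URI planUri(String restBaseUrl, String jobId) {
        String base = restBaseUrl.endsWith("/")
                ? restBaseUrl.substring(0, restBaseUrl.length() - 1)
                : restBaseUrl;
        return URI.create(base + "/jobs/" + jobId + "/plan");
    }

    // Sends the request and returns the raw JSON body, or throws if the
    // server answers with anything other than HTTP 200.
    static String fetchPlan(String restBaseUrl, String jobId) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(planUri(restBaseUrl, jobId))
                .timeout(Duration.ofSeconds(10))
                .header("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException(
                    "Unexpected HTTP status " + response.statusCode()
                            + " for " + request.uri());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: JobPlanFetcher <restBaseUrl> <jobId>");
            return;
        }
        // Print the plan JSON to stdout.
        System.out.println(fetchPlan(args[0], args[1]));
    }
}
```

It could be run as `java JobPlanFetcher.java http://localhost:8081 <jobId>`, where `<jobId>` is one of the job IDs you already have. The response is plain JSON, so whether it carries enough detail for your use case is something to check against your own cluster.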
k8s operator - clearing operator state
Hi community,

I would like to ask how one can clear the state of Flink's k8s operator.

I have a sandbox k8s cluster with the Flink k8s operator, where I've deployed a Flink session cluster with a few session jobs. After playing around and breaking a few things here and there, I see this log:

023-09-04 21:10:30,923 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-session-job-only-example-5] Not ready for reconciliation yet...
2023-09-04 21:10:45,928 o.a.f.k.o.c.FlinkSessionJobController [INFO ][default/basic-session-job-only-example-5] Starting reconciliation
2023-09-04 21:10:45,929 o.a.f.k.o.r.s.SessionJobReconciler [WARN ][default/basic-session-job-only-example-5] Session cluster deployment is not found
2023-09-04 21:10:45,930 o.a.f.k.o.r.s.SessionJobReconciler [WARN ][default/basic-session-job-only-example-5] Session cluster deployment is not found
2023-09-04 21:10:45,930 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-session-job-only-example-5] Not ready for reconciliation yet...

The "basic-session-job-only-example-5" was one of my session jobs deployed on the cluster.

I reinstalled the operator by calling

    helm uninstall

and

    kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

so basically trying to revert what is described at [1]. After that I installed the operator again as described in [1], but I still see those logs.

A couple of extra things:

k get pods
NAME                                         READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-5dc99cb4d8-6lv88   2/2     Running   0          39m

k get services
NAME                             TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
flink-operator-webhook-service   ClusterIP   10.97.169.70   <none>        443/TCP   39m
kubernetes                       ClusterIP   10.96.0.1      <none>        443/TCP   13d

k get deployments
NAME                        READY   UP-TO-DATE   AVAILABLE   AGE
flink-kubernetes-operator   1/1     1            1           39m

In case anyone wonders, the exact steps by which I "broke" this are described in [2]. However, at this point what I would like to know is how to get a clean k8s operator state.
[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/try-flink-kubernetes-operator/quick-start/
[2] https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
backpressured metrics don't work
Hi,

Any idea why backpressured metrics are not working and how I can fix it?

[image: image.png]

Thanks
Kenan