[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156784709

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ##
@@ -645,10 +641,19 @@ public Map getClusterInfo(Configuration conf) throws Exception {
 dashboardConfiguration.getFlinkRevision());
 }
-// JobManager resource usage can be deduced from the CR
-var jmParameters =
-new KubernetesJobManagerParameters(
-conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+clusterInfo.putAll(
+calculateClusterResourceMetrics(
+conf, getTaskManagersInfo(conf).getTaskManagerInfos().size()));
+
+return clusterInfo;
+}
+
+private HashMap calculateClusterResourceMetrics(

Review Comment:
I have added the new tests in `FlinkUtils` and rebased onto main.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156270801

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ##
@@ -645,10 +641,19 @@ public Map getClusterInfo(Configuration conf) throws Exception {
 dashboardConfiguration.getFlinkRevision());
 }
-// JobManager resource usage can be deduced from the CR
-var jmParameters =
-new KubernetesJobManagerParameters(
-conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+clusterInfo.putAll(
+calculateClusterResourceMetrics(
+conf, getTaskManagersInfo(conf).getTaskManagerInfos().size()));
+
+return clusterInfo;
+}
+
+private HashMap calculateClusterResourceMetrics(

Review Comment:
You're right! I've also split the method into two separate methods in `FlinkUtils`. This results in some duplicated code, but I think it improves the code overall and makes it easier to reuse and test. If this seems okay, I will add tests for the two new methods tomorrow.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156268027

## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java: ##
@@ -187,6 +193,66 @@ public void testMetricsMultiNamespace() {
 }
 }
+@Test
+public void testResourceMetrics() {
+var namespace1 = "ns1";
+var namespace2 = "ns2";
+var deployment1 = TestUtils.buildApplicationCluster("deployment1", namespace1);
+var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace1);
+var deployment3 = TestUtils.buildApplicationCluster("deployment3", namespace2);
+
+deployment1
+.getStatus()
+.getClusterInfo()
+.putAll(
+Map.of(
+AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",

Review Comment:
I've added the tests and also added a check that converts `Infinity` and `NaN` values to 0.
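For illustration, the `Infinity`/`NaN` check mentioned in this comment could look roughly like the sketch below. This is not the code from the PR: the class name `MetricSanitizer` and the method `finiteOrZero` are hypothetical, and the only assumption carried over from the diff is that cluster info values are stored as strings (as in `Map.of(FIELD_NAME_TOTAL_CPU, "5")`).

```java
// Hypothetical helper, not taken from the PR: maps non-finite metric values to 0.
final class MetricSanitizer {

    /** Returns the value unchanged if it is finite, otherwise 0 (covers NaN and +/-Infinity). */
    static double finiteOrZero(double value) {
        return Double.isFinite(value) ? value : 0.0;
    }

    public static void main(String[] args) {
        // Cluster info values are strings in the diff, so parse first, then sanitize.
        // Double.parseDouble accepts the literals "NaN" and "Infinity".
        for (String raw : new String[] {"5", "NaN", "Infinity", "-Infinity"}) {
            System.out.println(raw + " -> " + finiteOrZero(Double.parseDouble(raw)));
        }
    }
}
```

Using `Double.isFinite` keeps the check to a single branch, since it is false for `NaN` and for both infinities.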
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1155350156

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ##
@@ -627,14 +637,42 @@ public Map getClusterInfo(Configuration conf) throws Exception {
 .toSeconds(), TimeUnit.SECONDS);
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
 }
-return runtimeVersion;
+
+// JobManager resource usage can be deduced from the CR
+var jmParameters =
+new KubernetesJobManagerParameters(
+conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+var jmTotalCpu =
+jmParameters.getJobManagerCPU()
+* jmParameters.getJobManagerCPULimitFactor()
+* jmParameters.getReplicas();
+var jmTotalMemory =
+Math.round(
+jmParameters.getJobManagerMemoryMB()
+* Math.pow(1024, 2)
+* jmParameters.getJobManagerMemoryLimitFactor()
+* jmParameters.getReplicas());
+
+// TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
I've pushed the changes you requested and also extracted the logic into a new method so we can test it more easily without needing the REST API. I just wasn't sure where to place the test, as I'm not that familiar with the project structure yet :D
If you think the PR looks OK, please let me know where I should write a test for `AbstractFlinkService#calculateClusterResourceMetrics`, and I will do that as well.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1154613982

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ##
@@ -627,14 +637,42 @@ public Map getClusterInfo(Configuration conf) throws Exception {
 .toSeconds(), TimeUnit.SECONDS);
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
 }
-return runtimeVersion;
+
+// JobManager resource usage can be deduced from the CR
+var jmParameters =
+new KubernetesJobManagerParameters(
+conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+var jmTotalCpu =
+jmParameters.getJobManagerCPU()
+* jmParameters.getJobManagerCPULimitFactor()
+* jmParameters.getReplicas();
+var jmTotalMemory =
+Math.round(
+jmParameters.getJobManagerMemoryMB()
+* Math.pow(1024, 2)
+* jmParameters.getJobManagerMemoryLimitFactor()
+* jmParameters.getReplicas());
+
+// TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
I tried to implement the same logic for `tmTotalCpu` as you did for `jmTotalCpu`, and I think it should be valid:

`tmCpuRequest * tmCpuLimitFactor * numberOfTaskManagers`

`tmCpuRequest` and `tmCpuLimitFactor` are accessible the same way as for the JM: just retrieve `kubernetes.taskmanager.cpu` and `kubernetes.taskmanager.cpu.limit-factor` from the Flink config. I'm not sure about `numberOfTaskManagers`. In my test I just retrieved the number of TMs from the Flink REST API; maybe we could use `FlinkUtils#getNumTaskManagers` instead.

Code:
```
var tmTotalCpu =
        tmHardwareDesc.get().count()
                * conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU)
                * conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR);
```

Limit factors:
```
kubernetes.taskmanager.cpu.limit-factor = 1.3
kubernetes.jobmanager.cpu.limit-factor = 1.3
```

Result:
```
Job Manager:
  Replicas: 2
  Resource:
    Cpu: 0.5
    Memory: 1g
Task Manager:
  Replicas: 2
  Resource:
    Cpu: 0.5
    Memory: 1g
Status:
  Cluster Info:
    Total - Cpu: 2.6
    Total - Memory: 4294967296
```

Do you think this could work?
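As a sanity check on the numbers in this comment, the arithmetic can be reproduced with a small standalone sketch. It does not use any Flink or operator classes; `ClusterResourceEstimate` and its methods are hypothetical names, and the memory limit factor of 1.0 is an assumption (it is not stated in the comment, but it is consistent with the reported total).

```java
// Hypothetical standalone sketch of the formulas discussed in this thread.
final class ClusterResourceEstimate {

    /** CPU total for one component: request * limit factor * replicas. */
    static double totalCpu(double cpuRequest, double cpuLimitFactor, int replicas) {
        return cpuRequest * cpuLimitFactor * replicas;
    }

    /** Memory total in bytes, mirroring the Math.round(MB * 1024^2 * factor * replicas) form in the diff. */
    static long totalMemoryBytes(int memoryMb, double memoryLimitFactor, int replicas) {
        return Math.round(memoryMb * Math.pow(1024, 2) * memoryLimitFactor * replicas);
    }

    public static void main(String[] args) {
        // Inputs from the comment: 2 JM and 2 TM replicas, 0.5 CPU and 1g memory each,
        // CPU limit factor 1.3. Memory limit factor 1.0 is an assumption.
        double cpu = totalCpu(0.5, 1.3, 2) + totalCpu(0.5, 1.3, 2);
        long memory = totalMemoryBytes(1024, 1.0, 2) + totalMemoryBytes(1024, 1.0, 2);
        System.out.println("Total CPU: " + cpu);
        System.out.println("Total memory: " + memory);
    }
}
```

With these inputs the totals come out to 2.6 CPUs and 4294967296 bytes, which agrees with the `Cluster Info` values reported in the comment.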
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1153504166

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ##
@@ -627,14 +637,42 @@ public Map getClusterInfo(Configuration conf) throws Exception {
 .toSeconds(), TimeUnit.SECONDS);
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
-runtimeVersion.put(
+clusterInfo.put(
 DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
 }
-return runtimeVersion;
+
+// JobManager resource usage can be deduced from the CR
+var jmParameters =
+new KubernetesJobManagerParameters(
+conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+var jmTotalCpu =
+jmParameters.getJobManagerCPU()
+* jmParameters.getJobManagerCPULimitFactor()
+* jmParameters.getReplicas();
+var jmTotalMemory =
+Math.round(
+jmParameters.getJobManagerMemoryMB()
+* Math.pow(1024, 2)
+* jmParameters.getJobManagerMemoryLimitFactor()
+* jmParameters.getReplicas());
+
+// TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
If fractional values are used for the CPU, there will be a difference between retrieving it from the Flink REST API and from the Kubernetes CR. Flink uses `Hardware.getNumberCPUCores()` under the hood to retrieve this value. I'm not sure exactly how that works, but it's definitely an integer in the end :D
This will lead to weird scenarios: if you have 3 JM and 3 TM replicas, all with `.5` CPU shares, the result will be `4.5` total CPUs instead of `3`.
An easy solution might be to just retrieve the number of TMs and multiply it by the CPU defined in the CR.
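The mismatch described in this comment can be illustrated with a small sketch. It is only a model: how Flink actually derives its integer core count is not covered here, so `reportedCores` below is a hypothetical stand-in that rounds a fractional share up to a whole core (minimum 1). All class and method names are made up for this example.

```java
// Hypothetical model of the fractional-CPU mismatch; not operator or Flink code.
final class CpuSourceComparison {

    /** Assumed behaviour of an integer core count: fractional shares round up, minimum 1. */
    static int reportedCores(double cpuShare) {
        return Math.max(1, (int) Math.ceil(cpuShare));
    }

    /** JM CPU taken from the CR (fractional), TM CPU taken from the integer-valued report. */
    static double mixedTotal(int jmReplicas, int tmReplicas, double cpuShare) {
        return jmReplicas * cpuShare + tmReplicas * reportedCores(cpuShare);
    }

    /** Suggested fix: count the TMs, but take the per-TM CPU from the CR as well. */
    static double crOnlyTotal(int jmReplicas, int tmReplicas, double cpuShare) {
        return (jmReplicas + tmReplicas) * cpuShare;
    }

    public static void main(String[] args) {
        System.out.println("Mixed sources: " + mixedTotal(3, 3, 0.5));
        System.out.println("CR only:       " + crOnlyTotal(3, 3, 0.5));
    }
}
```

Under that rounding assumption, 3 JMs and 3 TMs at 0.5 CPU each give 4.5 with mixed sources and 3.0 when both components use the CR value, which matches the scenario in the comment.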