KinoMin commented on PR #22103: URL: https://github.com/apache/flink/pull/22103#issuecomment-1514209200
> > We are still using Flink 1.12. What should I do to use this PR in 1.12 or 1.15? I tried to merge this PR into 1.12 and found that the difference is too big and it is difficult to merge. If you can, I hope you can guide me. Thanks
>
> ok. and I think it is also simple to use this PR with Flink 1.12. I will give my answer as soon as I can.

I merged this PR into the release-1.12.7 branch, and it ran successfully in Flink on k8s native session mode, but there is one place I don't quite understand. When `decorateFlinkPod` is executed, it sets the `scheduling.k8s.io/group-name` annotation:

```java
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
    String configuredSchedulerName = kubernetesComponentConf.getPodSchedulerName();
    if (configuredSchedulerName == null
            || configuredSchedulerName.equals(DEFAULT_SCHEDULER_NAME)) {
        return flinkPod;
    }
    final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
    String fakename = this.kubernetesComponentConf.getClusterId();
    basicPodBuilder
            .editOrNewMetadata()
            .withAnnotations(
                    Collections.singletonMap("scheduling.k8s.io/group-name", "pg-" + fakename))
            .endMetadata();
    return new FlinkPod.Builder(flinkPod).withPod(basicPodBuilder.build()).build();
}
```

The value set here is different from the one created automatically when the JobManager starts, so the TaskManager fails to start: the `scheduling.k8s.io/group-name` values of the TaskManager and the JobManager do not match. I tried to debug the startup of the Flink session and found that the session has already been created before the PodGroup is created. I don't quite understand where this creation logic starts.
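To make the mismatch concrete, here is a minimal sketch in plain Java, with no Flink or fabric8 classes. It only reuses the annotation key and the `"pg-" + clusterId` naming rule from `decorateFlinkPod` above. The class name, the helper methods, the cluster id `my-session` and the second group name `podgroup-1234` are all hypothetical and stand in for whatever name the JobManager side actually ends up with.

```java
import java.util.Collections;
import java.util.Map;

public class PodGroupAnnotationCheck {
    // Annotation key taken from the decorateFlinkPod snippet.
    static final String GROUP_NAME_ANNOTATION = "scheduling.k8s.io/group-name";

    // Same naming rule as in decorateFlinkPod: "pg-" + cluster id.
    static Map<String, String> groupAnnotation(String clusterId) {
        return Collections.singletonMap(GROUP_NAME_ANNOTATION, "pg-" + clusterId);
    }

    // True only if both annotation maps carry the same, non-null group name.
    static boolean sameGroup(Map<String, String> first, Map<String, String> second) {
        String name = first.get(GROUP_NAME_ANNOTATION);
        return name != null && name.equals(second.get(GROUP_NAME_ANNOTATION));
    }

    public static void main(String[] args) {
        // TaskManager annotations as decorateFlinkPod would produce them.
        Map<String, String> taskManager = groupAnnotation("my-session");
        // Hypothetical: a JobManager whose group was named by a different rule.
        Map<String, String> jobManager =
                Collections.singletonMap(GROUP_NAME_ANNOTATION, "podgroup-1234");

        System.out.println("same rule on both sides: " + sameGroup(taskManager, taskManager));
        System.out.println("different rules:         " + sameGroup(taskManager, jobManager));
    }
}
```

This is only an illustration of the comparison, not code from the PR. The method I actually changed is still `decorateFlinkPod`.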
But when I modify `decorateFlinkPod` as follows, the TaskManager starts normally:

```java
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
    String configuredSchedulerName = kubernetesComponentConf.getPodSchedulerName();
    if (configuredSchedulerName == null
            || configuredSchedulerName.equals(DEFAULT_SCHEDULER_NAME)) {
        return flinkPod;
    }
    final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
    String fakename = this.kubernetesComponentConf.getClusterId();
    PodGroupBuilder podGroupBuilder = new PodGroupBuilder();
    podGroupBuilder.editOrNewMetadata().withName("pg-" + fakename).endMetadata();
    Map<String, String> podGroupConfig = kubernetesComponentConf.getPodGroupConfig();
    for (Map.Entry<String, String> stringStringEntry : podGroupConfig.entrySet()) {
        switch (stringStringEntry.getKey()) {
            case priorityClassName:
                podGroupBuilder
                        .editOrNewSpec()
                        .withPriorityClassName(stringStringEntry.getValue())
                        .endSpec();
                break;
            case minMember:
                podGroupBuilder
                        .editOrNewSpec()
                        .withMinMember(Integer.parseInt(stringStringEntry.getValue()))
                        .endSpec();
                break;
            case queue:
                podGroupBuilder
                        .editOrNewSpec()
                        .withQueue(stringStringEntry.getValue())
                        .endSpec();
                break;
        }
    }
    return new FlinkPod.Builder(flinkPod).withPod(basicPodBuilder.build()).build();
}
```