KinoMin commented on PR #22103:
URL: https://github.com/apache/flink/pull/22103#issuecomment-1514209200
> > We are still using Flink 1.12. What should I do to use this PR in 1.12 or 1.15? I tried to merge this PR into 1.12 and found that the difference is too big and it is difficult to merge. If you can, I hope you can guide me. Thanks
>
> OK. I think it is also simple to use this PR with Flink 1.12. I will give my answer as soon as I can.

I merged this PR into the release-1.12.7 branch, and it ran successfully with Flink on Kubernetes in native session mode, but there is one part I don't quite understand. When `decorateFlinkPod` is executed, it sets the `scheduling.k8s.io/group-name` annotation to `pg-` plus the cluster ID:
```java
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
    String configuredSchedulerName = kubernetesComponentConf.getPodSchedulerName();
    if (configuredSchedulerName == null
            || configuredSchedulerName.equals(DEFAULT_SCHEDULER_NAME)) {
        return flinkPod;
    }
    final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
    String fakename = this.kubernetesComponentConf.getClusterId();
    basicPodBuilder
            .editOrNewMetadata()
            .withAnnotations(
                    Collections.singletonMap("scheduling.k8s.io/group-name", "pg-" + fakename))
            .endMetadata();
    return new FlinkPod.Builder(flinkPod).withPod(basicPodBuilder.build()).build();
}
```
This value is different from the one that is created automatically when the JobManager starts, which causes the TaskManager to fail to start, because the `scheduling.k8s.io/group-name` values of the TaskManager and the JobManager differ. I tried to debug the startup process of the Flink session and found that the session had already been created before the PodGroup was created. I don't quite understand where this creation logic starts. However, when I modify the code as follows, the TaskManager starts normally:
```java
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
    String configuredSchedulerName = kubernetesComponentConf.getPodSchedulerName();
    if (configuredSchedulerName == null
            || configuredSchedulerName.equals(DEFAULT_SCHEDULER_NAME)) {
        return flinkPod;
    }
    final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
    String fakename = this.kubernetesComponentConf.getClusterId();
    PodGroupBuilder podGroupBuilder = new PodGroupBuilder();
    podGroupBuilder.editOrNewMetadata().withName("pg-" + fakename).endMetadata();
    Map<String, String> podGroupConfig = kubernetesComponentConf.getPodGroupConfig();
    // Copy the configured PodGroup settings into the PodGroup spec.
    for (Map.Entry<String, String> stringStringEntry : podGroupConfig.entrySet()) {
        switch (stringStringEntry.getKey()) {
            case priorityClassName:
                podGroupBuilder
                        .editOrNewSpec()
                        .withPriorityClassName(stringStringEntry.getValue())
                        .endSpec();
                break;
            case minMember:
                podGroupBuilder
                        .editOrNewSpec()
                        .withMinMember(Integer.parseInt(stringStringEntry.getValue()))
                        .endSpec();
                break;
            case queue:
                podGroupBuilder
                        .editOrNewSpec()
                        .withQueue(stringStringEntry.getValue())
                        .endSpec();
                break;
        }
    }
    // As posted, podGroupBuilder is not used below and no group-name annotation is set on the pod.
    return new FlinkPod.Builder(flinkPod).withPod(basicPodBuilder.build()).build();
}
```
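
To make the constraint above concrete, here is a minimal sketch in plain Java (no Flink or fabric8 dependencies). It assumes, as the first snippet suggests, that the PodGroup name is `pg-` plus the cluster ID and that the JobManager and TaskManager pods must carry the same `scheduling.k8s.io/group-name` value. The class `PodGroupNaming` and its methods are hypothetical names used only for illustration; they are not part of the PR or of Flink.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Flink or of this PR. */
final class PodGroupNaming {
    // Annotation key taken from the snippet in this comment.
    static final String GROUP_NAME_ANNOTATION = "scheduling.k8s.io/group-name";
    // Prefix taken from the snippet in this comment ("pg-" + clusterId).
    static final String POD_GROUP_PREFIX = "pg-";

    private PodGroupNaming() {}

    /** Derives the PodGroup name from the cluster ID in one place. */
    static String podGroupName(String clusterId) {
        return POD_GROUP_PREFIX + clusterId;
    }

    /** Builds the annotation map that a pod of this cluster would carry. */
    static Map<String, String> groupAnnotation(String clusterId) {
        return Collections.singletonMap(GROUP_NAME_ANNOTATION, podGroupName(clusterId));
    }

    public static void main(String[] args) {
        // Assumed cluster ID, used only for this demonstration.
        String clusterId = "my-session";
        Map<String, String> jobManagerAnnotation = groupAnnotation(clusterId);
        Map<String, String> taskManagerAnnotation = groupAnnotation(clusterId);
        // Both pods resolve to the same group name because it depends only on the cluster ID.
        System.out.println(jobManagerAnnotation.equals(taskManagerAnnotation));
    }
}
```

If both the PodGroup's own name and the annotation on every pod come from the same function of the cluster ID, the JobManager and TaskManager values cannot diverge. Whether this matches how the PR creates the PodGroup is exactly the open question above.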