wangyang0918 commented on a change in pull request #15095:
URL: https://github.com/apache/flink/pull/15095#discussion_r589146194



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
##########
@@ -42,6 +43,17 @@ public boolean isTerminated() {
         return false;
     }
 
+    public boolean isScheduled() {
+        if (getInternalResource().getStatus() != null) {
+            return getInternalResource().getStatus().getConditions().stream()
+                    .anyMatch(
+                            e ->

Review comment:
       After reading the Kubernetes documentation [1], I can confirm that this is correct.
   
   [1] https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions
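
For reference, a minimal sketch of the kind of check that documentation page implies: a pod counts as scheduled when its conditions contain an entry of type `PodScheduled` with status `"True"`. The `Condition` class and `PodScheduledCheck` are hypothetical stand-ins written for illustration only. They are not the fabric8 model classes and not necessarily what this PR implements.

```java
import java.util.Arrays;
import java.util.List;

public class PodScheduledCheck {

    /** Hypothetical stand-in for one entry of a pod's status.conditions list. */
    public static final class Condition {
        final String type;
        final String status;

        public Condition(String type, String status) {
            this.type = type;
            this.status = status;
        }
    }

    /** Returns true if any condition is of type PodScheduled with status "True". */
    public static boolean isScheduled(List<Condition> conditions) {
        if (conditions == null) {
            // No status reported yet, so treat the pod as not scheduled.
            return false;
        }
        return conditions.stream()
                .anyMatch(c -> "PodScheduled".equals(c.type) && "True".equals(c.status));
    }

    public static void main(String[] args) {
        List<Condition> pending = Arrays.asList(new Condition("PodScheduled", "False"));
        List<Condition> scheduled =
                Arrays.asList(new Condition("PodScheduled", "True"), new Condition("Ready", "False"));
        System.out.println(isScheduled(pending));
        System.out.println(isScheduled(scheduled));
    }
}
```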

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -235,6 +241,8 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
         for (WorkerType worker : recoveredWorkers) {
             final ResourceID resourceId = worker.getResourceID();
             workerNodeMap.put(resourceId, worker);
+            previousAttemptUnregisteredWorkers.add(resourceId);

Review comment:
       Since `onPreviousAttemptWorkersRecovered` is executed before leadership is granted, is there a chance that the standby JobManagers will also try to stop or re-request the TaskManager pods? I don't think that is the responsibility of the standby JobManagers.
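
To make the concern concrete, here is a rough sketch of one possible guard, assuming the timeout handling can consult a leadership flag. Everything in it (`LeaderGuardSketch`, `grantLeadership`, `onRegistrationTimeout`, plain `String` worker IDs) is hypothetical and only illustrates the idea. It does not reflect the actual `ActiveResourceManager` API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LeaderGuardSketch {

    // Workers recovered from a previous attempt that have not registered yet.
    private final Set<String> previousAttemptUnregisteredWorkers = new HashSet<>();
    // Workers this instance decided to release (stands in for stopping pods).
    private final List<String> releasedWorkers = new ArrayList<>();
    private boolean leader = false;

    /** May be called before leadership is granted, so it only records state. */
    public void onPreviousAttemptWorkersRecovered(Collection<String> recoveredWorkers) {
        previousAttemptUnregisteredWorkers.addAll(recoveredWorkers);
    }

    public void grantLeadership() {
        leader = true;
    }

    /** Hypothetical timeout hook: only the leader may release unregistered workers. */
    public void onRegistrationTimeout() {
        if (!leader) {
            // A standby instance must not touch the TaskManager pods.
            return;
        }
        releasedWorkers.addAll(previousAttemptUnregisteredWorkers);
        previousAttemptUnregisteredWorkers.clear();
    }

    public List<String> getReleasedWorkers() {
        return releasedWorkers;
    }
}
```

With a guard like this, a standby instance would still remember the recovered workers but would never act on them until it becomes the leader.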

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -202,6 +202,20 @@
     public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX =
             "containerized.taskmanager.env.";
 
+    /** Timeout for TaskManagers to register at the active resource managers. */
+    public static final ConfigOption<Duration> TASK_MANAGER_REGISTRATION_TIMEOUT =
+            ConfigOptions.key("resourcemanager.taskmanager-registration.timeout")
+                    .durationType()
+                    .defaultValue(TaskManagerOptions.REGISTRATION_TIMEOUT.defaultValue())
+                    .withDeprecatedKeys(TaskManagerOptions.REGISTRATION_TIMEOUT.key())

Review comment:
       I think you mean `withFallbackKeys` here, right?
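
To illustrate the behaviour I have in mind, here is a small stand-alone sketch of fallback-key resolution: the new key wins if it is set, otherwise the value of the older key is used, and the older key stays valid in its own right. This is not Flink's implementation. `FallbackKeySketch` and `resolve` are hypothetical, and the old key is assumed to be `taskmanager.registration.timeout`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FallbackKeySketch {

    /** Looks up the primary key first, then each fallback key in order. */
    public static Optional<String> resolve(
            Map<String, String> conf, String key, List<String> fallbackKeys) {
        if (conf.containsKey(key)) {
            return Optional.of(conf.get(key));
        }
        for (String fallback : fallbackKeys) {
            if (conf.containsKey(fallback)) {
                return Optional.of(conf.get(fallback));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String newKey = "resourcemanager.taskmanager-registration.timeout";
        // Assumed value of TaskManagerOptions.REGISTRATION_TIMEOUT.key().
        String oldKey = "taskmanager.registration.timeout";

        Map<String, String> conf = new HashMap<>();
        conf.put(oldKey, "5 min");
        // Only the old key is set, so its value is used as the fallback.
        System.out.println(resolve(conf, newKey, List.of(oldKey)).orElse("<default>"));

        conf.put(newKey, "10 min");
        // Once the new key is set, it takes precedence over the fallback.
        System.out.println(resolve(conf, newKey, List.of(oldKey)).orElse("<default>"));
    }
}
```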




