kaxil commented on a change in pull request #6377: URL: https://github.com/apache/airflow/pull/6377#discussion_r425977128
########## File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py ########## @@ -324,3 +316,99 @@ def _set_name(self, name): return None validate_key(name, max_length=220) return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) + + def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]: + ''' + Creates a new pod and monitors for duration of task + + @param labels: + @param launcher: + @return: + ''' + if not (self.full_pod_spec or self.pod_template_file): + # Add Airflow Version to the label + # And a label to identify that pod is launched by KubernetesPodOperator + self.labels.update( + { + 'airflow_version': airflow_version.replace('+', '-'), + 'kubernetes_pod_operator': 'True', + } + ) + self.labels.update(labels) + pod = pod_generator.PodGenerator( + image=self.image, + namespace=self.namespace, + cmds=self.cmds, + args=self.arguments, + labels=self.labels, + name=self.name, + envs=self.env_vars, + extract_xcom=self.do_xcom_push, + image_pull_policy=self.image_pull_policy, + node_selectors=self.node_selectors, + annotations=self.annotations, + affinity=self.affinity, + image_pull_secrets=self.image_pull_secrets, + service_account_name=self.service_account_name, + hostnetwork=self.hostnetwork, + tolerations=self.tolerations, + configmaps=self.configmaps, + security_context=self.security_context, + dnspolicy=self.dnspolicy, + schedulername=self.schedulername, + init_containers=self.init_containers, + restart_policy='Never', + priority_class_name=self.priority_class_name, + pod_template_file=self.pod_template_file, + pod=self.full_pod_spec, + ).gen_pod() + + # noinspection PyTypeChecker + pod = append_to_pod( + pod, + self.pod_runtime_info_envs + # type: ignore + self.ports + # type: ignore + self.resources + # type: ignore + self.secrets + # type: ignore + self.volumes + # type: ignore + self.volume_mounts # type: ignore + ) + + self.pod = pod + + try: + launcher.start_pod( + pod, + 
startup_timeout=self.startup_timeout_seconds) + final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs) + except AirflowException: + if self.log_events_on_failure: + for event in launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise + finally: + if self.is_delete_operator_pod: + launcher.delete_pod(pod) + return final_state, pod, result + + def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]: + """ + Monitors a pod to completion that was created by a previous KubernetesPodOperator + + :param launcher: + :param pod: Review comment: Same here ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org