Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-24 Thread via GitHub


shahar1 merged PR #49899:
URL: https://github.com/apache/airflow/pull/49899


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-24 Thread via GitHub


VladaZakharova commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-3112701134

   @shahar1 
   Hi there! Do you have any other questions about this change? Can we merge it?
   





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-22 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r183717


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -81,6 +82,15 @@ class KubernetesJobOperator(KubernetesPodOperator):
 Used if the parameter `wait_until_job_complete` set True.
 :param deferrable: Run operator in the deferrable mode. Note that the parameter
 `wait_until_job_complete` must be set True.
+:param on_kill_propagation_policy: Whether and how garbage collection will be performed. Default is 'Foreground'.
+Acceptable values are:
+'Orphan' - orphan the dependents;
+'Background' - allow the garbage collector to delete the dependents in the background;
+'Foreground' - a cascading policy that deletes all dependents in the foreground.
+Default value is 'Foreground'.
+:param discover_pods_retry_number: Number of time list_namespaced_pod will be performed to discover
+already running pods.
+:param unwrap_single: Unwrap single result from the pod. Default is True to support backward compatibility.

Review Comment:
   Added






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-19 Thread via GitHub


shahar1 commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2217343046


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -81,6 +82,15 @@ class KubernetesJobOperator(KubernetesPodOperator):
 Used if the parameter `wait_until_job_complete` set True.
 :param deferrable: Run operator in the deferrable mode. Note that the parameter
 `wait_until_job_complete` must be set True.
+:param on_kill_propagation_policy: Whether and how garbage collection will be performed. Default is 'Foreground'.
+Acceptable values are:
+'Orphan' - orphan the dependents;
+'Background' - allow the garbage collector to delete the dependents in the background;
+'Foreground' - a cascading policy that deletes all dependents in the foreground.
+Default value is 'Foreground'.
+:param discover_pods_retry_number: Number of time list_namespaced_pod will be performed to discover
+already running pods.
+:param unwrap_single: Unwrap single result from the pod. Default is True to support backward compatibility.

Review Comment:
   Maybe add a usage example, something like: 
   
   ```text
   For example, if the XCom result is `['res']`, the final output would be `'res'`.
   ```
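A minimal sketch of the behavior the suggested docstring example describes, assuming each pod contributes one raw JSON string and that a single result is unwrapped only when `unwrap_single` is True. `collect_xcom_results` is a hypothetical helper, not the provider's API, and the `EMPTY_XCOM_RESULT` value below is a placeholder for the provider's own constant.

```python
from __future__ import annotations

import json
from typing import Any

# Placeholder sentinel (assumption); the provider compares against its own EMPTY_XCOM_RESULT.
EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"


def collect_xcom_results(raw_results: list[str], unwrap_single: bool = True) -> Any:
    """Parse one raw XCom string per pod and optionally unwrap a single result."""
    parsed: list[Any | None] = []
    for raw in raw_results:
        if isinstance(raw, str) and raw.rstrip() == EMPTY_XCOM_RESULT:
            parsed.append(None)  # this pod wrote an empty result file
            continue
        parsed.append(json.loads(raw))
    # Backward-compatible path: a single pod yields a bare value, not a one-item list.
    if unwrap_single and len(parsed) == 1:
        return parsed[0]
    return parsed
```

With the default, one pod returning the JSON string `"res"` yields `'res'`, while `unwrap_single=False` keeps the list `['res']`; with several pods the list is always returned.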






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-19 Thread via GitHub


shahar1 commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2217343046


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -81,6 +82,15 @@ class KubernetesJobOperator(KubernetesPodOperator):
 Used if the parameter `wait_until_job_complete` set True.
 :param deferrable: Run operator in the deferrable mode. Note that the parameter
 `wait_until_job_complete` must be set True.
+:param on_kill_propagation_policy: Whether and how garbage collection will be performed. Default is 'Foreground'.
+Acceptable values are:
+'Orphan' - orphan the dependents;
+'Background' - allow the garbage collector to delete the dependents in the background;
+'Foreground' - a cascading policy that deletes all dependents in the foreground.
+Default value is 'Foreground'.
+:param discover_pods_retry_number: Number of time list_namespaced_pod will be performed to discover
+already running pods.
+:param unwrap_single: Unwrap single result from the pod. Default is True to support backward compatibility.

Review Comment:
   Maybe add a usage example, something like: 
   
   ```text
   For example, when set to `True`, if the XCom result would be `['res']`, the final result is `'res'`.
   ```






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-15 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2208171354


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##
@@ -69,7 +73,18 @@ def __init__(
 super().__init__()
 self.job_name = job_name
 self.job_namespace = job_namespace
-self.pod_name = pod_name
+if pod_name is not None:
+warnings.warn(
+"`pod_name` parameter is deprecated, please use `pod_names`",
+AirflowProviderDeprecationWarning,
+stacklevel=2,
+)
+self.pod_name = pod_name

Review Comment:
   Done! Thank you for your review!






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-15 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2208167070


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +176,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None
-if self.pod is None:
-self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+self.pods: Sequence[k8s.V1Pod] | None = None
+if self.parallelism is None and self.pod is None:
+self.pod = self.get_or_create_pod(
 pod_request_obj=self.pod_request_obj,
 context=context,
 )
+self.pods = [
+self.pod,
+]

Review Comment:
   Thank you for your advice! Done.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-15 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2208164767


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -400,6 +419,29 @@ def reconcile_job_specs(
 
 return None
 
+def get_pods(
+self, pod_request_obj: k8s.V1Pod, context: Context, *, exclude_checked: bool = True
+) -> Sequence[k8s.V1Pod]:
+"""Return an already-running pods if exists."""
+label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
+pod_list: Sequence[k8s.V1Pod] = []
+retry_number: int = 0
+
+while len(pod_list) != self.parallelism or retry_number <= 3:

Review Comment:
   Done!






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-13 Thread via GitHub


berglh commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-3067451105

   @Crowiant, could you please address the issues raised in shahar1's review when you get a chance? Thank you :)





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-08 Thread via GitHub


shahar1 commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2192570367


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -400,6 +419,29 @@ def reconcile_job_specs(
 
 return None
 
+def get_pods(
+self, pod_request_obj: k8s.V1Pod, context: Context, *, exclude_checked: bool = True
+) -> Sequence[k8s.V1Pod]:
+"""Return an already-running pods if exists."""
+label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
+pod_list: Sequence[k8s.V1Pod] = []
+retry_number: int = 0
+
+while len(pod_list) != self.parallelism or retry_number <= 3:

Review Comment:
   Could you please make the max `retry_number` a parameter instead of hardcoding it as `3`?
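A sketch of the retry loop with the limit exposed as a parameter, assuming a hypothetical `list_pods` callable stands in for the `list_namespaced_pod` call and that `parallelism=None` should be treated as one expected pod. It combines the two loop conditions with `and`, so polling stops once the expected count is found or the retries run out; with `or`, as in the quoted hunk, the loop would not terminate while the pod count never matches, unless the loop body breaks out some other way.

```python
from __future__ import annotations

import time
from typing import Callable, Sequence


def discover_pods(
    list_pods: Callable[[], Sequence[str]],
    parallelism: int | None,
    discover_pods_retry_number: int = 3,
    retry_interval: float = 0.0,
) -> Sequence[str]:
    """Poll until the expected number of pods is found or the retries are exhausted."""
    expected = parallelism or 1  # assumption: treat None like a single-pod Job
    pods: Sequence[str] = []
    attempt = 0
    # One initial attempt plus up to `discover_pods_retry_number` retries.
    while len(pods) != expected and attempt <= discover_pods_retry_number:
        pods = list_pods()
        attempt += 1
        if len(pods) != expected and retry_interval:
            time.sleep(retry_interval)  # give the Job controller time to create pods
    return pods
```

Returning whatever was found after the last attempt leaves it to the caller to decide whether a short list is an error.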






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-08 Thread via GitHub


shahar1 commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2192614821


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +176,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None
-if self.pod is None:
-self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+self.pods: Sequence[k8s.V1Pod] | None = None
+if self.parallelism is None and self.pod is None:
+self.pod = self.get_or_create_pod(
 pod_request_obj=self.pod_request_obj,
 context=context,
 )
+self.pods = [
+self.pod,
+]

Review Comment:
   ```suggestion
   self.pods = [
       self.get_or_create_pod(
           pod_request_obj=self.pod_request_obj,
           context=context,
       )
   ]
   ```
   (please validate proper linting)
   
   Also, please ensure that `self.pod` is deprecated properly, i.e., when it is accessed, raise `AirflowProviderDeprecationWarning` and return the value of `self.pods[0]`. Conceptual implementation (please modify as necessary and add tests):
   
   ```python
   @property
   def pod(self):
       warnings.warn("...", AirflowProviderDeprecationWarning, stacklevel=2)
       return self.pods[0]
   ```
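A runnable version of the conceptual property, assuming stand-in classes (`JobOperatorSketch` and a local `AirflowProviderDeprecationWarning`) so it works without Airflow installed. Unlike the snippet above, it returns `None` instead of raising a `TypeError` when `self.pods` has not been set yet; that guard is an assumption, not part of the review.

```python
from __future__ import annotations

import warnings
from typing import Any, Sequence


class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Stand-in for airflow.exceptions.AirflowProviderDeprecationWarning."""


class JobOperatorSketch:
    """Hypothetical, stripped-down stand-in for KubernetesJobOperator."""

    def __init__(self) -> None:
        self.pods: Sequence[Any] | None = None

    @property
    def pod(self) -> Any | None:
        """Deprecated single-pod accessor kept for backward compatibility."""
        warnings.warn(
            "`pod` is deprecated, please use `pods`",
            AirflowProviderDeprecationWarning,
            stacklevel=2,
        )
        # Mirror the old attribute: the first pod, or None if none were created yet.
        return self.pods[0] if self.pods else None
```

Because the property has no setter, any remaining `self.pod = ...` assignment in the operator would raise `AttributeError`, so those assignments would need to switch to `self.pods`.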



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -400,6 +419,29 @@ def reconcile_job_specs(
 
 return None
 
+def get_pods(
+self, pod_request_obj: k8s.V1Pod, context: Context, *, exclude_checked: bool = True
+) -> Sequence[k8s.V1Pod]:
+"""Return an already-running pods if exists."""
+label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
+pod_list: Sequence[k8s.V1Pod] = []
+retry_number: int = 0
+
+while len(pod_list) != self.parallelism or retry_number <= 3:

Review Comment:
   Could you please make the max `retry_number` as a parameter?



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##
@@ -69,7 +73,18 @@ def __init__(
 super().__init__()
 self.job_name = job_name
 self.job_namespace = job_namespace
-self.pod_name = pod_name
+if pod_name is not None:
+warnings.warn(
+"`pod_name` parameter is deprecated, please use `pod_names`",
+AirflowProviderDeprecationWarning,
+stacklevel=2,
+)
+self.pod_name = pod_name

Review Comment:
   ```suggestion
   ```
   Instead of initializing it here, make `self.pod_name` a read-only property (see the previous suggestion for `self.pod` of the operator).
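A sketch combining the deprecated `pod_name` constructor argument from the trigger hunk with the suggested read-only property, using a hypothetical `JobTriggerSketch` and a local stand-in warning class. Folding a legacy `pod_name` into `pod_names` when no list is given is an assumption of this sketch, not necessarily what the PR does.

```python
from __future__ import annotations

import warnings


class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Stand-in for airflow.exceptions.AirflowProviderDeprecationWarning."""


class JobTriggerSketch:
    """Hypothetical, stripped-down stand-in for KubernetesJobTrigger."""

    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        pod_names: list[str] | None = None,
        pod_name: str | None = None,
    ) -> None:
        self.job_name = job_name
        self.job_namespace = job_namespace
        if pod_name is not None:
            warnings.warn(
                "`pod_name` parameter is deprecated, please use `pod_names`",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            # Assumption: fold the legacy single name into the list if no list was given.
            pod_names = pod_names or [pod_name]
        self.pod_names: list[str] = pod_names or []

    @property
    def pod_name(self) -> str | None:
        """Read-only, deprecated view of the first pod name."""
        warnings.warn(
            "`pod_name` is deprecated, please use `pod_names`",
            AirflowProviderDeprecationWarning,
            stacklevel=2,
        )
        return self.pod_names[0] if self.pod_names else None
```

Since `pod_name` is never assigned in `__init__`, the property cannot be shadowed by an instance attribute, which is what makes it effectively read-only.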



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -232,20 +249,23 @@ def execute_complete(self, context: Context, event: dict, **kwargs):
 raise AirflowException(event["message"])
 
 if self.get_logs:
-pod_name = event["pod_name"]
-pod_namespace = event["pod_namespace"]
-self.pod = self.hook.get_pod(pod_name, pod_namespace)
-if not self.pod:
-raise PodNotFoundException("Could not find pod after resuming from deferral")
-self._write_logs(self.pod)
+for pod_name in event["pod_names"]:
+pod_namespace = event["pod_namespace"]
+pod = self.hook.get_pod(pod_name, pod_namespace)
+if not pod:
+raise PodNotFoundException("Could not find pod after resuming from deferral")
+self._write_logs(pod)
 
 if self.do_xcom_push:
-xcom_result = event["xcom_result"]
-if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
-self.log.info("xcom result file is empty.")
-return None
-self.log.info("xcom result: \n%s", xcom_result)
-return json.loads(xcom_result)
+xcom_results: list[Any | None] = []
+for xcom_result in event["xcom_result"]:
+if isinstance(xcom_result, str) and xcom_result.rstrip() == EMPTY_XCOM_RESULT:
+self.log.info("xcom result file is empty.")
+xcom_results.append(None)
+continue
+self.log.info("xcom result: \n%s", xcom_result)
+xcom_results.append(json.loads(xcom_result))
+return xcom_results[0] if len(xcom_results) == 1 else xcom_results

Review Comment:
   If returning a single entity is meant to retain backward compatibility and/or give nicer formatting, I'd maybe make it a boolean flag with a default value, e.g.:
   

Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-06 Thread via GitHub


berglh commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-3043207040

   @hussein-awala, @shahar1, @jedcunningham, could someone with review and write access please review these changes? 🙏





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-04 Thread via GitHub


steinwaywhw commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2185278651


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +170,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None

Review Comment:
   Sorry for the delay. That sounds totally fine. I didn't know the context; thanks for explaining.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-07-03 Thread via GitHub


Crowiant commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-3031557090

   Hello @steinwaywhw, could you please review my answers to your comments? Thank you!





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-20 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2159689072


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -101,6 +101,7 @@ def __init__(
 wait_until_job_complete: bool = False,
 job_poll_interval: float = 10,
 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+on_kill_propagation_policy: str = "Background",

Review Comment:
   Hello @steinwaywhw, thank you for your review! I totally agree with you here. I set the policy to Foreground and updated the docstring.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-20 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2159691917


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +170,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None

Review Comment:
   I agree, but unfortunately `KubernetesJobTrigger` relies on `self.pod`. As @shahar1 previously stated, it is better to deprecate slowly. So I think it is better to remove the `self.pod` attribute together with the other deprecation (in `KubernetesJobTrigger`). WDYT?






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-20 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2159689072


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -101,6 +101,7 @@ def __init__(
 wait_until_job_complete: bool = False,
 job_poll_interval: float = 10,
 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+on_kill_propagation_policy: str = "Background",

Review Comment:
   Hello @steinwaywhw, thank you for your review! I totally agree with you here. I set the policy to Foreground and added an updated docstring.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-20 Thread via GitHub


shahar1 commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-2992180055

   @Crowiant / @VladaZakharova, could you please address @steinwaywhw's comments? Thank you!





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-09 Thread via GitHub


steinwaywhw commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2136430925


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -101,6 +101,7 @@ def __init__(
 wait_until_job_complete: bool = False,
 job_poll_interval: float = 10,
 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+on_kill_propagation_policy: str = "Background",

Review Comment:
   The old behavior is to call `super().on_kill()` to delete the pod, which seems to be the same as setting the propagation policy to "Foreground", i.e., immediately deleting the pods that belong to the job.

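A minimal sketch of validating `on_kill_propagation_policy` against the three values listed in the PR's docstring and turning it into keyword arguments for deleting the Job. `build_job_delete_kwargs` is hypothetical; the keys are meant to match the `name`, `namespace`, and `propagation_policy` arguments of the Kubernetes Python client's `BatchV1Api.delete_namespaced_job`, but that call is not exercised here.

```python
from __future__ import annotations

# The three values listed in the PR's docstring for `on_kill_propagation_policy`.
VALID_PROPAGATION_POLICIES = ("Orphan", "Background", "Foreground")


def build_job_delete_kwargs(
    job_name: str, namespace: str, on_kill_propagation_policy: str = "Foreground"
) -> dict[str, str]:
    """Validate the policy and build keyword arguments for deleting the Job."""
    if on_kill_propagation_policy not in VALID_PROPAGATION_POLICIES:
        raise ValueError(
            f"Invalid propagation policy {on_kill_propagation_policy!r}; "
            f"expected one of {VALID_PROPAGATION_POLICIES}"
        )
    return {
        "name": job_name,
        "namespace": namespace,
        "propagation_policy": on_kill_propagation_policy,
    }
```

For example, `BatchV1Api().delete_namespaced_job(**build_job_delete_kwargs("my-job", "default"))` would request Foreground deletion, in which Kubernetes removes the Job's pods before the Job object itself is deleted; validating early turns a typo in the policy into a clear error instead of a rejected API call.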





Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-09 Thread via GitHub


steinwaywhw commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2136431710


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -101,6 +101,7 @@ def __init__(
 wait_until_job_complete: bool = False,
 job_poll_interval: float = 10,
 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+on_kill_propagation_policy: str = "Background",

Review Comment:
   Please also update the docstring to include this new parameter.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-09 Thread via GitHub


steinwaywhw commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2136416311


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +170,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None

Review Comment:
   This property seems to be internal to the KubernetesJobOperator itself. Is that the case? If so, does it make sense to remove it and keep only `self.pods`? And also unify the cases where parallelism is None or 1 with parallelism > 1?
   
   One point is that, even if parallelism is None or 1, it should still avoid creating a pod. The K8s Job will create the pod.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-09 Thread via GitHub


steinwaywhw commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2136416311


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -168,34 +170,43 @@ def execute(self, context: Context):
 ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
 
 self.pod: k8s.V1Pod | None

Review Comment:
   This property seems to be internal to the KubernetesJobOperator itself. Is that the case? If so, does it make sense to remove it and keep only `self.pods`? And also unify the cases where parallelism is None or 1 with parallelism > 1?






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-06 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2132379786


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##
@@ -55,7 +55,7 @@ def __init__(
 self,
 job_name: str,
 job_namespace: str,
-pod_name: str,

Review Comment:
   Done! 






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-06-06 Thread via GitHub


Crowiant commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2131891272


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##
@@ -55,7 +55,7 @@ def __init__(
 self,
 job_name: str,
 job_namespace: str,
-pod_name: str,

Review Comment:
   OK, sounds reasonable! Thank you for your comment; I'll update my PR.






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-05-16 Thread via GitHub


shahar1 commented on code in PR #49899:
URL: https://github.com/apache/airflow/pull/49899#discussion_r2093896582


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##
@@ -55,7 +55,7 @@ def __init__(
 self,
 job_name: str,
 job_namespace: str,
-pod_name: str,

Review Comment:
   This seems like a breaking change. I'm not entirely sure it is user-facing, but it could potentially break customizations.
   Let's deprecate it slowly?






Re: [PR] Handle multiple pods to prevent ```KubernetesJobOperator``` falls with parallelism option [airflow]

2025-05-08 Thread via GitHub


VladaZakharova commented on PR #49899:
URL: https://github.com/apache/airflow/pull/49899#issuecomment-2862497707

   Hi @potiuk!
   Can you please check the changes here? Thanks!

