Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-10 Thread via GitHub


shahar1 commented on PR #53368:
URL: https://github.com/apache/airflow/pull/53368#issuecomment-3732364447

   Converting this PR to a draft at the author's request so they can make some changes. Please do not merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-05 Thread via GitHub


stephen-bracken commented on PR #53368:
URL: https://github.com/apache/airflow/pull/53368#issuecomment-3709840015

   > @stephen-bracken / @shahar1, given the concerns raised in https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899, one thing I'd propose considering is adding a newsfragment so that this change is highlighted in the provider's changelog.
   > 
   > TLDR: Requesting a newsfragment to highlight this interface change.
   > 
   > UPDATE: Argh, providers have no newsfragments. Add it to the changelog instead, as in https://github.com/apache/airflow/pull/59143/changes#diff-24cff4e7b7926b95f4efef45da9f9d6f43b237b5143990b1554113251cb2c12eR30 for example, so that it is included in the next providers wave.
   
   @jscheffl Thanks, added a changelog note.





Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-03 Thread via GitHub


jscheffl commented on PR #53368:
URL: https://github.com/apache/airflow/pull/53368#issuecomment-3707014488

   @stephen-bracken / @shahar1, given the concerns raised in https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899, one thing I'd propose considering is adding a newsfragment so that this change is highlighted in the provider's changelog.
   
   TLDR: Requesting a newsfragment to highlight this interface change.





Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-03 Thread via GitHub


shahar1 commented on PR #53368:
URL: https://github.com/apache/airflow/pull/53368#issuecomment-3706900678

   LGTM. I'm checking with the other contributors that the handling of the `parallelism = 0` case makes sense:
   https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899
   
   If there are no strong objections in the next 1-2 days, or if there are additional approvals for this PR by then, I'm OK with merging it as-is.





Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-02 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2657435077


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment:
   This is not relevant to this PR.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2026-01-02 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2657429368


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment:
   This is out of scope for this PR.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-23 Thread via GitHub


shahar1 commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2644092018


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment:
   +1 for that.
   Also, I would like to revise my earlier review: treating `parallelism = 0` as equivalent to `parallelism = 1` seems confusing.
   Taken literally, `parallelism = 0` means that no pods should start at all, which I don't find practical.
   For that reason:
   1. I suggest raising an error when `parallelism < 1`.
   2. If `parallelism is None`, set it to `1` and emit a `DeprecationWarning`.
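The two rules proposed above could be sketched as a small normalization step run once when the operator is constructed. `resolve_parallelism` is a hypothetical helper, not part of the cncf.kubernetes provider, and the use of `ValueError` and the exact warning text are assumptions.

```python
from __future__ import annotations

import warnings


def resolve_parallelism(parallelism: int | None) -> int:
    """Normalize a ``parallelism`` value following the rules proposed above.

    Hypothetical helper for illustration; not part of the provider.
    """
    if parallelism is None:
        # Legacy default: keep working, but nudge users to pass a value explicitly.
        warnings.warn(
            "parallelism=None is deprecated and is treated as parallelism=1.",
            DeprecationWarning,
            stacklevel=2,
        )
        return 1
    if parallelism < 1:
        # 0 would literally mean "start no pods", which is not a useful Job.
        raise ValueError(f"parallelism must be 1 or larger, got {parallelism!r}")
    return parallelism
```

In an operator this would run once, e.g. `self.parallelism = resolve_parallelism(parallelism)`, so that later pod discovery can compare against a plain positive integer.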






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-23 Thread via GitHub


shahar1 commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2644093472


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -81,7 +81,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
     :param completion_mode: CompletionMode specifies how Pod completions are tracked. It can be `NonIndexed` (default) or `Indexed`.
     :param completions: Specifies the desired number of successfully finished pods the job should be run with.
     :param manual_selector: manualSelector controls generation of pod labels and pod selectors.
-    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time.
+    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time. Defaults to 1

Review Comment:
   See my other comments: it should be mentioned that the value must be `1` (the default) or larger.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-23 Thread via GitHub


shahar1 commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2644093472


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -81,7 +81,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
     :param completion_mode: CompletionMode specifies how Pod completions are tracked. It can be `NonIndexed` (default) or `Indexed`.
     :param completions: Specifies the desired number of successfully finished pods the job should be run with.
     :param manual_selector: manualSelector controls generation of pod labels and pod selectors.
-    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time.
+    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time. Defaults to 1

Review Comment:
   See my other comments: it should be `>= 1`.



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment:
   +1 for that.
   Also, I would like to revise my earlier review: treating `parallelism = 0` as equivalent to `parallelism = 1` is indeed confusing.
   Taken literally, `parallelism = 0` means that no pods should start at all, which I don't find practical.
   For that reason:
   1. I suggest raising an error when `parallelism < 1`.
   2. If `parallelism is None`, set it to `1` and emit a `DeprecationWarning`.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-23 Thread via GitHub


Copilot commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2644067916


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment:
   Changing the default value of `parallelism` from `None` to `1` is a breaking change that could affect existing users. Users who previously relied on `parallelism=None` for different behavior may see unexpected changes. This should be documented in a newsfragment file as described in the PR guidelines, specifically a `.significant.rst` file, to notify users of backwards-incompatible changes.



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment:
   The loop logic has a potential issue where `pod_list` is assigned inside the 
loop after checking its length. This means that if the initial `pod_list` is 
empty (as initialized on line 452), the first iteration will always execute the 
list operation, which is correct. However, if fewer pods than expected are 
found, the loop continues retrying without any delay, which could result in 
rapid API calls. Consider adding a sleep between iterations to avoid 
overwhelming the Kubernetes API server.
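One way to act on the suggestion above is to pause between attempts. This sketch keeps the loop shape from the diff (retry bound in the `while`, early `break` once the count matches) and injects the listing call and the sleep function so it can run without a cluster. `discover_pods`, `delay_seconds`, and the injected `list_pods`/`sleep` callables are hypothetical; the operator in the diff calls `self.client.list_namespaced_pod(...)` directly.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Sequence


def discover_pods(
    list_pods: Callable[[], Sequence[str]],
    expected: int,
    max_retries: int,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Sequence[str]:
    """Poll ``list_pods`` until ``expected`` pods are listed or retries run out.

    Same loop shape as the diff above, plus a pause between attempts.
    """
    pod_list: Sequence[str] = []
    retry_number = 0
    while retry_number <= max_retries:
        if len(pod_list) == expected:
            break
        if retry_number > 0:
            # Back off before every attempt except the first one.
            sleep(delay_seconds)
        pod_list = list_pods()
        retry_number += 1
    return pod_list
```

Note that with `expected=0` this loop exits before making a single API call and returns an empty list, which is one way to see why rejecting `parallelism < 1` up front was proposed in this thread.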



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment:
   The error message at line 465 states "No pods running with labels" but this 
might not be accurate. The pods could exist but not be in a running state yet 
(e.g., pending, initializing). Consider updating the error message to be more 
accurate, such as "Failed to find expected number of pods with labels" or "No 
pods found with labels" to avoid confusion.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-20 Thread via GitHub


rachthree commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2637298614


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment:
   Sounds good, thanks! Will there be a follow-up PR for that? If not, no worries; I can try picking it up, if that's alright.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-18 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2626369806


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment:
   Hi @rachthree, good question. I think this sounds reasonable, but it is outside the scope of this PR. My suggestion is to add a timeout to `get_pods()`, similar to the timeout in the `KubernetesPodOperator` for pods that are not ready.
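A time-based variant of the discovery loop, as proposed here (and as asked for in the Kueue question further down), might look like the sketch below. It is only loosely analogous to `startup_timeout_seconds` on `KubernetesPodOperator`; `wait_for_pods`, `timeout_seconds`, `poll_interval`, and the injected `clock`/`sleep` callables are hypothetical names, not provider parameters.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Sequence


def wait_for_pods(
    list_pods: Callable[[], Sequence[str]],
    expected: int,
    timeout_seconds: float | None,
    poll_interval: float = 5.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Sequence[str]:
    """Poll until ``expected`` pods are listed or ``timeout_seconds`` elapses.

    ``timeout_seconds=None`` means "no limit of our own": keep polling and let
    the task-level timeout stop the task if the pods never show up.
    """
    deadline = None if timeout_seconds is None else clock() + timeout_seconds
    while True:
        pod_list = list_pods()
        if len(pod_list) == expected:
            return pod_list
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(
                f"found {len(pod_list)} of {expected} pods within {timeout_seconds}s"
            )
        sleep(poll_interval)
```

Passing `timeout_seconds=None` corresponds to the "rely on the DAG/task timeout" option; the injectable `clock` and `sleep` exist only so the sketch can be exercised without real waiting.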






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-17 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2626369806


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment:
   Hi @rachthree, good question. I think this sounds reasonable, but it is outside the scope of this PR. My suggestion is to add a timeout to `get_pods()`, similar to the timeout in the `KubernetesPodOperator` for pods that are not ready.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-16 Thread via GitHub


rachthree commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2624402228


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment:
   First off, thank you @stephen-bracken for looking into this!
   
   My question concerns clusters that have limited resources and Kueue enabled. The k8s Job will eventually spool up the pods, but it can take a while if there is resource contention. In our case, it's a GPU cluster, and many jobs reserve GPUs with potentially long runtimes. Could a configurable timeout be used here instead of a retry number? Alternatively, drop the retry number/timeout altogether and rely on the DAG/task timeout instead (which my team preferred, and which is inherent in the solution I applied locally).
   
   Thanks again!






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-15 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2618902991


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##


Review Comment:
   Won't `parallelism = 0` always throw an error in `get_pods()`?






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-15 Thread via GitHub


stephen-bracken commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2618779389


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##


Review Comment:
   I've changed the default value to 1 to reduce confusion



##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -459,8 +450,9 @@ def get_pods(
         label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
+        parallelism = self.parallelism or 1  # Default to using single pod parallelism
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while len(pod_list) != parallelism or retry_number <= self.discover_pods_retry_number:

Review Comment:
   I've fixed the loop condition by splitting out the break check here.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-06 Thread via GitHub


shahar1 commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2594682670


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##


Review Comment:
   The docstring should be updated to state that the default value (`None`), as well as the falsy value `0`, behaves the same as `1`.






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-06 Thread via GitHub


Copilot commented on code in PR #53368:
URL: https://github.com/apache/airflow/pull/53368#discussion_r2594686671


##
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py:
##
@@ -459,8 +450,9 @@ def get_pods(
         label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
+        parallelism = self.parallelism or 1  # Default to using single pod parallelism
 
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while len(pod_list) != parallelism or retry_number <= self.discover_pods_retry_number:

Review Comment:
   The while loop condition uses `or` but should use `and`. With the current logic, the loop continues as long as either `len(pod_list) != parallelism` OR `retry_number <= self.discover_pods_retry_number` is true. This means:
   1. If pods are found matching the parallelism count, the loop still keeps polling as long as `retry_number <= self.discover_pods_retry_number`.
   2. If the retry number exceeds the limit but the pod count doesn't match, the loop still continues and may never terminate.
   
   The correct condition is `while len(pod_list) != parallelism and retry_number <= self.discover_pods_retry_number:`, which retries until either the expected count is found OR the retry limit is reached.
   ```suggestion
   while len(pod_list) != parallelism and retry_number <= self.discover_pods_retry_number:
   ```
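To make the difference concrete, here is a standalone sketch that counts how many times each loop shape calls a fake `list_pods` function: the original `or` condition, the suggested `and` condition, and the split-out `break` form that appears in the later diff hunks. The function names and the `cap` guard are hypothetical test scaffolding, not provider code.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence


def poll_with_or(
    list_pods: Callable[[], Sequence[str]], expected: int, max_retries: int, cap: int = 100
) -> int:
    """Original condition. Returns the number of list calls made."""
    pod_list: Sequence[str] = []
    retry_number = calls = 0
    while len(pod_list) != expected or retry_number <= max_retries:
        if calls >= cap:
            break  # test-only guard: this condition can stay true forever
        pod_list = list_pods()
        calls += 1
        retry_number += 1
    return calls


def poll_with_and(
    list_pods: Callable[[], Sequence[str]], expected: int, max_retries: int
) -> int:
    """Condition suggested in the review comment."""
    pod_list: Sequence[str] = []
    retry_number = calls = 0
    while len(pod_list) != expected and retry_number <= max_retries:
        pod_list = list_pods()
        calls += 1
        retry_number += 1
    return calls


def poll_split(
    list_pods: Callable[[], Sequence[str]], expected: int, max_retries: int
) -> int:
    """Retry bound in the ``while``, count check split out as an early ``break``."""
    pod_list: Sequence[str] = []
    retry_number = calls = 0
    while retry_number <= max_retries:
        if len(pod_list) == expected:
            break
        pod_list = list_pods()
        calls += 1
        retry_number += 1
    return calls
```

With the pods already present, the `or` loop keeps polling until the retry budget is spent (6 calls for `max_retries=5`), while the other two stop after one call. With no pods at all, the `and` and split forms both stop after 6 calls, and only the `or` loop runs until the artificial cap.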






Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]

2025-12-03 Thread via GitHub


stephen-bracken commented on PR #53368:
URL: https://github.com/apache/airflow/pull/53368#issuecomment-3606707268

   @shahar1, revisiting this, I think the simplest solution is to use `get_pods()` even if `parallelism` isn't set. We can assume `parallelism=None` means single-pod execution here.

