Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on PR #53368: URL: https://github.com/apache/airflow/pull/53368#issuecomment-3732364447 Drafting the PR following the author's request to make some changes, please do not merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on PR #53368: URL: https://github.com/apache/airflow/pull/53368#issuecomment-3709840015

> @stephen-bracken / @shahar1 given the concerns raised in https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899, one thing to consider: I'd propose adding a newsfragment so that this change is highlighted in the provider's changelog.
>
> TLDR: Requesting to add a newsfragment to highlight this interface change.
>
> UPDATE: Argh, providers have no newsfragments. Add it to the changelog instead, as in https://github.com/apache/airflow/pull/59143/changes#diff-24cff4e7b7926b95f4efef45da9f9d6f43b237b5143990b1554113251cb2c12eR30 for example, so that it is included in the next providers wave.

@jscheffl Thanks, added a changelog note.
Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
jscheffl commented on PR #53368: URL: https://github.com/apache/airflow/pull/53368#issuecomment-3707014488

@stephen-bracken / @shahar1 given the concerns raised in https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899, one thing to consider: I'd propose adding a newsfragment so that this change is highlighted in the provider's changelog.

TLDR: Requesting to add a newsfragment to highlight this interface change.
Re: [PR] Remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on PR #53368: URL: https://github.com/apache/airflow/pull/53368#issuecomment-3706900678

LGTM. I'm checking with the other contributors that the handling of the `parallelism = 0` case makes sense: https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1767429130216899

If no strong objections are raised in the next 1-2 days, or if there are additional approvals for this PR by then, I'm OK with merging it as-is.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2657435077

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment: This is not relevant to this PR.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2657429368

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment: This is out of scope for this PR.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2644092018

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment: +1 for that. Also, I would like to revise my earlier review: treating `parallelism = 0` as equivalent to `parallelism = 1` seems confusing. If we take `parallelism = 0` at face value, it means that no pods should start at all, and I don't find that practical. For that reason:
1. I suggest throwing an error when `parallelism < 1`.
2. If `parallelism is None`, set it to `1` and raise a `DeprecationWarning`.
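A minimal sketch of the validation suggested in this comment, for illustration only. The helper name `normalize_parallelism` is hypothetical and not part of the provider, and the built-in `DeprecationWarning` is used exactly as the comment words it; the actual PR may use a different warning class or place the check elsewhere.

```python
from __future__ import annotations

import warnings


def normalize_parallelism(parallelism: int | None) -> int:
    """Hypothetical helper: validate a Job's ``parallelism`` argument."""
    if parallelism is None:
        # Assumption: None stays accepted for backwards compatibility,
        # is treated as 1, and is flagged so callers can migrate.
        warnings.warn(
            "parallelism=None is deprecated; pass an integer >= 1 instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return 1
    if parallelism < 1:
        # parallelism=0 would mean "start no pods", which is rejected here.
        raise ValueError(f"parallelism must be >= 1, got {parallelism}")
    return parallelism
```

Calling `normalize_parallelism(None)` returns `1` and emits the warning, while `normalize_parallelism(0)` raises `ValueError` instead of being silently treated as `1`.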
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2644093472

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -81,7 +81,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
     :param completion_mode: CompletionMode specifies how Pod completions are tracked. It can be `NonIndexed` (default) or `Indexed`.
     :param completions: Specifies the desired number of successfully finished pods the job should be run with.
     :param manual_selector: manualSelector controls generation of pod labels and pod selectors.
-    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time.
+    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time. Defaults to 1

Review Comment: See my other comments - it should be mentioned that the value should be `1` (default) or larger
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2644093472

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -81,7 +81,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
     :param completion_mode: CompletionMode specifies how Pod completions are tracked. It can be `NonIndexed` (default) or `Indexed`.
     :param completions: Specifies the desired number of successfully finished pods the job should be run with.
     :param manual_selector: manualSelector controls generation of pod labels and pod selectors.
-    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time.
+    :param parallelism: Specifies the maximum desired number of pods the job should run at any given time. Defaults to 1

Review Comment: See my other comments, it should be `>= 1`

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment: +1 for that. Also, I would like to revise my earlier review: treating `parallelism = 0` as equivalent to `parallelism = 1` is indeed confusing. If we take `parallelism = 0` at face value, it means that no pods should start at all, and I don't find that practical. For that reason:
1. I suggest throwing an error when `parallelism < 1`.
2. If `parallelism is None`, set it to `1` and raise a `DeprecationWarning`.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
Copilot commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2644067916

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -115,7 +115,7 @@ def __init__(
         completion_mode: str | None = None,
         completions: int | None = None,
         manual_selector: bool | None = None,
-        parallelism: int | None = None,
+        parallelism: int = 1,

Review Comment: Changing the default value of `parallelism` from `None` to `1` is a breaking change that could affect existing users. Users who previously relied on `parallelism=None` to have different behavior may experience unexpected changes. This should be documented in a newsfragment file as mentioned in the PR guidelines, particularly in a `.significant.rst` file to notify users of backwards incompatible changes.

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment: The loop logic has a potential issue where `pod_list` is assigned inside the loop after checking its length. This means that if the initial `pod_list` is empty (as initialized on line 452), the first iteration will always execute the list operation, which is correct. However, if fewer pods than expected are found, the loop continues retrying without any delay, which could result in rapid API calls. Consider adding a sleep between iterations to avoid overwhelming the Kubernetes API server.

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:
+            if len(pod_list) == self.parallelism:
+                break
             pod_list = self.client.list_namespaced_pod(
                 namespace=pod_request_obj.metadata.namespace,
                 label_selector=label_selector,

Review Comment: The error message at line 465 states "No pods running with labels" but this might not be accurate. The pods could exist but not be in a running state yet (e.g., pending, initializing). Consider updating the error message to be more accurate, such as "Failed to find expected number of pods with labels" or "No pods found with labels" to avoid confusion.
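The delay between retries proposed in this review could be sketched as follows. This is an illustration under stated assumptions, not the operator's code: `discover_pods`, `list_pods`, `max_retries`, and `delay_seconds` are hypothetical names, and the Kubernetes client call is replaced by a plain callable so the example stays self-contained.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Sequence


def discover_pods(
    list_pods: Callable[[], Sequence[str]],
    expected: int,
    max_retries: int,
    delay_seconds: float = 1.0,
) -> Sequence[str]:
    """Poll ``list_pods`` until ``expected`` pods are seen or retries run out."""
    pods: Sequence[str] = []
    # max_retries + 1 attempts mirrors `while retry_number <= retry_limit`
    # when the counter starts at 0 and grows by one per attempt (assumed).
    for attempt in range(max_retries + 1):
        pods = list_pods()
        if len(pods) == expected:
            return pods
        if attempt < max_retries:
            # Pause before the next attempt so the API server is not hammered.
            time.sleep(delay_seconds)
    raise RuntimeError(
        f"Found {len(pods)} pod(s) after {max_retries + 1} attempt(s), expected {expected}"
    )
```

The error message reports how many pods were found rather than claiming none are running, in line with the second point of the review.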
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
rachthree commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2637298614

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment: Sounds good, thanks! Will there be a follow-up PR for that? If not, no worries, I can try picking that up if that's alright.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2626369806

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment: Hi @rachthree, good question. I think this sounds reasonable but is outside of the scope of this PR. My suggestion is that there should be a timeout on `get_pods()` that is similar to the timeout in the `KubernetesPodOperator` for pods not being ready.
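The timeout idea raised here could look roughly like the sketch below, which swaps the retry counter for a wall-clock deadline. Everything in it is an assumption made for illustration: `wait_for_pods`, `timeout_seconds`, and `poll_interval` are hypothetical names, and no such parameters are confirmed for `get_pods()` in this thread.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Sequence


def wait_for_pods(
    list_pods: Callable[[], Sequence[str]],
    expected: int,
    timeout_seconds: float,
    poll_interval: float = 5.0,
) -> Sequence[str]:
    """Poll ``list_pods`` until ``expected`` pods appear or the deadline passes."""
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    while True:
        pods = list_pods()
        if len(pods) == expected:
            return pods
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Found {len(pods)} pod(s) within {timeout_seconds}s, expected {expected}"
            )
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
```

A deadline fits the slow-scheduling case described in the thread (queued GPU jobs) better than a fixed attempt count, because the wait is expressed in time rather than in number of API calls.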
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
rachthree commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2624402228

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -461,7 +452,9 @@ def get_pods(
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while retry_number <= self.discover_pods_retry_number:

Review Comment: First off, thank you @stephen-bracken for looking into this! My question here concerns clusters that have limited resources and Kueue enabled. The k8s Job will eventually spool up the pods, but it will take a while if there is resource contention. In our case, it's a GPU cluster and many jobs reserve GPUs with potentially long runtimes. Could a configurable timeout be used here instead of a retry number? Alternatively, could we drop the retry number / timeout and rely on the DAG / task timeout instead (which is what my team preferred and is inherent in the solution I applied locally)? Thanks again!
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2618902991

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##

Review Comment: Won't `parallelism = 0` always throw an error in `get_pods()`?
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2618779389

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##

Review Comment: I've changed the default value to 1 to reduce confusion

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -459,8 +450,9 @@ def get_pods(
         label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
+        parallelism = self.parallelism or 1  # Default to using single pod parallelism
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while len(pod_list) != parallelism or retry_number <= self.discover_pods_retry_number:

Review Comment: I've fixed the loop invariant by splitting out the break condition here.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
shahar1 commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2594682670

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##

Review Comment: The docstring should be updated to reflect that the default value (`None`), as well as the falsy value `0`, is treated the same as `1`.
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
Copilot commented on code in PR #53368: URL: https://github.com/apache/airflow/pull/53368#discussion_r2594686671

## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py: ##
@@ -459,8 +450,9 @@ def get_pods(
         label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
         pod_list: Sequence[k8s.V1Pod] = []
         retry_number: int = 0
+        parallelism = self.parallelism or 1  # Default to using single pod parallelism
-        while len(pod_list) != self.parallelism or retry_number <= self.discover_pods_retry_number:
+        while len(pod_list) != parallelism or retry_number <= self.discover_pods_retry_number:

Review Comment: The while loop condition uses `or` but should use `and`. With the current logic, the loop will continue as long as either `len(pod_list) != parallelism` OR `retry_number <= self.discover_pods_retry_number` is true. This means:
1. If pods are found matching the parallelism count, the loop continues as long as `retry_number <= self.discover_pods_retry_number`
2. If the retry number exceeds the limit but pods don't match, it still continues

The correct logic should be: `while len(pod_list) != parallelism and retry_number <= self.discover_pods_retry_number:` to retry finding pods until either the expected count is found OR the retry limit is reached.

```suggestion
        while len(pod_list) != parallelism and retry_number <= self.discover_pods_retry_number:
```
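To make the `or` versus `and` difference concrete, here is a small simulation that is not taken from the operator. It assumes the loop body makes one list call and increments `retry_number` by one per iteration, which the quoted hunk does not show; `count_list_calls`, `found_at`, and `cap` are hypothetical names, and `cap` exists only to stop the runaway case.

```python
def count_list_calls(
    use_and: bool,
    found_at: int,
    expected: int = 1,
    max_retries: int = 3,
    cap: int = 20,
) -> int:
    """Count list calls made by the loop; pods "appear" on call number ``found_at``."""
    pod_count = 0
    retry_number = 0
    calls = 0
    while calls < cap:  # safety cap so the runaway `or` case terminates
        mismatch = pod_count != expected
        within_budget = retry_number <= max_retries
        keep_going = (mismatch and within_budget) if use_and else (mismatch or within_budget)
        if not keep_going:
            break
        calls += 1
        retry_number += 1  # assumed: one increment per iteration
        if calls >= found_at:
            pod_count = expected
    return calls


print(count_list_calls(use_and=True, found_at=1))     # 1: stops once pods match
print(count_list_calls(use_and=False, found_at=1))    # 4: keeps polling until retries run out
print(count_list_calls(use_and=True, found_at=100))   # 4: gives up after the retry budget
print(count_list_calls(use_and=False, found_at=100))  # 20: only the safety cap stops it
```

With `or`, a successful match still burns the whole retry budget, and a permanent mismatch never exits on its own; with `and`, both cases terminate as intended.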
Re: [PR] remove references to KubernetesJobOperator.get_or_create_pod() to fix creating duplicate pods [airflow]
stephen-bracken commented on PR #53368: URL: https://github.com/apache/airflow/pull/53368#issuecomment-3606707268

@shahar1 Revisiting this, I think the simplest solution is to use `get_pods()` even if `parallelism` isn't set. We can assume `parallelism=None` means single-pod execution here.
