Re: [PR] fix: Prevent duplicate task execution on scheduler crash (Celery exec… [airflow]
github-actions[bot] closed pull request #58896: fix: Prevent duplicate task execution on scheduler crash (Celery exec… URL: https://github.com/apache/airflow/pull/58896 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] fix: Prevent duplicate task execution on scheduler crash (Celery exec… [airflow]
github-actions[bot] commented on PR #58896: URL: https://github.com/apache/airflow/pull/58896#issuecomment-3775551766 This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Re: [PR] fix: Prevent duplicate task execution on scheduler crash (Celery exec… [airflow]
mykola-shyshov commented on PR #58896: URL: https://github.com/apache/airflow/pull/58896#issuecomment-3620880501

```
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
    ...
    for ti in tis:
        if ti.external_executor_id is not None:
            celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
        else:
            not_adopted_tis.append(ti)
```

can be replaced with just

```
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
    ...
    for ti in tis:
        celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
```

and `try_adopt_task_instances` will be based only on `task.id`. It will always return an empty list (which I think is expected and okay) and will update task statuses.
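A minimal sketch of the simplified adoption loop, using stand-in classes instead of the real Airflow and Celery types. `FakeTaskInstance`, `FakeAsyncResult`, and `adopt_by_task_id` are hypothetical names for illustration only, and keying the lookup by `str(ti.id)` is an assumption based on the remark that adoption would rely only on the task ID.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple


@dataclass
class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance (hypothetical, illustration only)."""

    id: str
    external_executor_id: Optional[str] = None


@dataclass
class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult; it only carries the task ID."""

    task_id: str


def adopt_by_task_id(
    tis: Sequence[FakeTaskInstance],
) -> Tuple[Dict[str, Tuple[FakeAsyncResult, FakeTaskInstance]], List[FakeTaskInstance]]:
    """Build the lookup table keyed by the task instance ID.

    Every task instance has an ID, so nothing is left unadopted and the
    second return value is always an empty list.
    """
    celery_tasks: Dict[str, Tuple[FakeAsyncResult, FakeTaskInstance]] = {}
    not_adopted_tis: List[FakeTaskInstance] = []
    for ti in tis:
        key = str(ti.id)  # assumption: the Celery task ID equals the TI ID
        celery_tasks[key] = (FakeAsyncResult(key), ti)
    return celery_tasks, not_adopted_tis
```

In this sketch the `external_executor_id` field is never read, which is the point of the simplification: a task instance without it is handled the same way as one with it.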
Re: [PR] fix: Prevent duplicate task execution on scheduler crash (Celery exec… [airflow]
mykola-shyshov commented on PR #58896: URL: https://github.com/apache/airflow/pull/58896#issuecomment-3620711983 Is your point to avoid using `external_executor_id` at all in the Celery executor? Let me check. It looks like we can do that.
Re: [PR] fix: Prevent duplicate task execution on scheduler crash (Celery exec… [airflow]
mykola-shyshov commented on PR #58896: URL: https://github.com/apache/airflow/pull/58896#issuecomment-3620708957

> My idea btw was to use this https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply -- note the `task_id` parameter there -- in Airflow 3.0 and above I think we can use `task.id` there, which is unique per task try in Airflow

@ashb I do exactly what you are saying: https://github.com/apache/airflow/pull/58896/files#diff-532841407e041661f1efafffbd1fa7e56334c394e92d9910577c14e36ea28370R286
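A rough sketch of why a deterministic task ID helps after a scheduler crash, written against an in-memory stand-in rather than Celery itself. `FakeBackend` and `send_once` are hypothetical names, and the "look up before sending" check is an assumption of this sketch, not something taken from the PR. With real Celery the ID would be passed through the `task_id` parameter when sending the task.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class FakeBackend:
    """In-memory stand-in for a broker plus result backend (hypothetical)."""

    states: Dict[str, str] = field(default_factory=dict)

    def send(self, task_id: str) -> None:
        # Record the task under the caller-chosen ID.
        self.states[task_id] = "QUEUED"

    def lookup(self, task_id: str) -> Optional[str]:
        # Return the recorded state, or None if the ID was never sent.
        return self.states.get(task_id)


def send_once(backend: FakeBackend, ti_id: str) -> bool:
    """Send work for a task instance unless its ID is already known.

    Because the ID is derived from the task instance, a restarted
    scheduler can recompute it without any persisted executor-side ID.
    """
    task_id = str(ti_id)
    if backend.lookup(task_id) is not None:
        return False  # already sent before the crash; do not send again
    backend.send(task_id)
    return True
```

One simplification to keep in mind: this stand-in returns `None` for an unknown ID, whereas a real Celery result backend reports unknown IDs as `PENDING`, so an actual implementation would need a different way to tell "never sent" from "sent but not started".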
