ashb commented on a change in pull request #10949:
URL: https://github.com/apache/airflow/pull/10949#discussion_r489544387
##########
File path: airflow/executors/celery_executor.py
##########
@@ -274,6 +331,61 @@ def execute_async(self,
     def terminate(self):
         pass

+    def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
+        # See which of the TIs are still alive (or have finished even!)
+        #
+        # Since Celery doesn't store "SENT" state for queued commands (if we create an AsyncResult with a made
+        # up id it just returns PENDING state for it), we have to store Celery's task_id against the TI row to
+        # look at in future.
+        #
+        # This process is not perfect -- we could have sent the task to celery, and crashed before we were
+        # able to record the AsyncResult.task_id in the TaskInstance table, in which case we won't adopt the
+        # task (it'll either run and update the TI state, or the scheduler will clear and re-queue it. Either
+        # way it won't get executed more than once)
+        #
+        # (If we swapped it around, and generated a task_id for Celery, stored that in TI and enqueued that
+        # there is also still a race condition where we could generate and store the task_id, but die before
+        # we managed to enqueue the command. Since neither way is perfect we always have to deal with this
+        # process not being perfect.)
+
+        celery_tasks = {
+            ti.external_executor_id: (AsyncResult(ti.external_executor_id), ti)
+            for ti in tis
+            if ti.external_executor_id is not None
+        }
+
+        if not celery_tasks:
+            # Nothing to adopt
+            return tis
+
+        states_by_celery_task_id = self.bulk_state_fetcher.get_many(
+            map(operator.itemgetter(0), celery_tasks.values())
+        )
+
+        not_adopted = [ti for ti in tis if ti.external_executor_id is None]
+        adopted = []
+        cached_celery_backend = next(iter(celery_tasks.values()))[0].backend
+
+        for celery_task_id, (state, info) in states_by_celery_task_id.items():
+            result, ti = celery_tasks[celery_task_id]
+            result.backend = cached_celery_backend
+
+            # Set the correct elements of the state dicts, then update this
+            # like we just queried it.
+            self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.adoption_timeout
+            self.tasks[ti.key] = result
+            self.running.add(ti.key)
+            self.update_task_state(ti.key, state, info)
+            adopted.append(repr(ti) + " in state " + state)

Review comment:
   Done in https://github.com/apache/airflow/pull/10949/commits/e4f915efb2b1bd4165d33425799f3bbffd88caa3
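
For readers skimming the thread, the pattern in the added method is: adopt only the task instances that recorded an external_executor_id, bulk-fetch their Celery states, re-register them in the executor's own bookkeeping as if it had queued them itself, and hand the rest back so the scheduler can reset them. Below is a minimal stand-alone sketch of that pattern; TI, FakeResult, ToyExecutor and broker_states are hypothetical stand-ins (not Airflow or Celery APIs), so it illustrates only the shape of the logic, not the real implementation.

# Minimal, self-contained sketch of the adoption bookkeeping shown in the diff.
# TI, FakeResult and ToyExecutor are hypothetical stand-ins, not Airflow or
# Celery classes; broker_states plays the role of the bulk state fetch.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set


@dataclass
class TI:  # stand-in for TaskInstance
    key: str
    queued_dttm: datetime
    external_executor_id: Optional[str] = None


@dataclass
class FakeResult:  # stand-in for celery.result.AsyncResult
    task_id: str
    state: str = "PENDING"


class ToyExecutor:
    adoption_timeout = timedelta(minutes=10)

    def __init__(self, broker_states: Dict[str, str]):
        self._broker_states = broker_states
        self.tasks: Dict[str, FakeResult] = {}
        self.running: Set[str] = set()
        self.adopted_task_timeouts: Dict[str, datetime] = {}

    def try_adopt_task_instances(self, tis: List[TI]) -> List[TI]:
        # Only TIs that recorded a Celery task id can be adopted.
        celery_tasks = {
            ti.external_executor_id: (FakeResult(ti.external_executor_id), ti)
            for ti in tis
            if ti.external_executor_id is not None
        }
        if not celery_tasks:
            return tis  # nothing to adopt; hand everything back

        not_adopted = [ti for ti in tis if ti.external_executor_id is None]
        for task_id, (result, ti) in celery_tasks.items():
            result.state = self._broker_states.get(task_id, "PENDING")
            # Register the adopted task exactly as if this executor had queued it.
            self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.adoption_timeout
            self.tasks[ti.key] = result
            self.running.add(ti.key)
        return not_adopted


# A TI with a recorded Celery id is adopted; one without is handed back.
now = datetime.now()
executor = ToyExecutor(broker_states={"abc-123": "STARTED"})
leftover = executor.try_adopt_task_instances([TI("ti-1", now, "abc-123"), TI("ti-2", now)])
assert [ti.key for ti in leftover] == ["ti-2"]
assert "ti-1" in executor.running and executor.tasks["ti-1"].state == "STARTED"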