ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215551823
##########
File path: airflow/executors/celery_executor.py
##########
@@ -63,6 +69,40 @@ def execute_command(command):
         raise AirflowException('Celery command failed')
 
 
+class ExceptionWithTraceback(object):
+    """
+    Wrapper class used to propogate exceptions to parent processes from subprocesses.
+    :param exception: The exception to wrap
+    :type exception: Exception
+    :param traceback: The stacktrace to wrap
+    :type traceback: str
+    """
+
+    def __init__(self, exception, exception_traceback):
+        self.exception = exception
+        self.traceback = exception_traceback
+
+
+def fetch_celery_task_state(celery_task):
+    """
+    Fetch and return the state of the given celery task. The scope of this function is
+    global so that it can be called by subprocesses in the pool.
+    :param celery_task: a tuple of the Celery task key and the async Celery object used
+        to fetch the task's state
+    :type celery_task: (str, celery.result.AsyncResult)
+    :return: a tuple of the Celery task key and the Celery state of the task
+    :rtype: (str, str)
+    """
+
+    try:
+        res = (celery_task[0], celery_task[1].state)

Review comment:
  I'm guessing that it's accessing the `.state` property that causes Celery to make the network request? Could you add a comment here saying so? (Otherwise this whole function looks odd and pointless.)
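A minimal sketch of the requested comment, placed in a self-contained version of the function. Assumptions: the `except` branch (wrapping the error in `ExceptionWithTraceback` with the task key and `traceback.format_exc()`) is a guess, since the diff excerpt stops at the `try:` line; `FakeAsyncResult` and `fetch_states` are hypothetical helpers so the sketch runs without a broker. The only Celery behaviour relied on is that `AsyncResult.state` is a property that looks the state up in the result backend when read.

```python
import traceback
from multiprocessing import Pool


class ExceptionWithTraceback(object):
    """Wrap an exception plus its formatted traceback so a pool worker can
    hand both back to the parent process."""

    def __init__(self, exception, exception_traceback):
        self.exception = exception
        self.traceback = exception_traceback


class FakeAsyncResult(object):
    """Hypothetical stand-in for celery.result.AsyncResult. Reading .state is
    where the real object would query the result backend; here a state of None
    simulates an unreachable backend."""

    def __init__(self, state):
        self._state = state

    @property
    def state(self):
        if self._state is None:
            raise RuntimeError("result backend unreachable")
        return self._state


def fetch_celery_task_state(celery_task):
    """Return (task key, Celery state), or an ExceptionWithTraceback on error."""
    try:
        # Reading the AsyncResult.state property is what makes Celery contact
        # the result backend. That blocking network round trip is why this
        # function exists and is run in a pool of subprocesses.
        res = (celery_task[0], celery_task[1].state)
    except Exception as e:
        # Assumed error handling: return the error instead of raising, so one
        # failed lookup does not abort the whole Pool.map call.
        exception_traceback = "Celery Task ID: {}\n{}".format(
            celery_task[0], traceback.format_exc())
        res = ExceptionWithTraceback(e, exception_traceback)
    return res


def fetch_states(tasks, processes=2):
    """Hypothetical helper: fan the blocking .state reads out over processes."""
    pool = Pool(processes=processes)
    try:
        return pool.map(fetch_celery_task_state, tasks)
    finally:
        pool.close()
        pool.join()
```

Returning the wrapper rather than raising keeps each result independent: the parent can check `isinstance(result, ExceptionWithTraceback)` per task and log `result.traceback`, which a bare exception pickled back from a worker would otherwise lose.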