ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215551823
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -63,6 +69,40 @@ def execute_command(command):
         raise AirflowException('Celery command failed')
 
 
+class ExceptionWithTraceback(object):
+    """
+    Wrapper class used to propagate exceptions to parent processes from subprocesses.
+    :param exception: The exception to wrap
+    :type exception: Exception
+    :param exception_traceback: The stacktrace to wrap
+    :type exception_traceback: str
+    """
+
+    def __init__(self, exception, exception_traceback):
+        self.exception = exception
+        self.traceback = exception_traceback
+
+
+def fetch_celery_task_state(celery_task):
+    """
+    Fetch and return the state of the given celery task. The scope of this function is
+    global so that it can be called by subprocesses in the pool.
+    :param celery_task: a tuple of the Celery task key and the async Celery object used
+                        to fetch the task's state
+    :type celery_task: (str, celery.result.AsyncResult)
+    :return: a tuple of the Celery task key and the Celery state of the task
+    :rtype: (str, str)
+    """
+
+    try:
+        res = (celery_task[0], celery_task[1].state)
 
 Review comment:
   I'm guessing that accessing the `.state` property is what causes Celery to 
make the network request? Could you add a comment here saying so? (Otherwise 
this whole function looks odd and pointless.)
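
   For illustration, a minimal sketch of what the requested comment could look like, and of why the one-line function is worth running in a pool. This is not the PR's actual code: `FakeAsyncResult` is a hypothetical stand-in for `celery.result.AsyncResult` so the example runs without a broker, the `except` branch is an assumption (the quoted diff stops before it), and a `ThreadPool` is swapped in for the process pool the docstring mentions to keep the sketch self-contained.

```python
import traceback
from multiprocessing.pool import ThreadPool


class ExceptionWithTraceback(object):
    """Carries an exception and its formatted traceback back to the caller."""

    def __init__(self, exception, exception_traceback):
        self.exception = exception
        self.traceback = exception_traceback


class FakeAsyncResult(object):
    """Hypothetical stand-in for celery.result.AsyncResult (no broker needed)."""

    def __init__(self, state):
        self._state = state
        self.lookups = 0  # counts how often `.state` was read

    @property
    def state(self):
        # With a real AsyncResult, this property access is where the
        # result backend would be queried.
        self.lookups += 1
        return self._state


def fetch_celery_task_state(celery_task):
    """Return (task key, task state), or a wrapped exception on failure."""
    key, async_result = celery_task
    try:
        # Reading `.state` is what triggers the (potentially slow) lookup
        # against the result backend, so this access is the work that the
        # pool spreads across workers.
        return (key, async_result.state)
    except Exception as e:
        # Assumed handling: hand the error back instead of raising in a worker.
        return ExceptionWithTraceback(e, traceback.format_exc())


tasks = [
    ("task_a", FakeAsyncResult("SUCCESS")),
    ("task_b", FakeAsyncResult("PENDING")),
]

pool = ThreadPool(2)
try:
    # Each worker performs one `.state` lookup; results keep input order.
    states = pool.map(fetch_celery_task_state, tasks)
finally:
    pool.close()
    pool.join()
```

   With the comment in place, the function reads as "do the slow lookup here" rather than as a trivial tuple repack.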

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
