yrqls21 commented on a change in pull request #3873: [AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219606731
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -308,6 +369,249 @@ def file_path(self):
         raise NotImplementedError()
 
 
+class DagParsingStat(object):
+    def __init__(self,
+                 file_paths,
+                 all_pids,
+                 done,
+                 all_files_processed,
+                 result_count):
+        self.file_paths = file_paths
+        self.all_pids = all_pids
+        self.done = done
+        self.all_files_processed = all_files_processed
+        self.result_count = result_count
+
+
+class DagParsingSignal(object):
+    AGENT_HEARTBEAT = "agent_heartbeat"
+    MANAGER_DONE = "manager_done"
+    TERMINATE_MANAGER = "terminate_manager"
+    END_MANAGER = "end_manager"
+
+
+class DagFileProcessorAgent(LoggingMixin):
+    """
+    Agent for DAG file processors. It is responsible for all DAG-parsing
+    related jobs in the scheduler process. Mainly it collects DAG parsing
+    results from the DAG file processor manager and communicates
+    signals/DAG parsing stats with the manager.
+    """
+
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 max_runs,
+                 processor_factory,
+                 async_mode):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+        :type max_runs: int
+        :param processor_factory: function that creates processors for DAG
+        definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor)
+        :param async_mode: Whether to start agent in async mode
+        :type async_mode: bool
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._processor_factory = processor_factory
+        self._async_mode = async_mode
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+        # PIDs of the DAG parsing processes
+        self._all_pids = []
+        # Pipe for communicating signals
+        self._parent_signal_conn, self._child_signal_conn = multiprocessing.Pipe()
+        # Pipe for communicating DagParsingStat
+        self._stat_queue = multiprocessing.Queue()
+        self._result_queue = multiprocessing.Queue()
+        self._process = None
+        self._done = False
+        # Initialized as true so we do not deactivate w/o any actual DAG parsing.
+        self._all_files_processed = True
+        self._result_count = 0
+
+    def start(self):
+        """
+        Launch the DagFileProcessorManager process and start the DAG parsing
+        loop in the manager.
+        """
+        self._process = self._launch_process(self._dag_directory,
+                                             self._file_paths,
+                                             self._max_runs,
+                                             self._processor_factory,
+                                             self._child_signal_conn,
+                                             self._stat_queue,
+                                             self._result_queue,
+                                             self._async_mode)
+        self.log.info("Launched DagFileProcessorManager with pid: {}"
+                      .format(self._process.pid))
+
+    def heartbeat(self):
+        """
+        Should only be used when the DAG file processor manager was launched
+        in sync mode. Send the agent heartbeat signal to the manager.
+        """
+        self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT)
+
+    def wait_until_finished(self):
+        """
+        Should only be used when the DAG file processor manager was launched
+        in sync mode. Wait for the done signal from the manager.
+        """
+        while True:
+            if self._parent_signal_conn.poll() \
+                    and self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
+                break
+            time.sleep(0.1)
+
+    @staticmethod
+    def _launch_process(dag_directory,
+                        file_paths,
+                        max_runs,
+                        processor_factory,
+                        signal_conn,
+                        _stat_queue,
+                        result_queue,
+                        async_mode):
+        def helper():
+            # Reload configurations and settings to avoid collisions with the
+            # parent process: this process may need custom configurations that
+            # cannot be shared (e.g. a RotatingFileHandler), and connections
+            # can be corrupted if we do not recreate the SQLAlchemy connection
+            # pool.
+            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+            reload(airflow.config_templates.airflow_local_settings)
+            reload(airflow.settings)
 
 Review comment:
   W/o reloading `airflow_local_settings`, the custom logic [here](https://github.com/apache/incubator-airflow/blob/c45cc19d87faa49cba1bbb4755940d3113adc522/airflow/config_templates/airflow_local_settings.py#L199-L203) would not be evaluated, and thus we'll end up reconfiguring with the same logging config again...
   
   Also, as mentioned in the comment, right now the major purposes of reloading `settings` are to reload the logger and recreate the connection pool.
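   To make this concrete, here is a minimal sketch of the reload pattern (written with `importlib.reload` for Python 3; `child_entry` is a hypothetical stand-in for the `helper()` in the patch, and it assumes Airflow is importable):

```python
import importlib
import os

import airflow.config_templates.airflow_local_settings
import airflow.settings


def child_entry():
    """Hypothetical stand-in for helper() in the patch above."""
    # Flag this process as the processor manager so the conditional logging
    # setup in airflow_local_settings is evaluated when the module reloads.
    os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'

    # Re-run the modules' top-level code in this child process: this rebuilds
    # the logging configuration and recreates the SQLAlchemy engine/connection
    # pool instead of sharing the parent's.
    importlib.reload(airflow.config_templates.airflow_local_settings)
    importlib.reload(airflow.settings)
```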
   
   Though I do agree reload is a bit of a kludge -- I would avoid it if I had other solutions :(
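   As an aside, the sync-mode handshake that `heartbeat()` and `wait_until_finished()` implement boils down to a small, self-contained sketch. Here `manager_loop` is a hypothetical stand-in for the manager process; only the two signal values are taken from `DagParsingSignal` in the diff:

```python
import multiprocessing
import time

# Signal values copied from DagParsingSignal in the diff.
AGENT_HEARTBEAT = "agent_heartbeat"
MANAGER_DONE = "manager_done"


def manager_loop(conn):
    """Hypothetical stand-in for the manager process's parsing loop."""
    while True:
        if conn.poll() and conn.recv() == AGENT_HEARTBEAT:
            # ... one pass of DAG file parsing would happen here ...
            conn.send(MANAGER_DONE)
            break
        time.sleep(0.1)


if __name__ == "__main__":
    # Same Pipe setup as DagFileProcessorAgent.__init__ above.
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=manager_loop, args=(child_conn,))
    proc.start()

    parent_conn.send(AGENT_HEARTBEAT)  # agent.heartbeat()
    while True:                        # agent.wait_until_finished()
        if parent_conn.poll() and parent_conn.recv() == MANAGER_DONE:
            break
        time.sleep(0.1)
    proc.join()
```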

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
