[ 
https://issues.apache.org/jira/browse/AIRFLOW-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698330#comment-16698330
 ] 

ASF GitHub Bot commented on AIRFLOW-1561:
-----------------------------------------

Fokko closed pull request #2635: [AIRFLOW-1561] Fix scheduler to pick up 
example DAGs without other DAGs
URL: https://github.com/apache/incubator-airflow/pull/2635
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the pull request is merged:

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 9e68fad797..bcaf93b533 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1675,7 +1675,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
         simple_dags = []
 
         try:
-            dagbag = models.DagBag(file_path)
+            dagbag = models.DagBag(file_path, include_examples=False)
         except Exception:
             self.log.exception("Failed at reloading the DAG file %s", 
file_path)
             Stats.incr('dag_file_refresh_error', 1, 1)
diff --git a/airflow/models.py b/airflow/models.py
index 95ce629d3b..d505d9479e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -292,12 +292,7 @@ def __init__(
         self.import_errors = {}
         self.has_logged = False
 
-        if include_examples:
-            example_dag_folder = os.path.join(
-                os.path.dirname(__file__),
-                'example_dags')
-            self.collect_dags(example_dag_folder)
-        self.collect_dags(dag_folder)
+        self.collect_dags(dag_folder, include_examples)
 
     def size(self):
         """
@@ -531,7 +526,8 @@ def bag_dag(self, dag, parent_dag, root_dag):
     def collect_dags(
             self,
             dag_folder=None,
-            only_if_updated=True):
+            only_if_updated=True,
+            include_examples=configuration.conf.getboolean('core', 
'LOAD_EXAMPLES')):
         """
         Given a file path or a folder, this method looks for python modules,
         imports them and adds them to the dagbag collection.
@@ -551,7 +547,7 @@ def collect_dags(
         stats = []
         FileLoadStat = namedtuple(
             'FileLoadStat', "file duration dag_num task_num dags")
-        for filepath in list_py_file_paths(dag_folder):
+        for filepath in list_py_file_paths(dag_folder, include_examples):
             try:
                 ts = timezone.utcnow()
                 found_dags = self.process_file(
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 47f473e9aa..1b2eeeff2a 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -215,7 +215,8 @@ def get_dag(self, dag_id):
         return self.dag_id_to_simple_dag[dag_id]
 
 
-def list_py_file_paths(directory, safe_mode=True):
+def list_py_file_paths(directory, safe_mode=True,
+                       include_examples=conf.getboolean('core', 
'LOAD_EXAMPLES')):
     """
     Traverse a directory and look for Python files.
 
@@ -284,6 +285,10 @@ def list_py_file_paths(directory, safe_mode=True):
                 except Exception:
                     log = LoggingMixin().log
                     log.exception("Error while examining %s", f)
+    if include_examples:
+        import airflow.example_dags
+        example_dag_folder = airflow.example_dags.__path__[0]
+        file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, 
False))
     return file_paths
 
 
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index af8ccc6c2e..a0b0c222fe 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -42,6 +42,7 @@
 from airflow import AirflowException, settings, models
 from airflow import configuration
 from airflow.bin import cli
+import airflow.example_dags
 from airflow.executors import BaseExecutor, SequentialExecutor
 from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance 
as TI
@@ -3335,7 +3336,18 @@ def test_list_py_file_paths(self):
                 if file_name not in ignored_files:
                     expected_files.add(
                         '{}/{}'.format(TEST_DAGS_FOLDER, file_name))
-        for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
+        for file_path in list_py_file_paths(TEST_DAGS_FOLDER, 
include_examples=False):
+            detected_files.add(file_path)
+        self.assertEqual(detected_files, expected_files)
+
+        example_dag_folder = airflow.example_dags.__path__[0]
+        for root, dirs, files in os.walk(example_dag_folder):
+            for file_name in files:
+                if file_name.endswith('.py') or file_name.endswith('.zip'):
+                    if file_name not in ['__init__.py']:
+                        expected_files.add(os.path.join(root, file_name))
+        detected_files.clear()
+        for file_path in list_py_file_paths(TEST_DAGS_FOLDER, 
include_examples=True):
             detected_files.add(file_path)
         self.assertEqual(detected_files, expected_files)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Scheduler doesn't pick up example dags unless there is at least 1 dag in dags 
> folder
> -----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1561
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1561
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.2
>            Reporter: Sumit Maheshwari
>            Assignee: Shintaro Murakami
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: airflow_scheduler_log.txt
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to