Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5747f5849 -> 972086aeb


[AIRFLOW-2520] CLI - make backfill less verbose

When I used backfill recently, it logged a very large number
of messages at every scheduler tick, listing every task whose
dependencies were not yet met and that was therefore not ready
to run.

These messages are not useful and should be muted
by default.

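These messages can still be helpful with `airflow run`, where
they explain why a task's dependencies aren't met. So rather
than simply downgrading them to `logging.debug`, this change
adds a `--verbose` flag. With the flag, the messages are logged
at info level; without it (the default), they are logged at
debug level.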

Closes #3414 from
mistercrunch/backfill_less_verbose


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/972086ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/972086ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/972086ae

Branch: refs/heads/master
Commit: 972086aeba4616843005b25210ba3b2596963d57
Parents: 5747f58
Author: Maxime Beauchemin <maximebeauche...@gmail.com>
Authored: Thu May 24 21:08:35 2018 +0100
Committer: Kaxil Naik <kaxiln...@apache.org>
Committed: Thu May 24 21:08:35 2018 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py | 10 ++++++++--
 airflow/jobs.py    |  8 +++++---
 airflow/models.py  | 28 +++++++++++++++++-----------
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 61d83a8..55201dd 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -198,7 +198,9 @@ def backfill(args, dag=None):
             ignore_first_depends_on_past=args.ignore_first_depends_on_past,
             ignore_task_deps=args.ignore_dependencies,
             pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit)
+            delay_on_limit_secs=args.delay_on_limit,
+            verbose=args.verbose,
+        )
 
 
 @cli_utils.action_logging
@@ -1322,6 +1324,9 @@ class CLIFactory(object):
         'mark_success': Arg(
             ("-m", "--mark_success"),
             "Mark jobs as succeeded without running them", "store_true"),
+        'verbose': Arg(
+            ("-v", "--verbose"),
+            "Make logging output more verbose", "store_true"),
         'local': Arg(
             ("-l", "--local"),
             "Run the task using the LocalExecutor", "store_true"),
@@ -1673,7 +1678,8 @@ class CLIFactory(object):
                 'dag_id', 'task_regex', 'start_date', 'end_date',
                 'mark_success', 'local', 'donot_pickle',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-                'subdir', 'pool', 'delay_on_limit', 'dry_run')
+                'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose',
+            )
         }, {
             'func': list_tasks,
             'help': "List the tasks within a DAG",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 9d80a79..0391750 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1952,6 +1952,7 @@ class BackfillJob(BaseJob):
             ignore_task_deps=False,
             pool=None,
             delay_on_limit_secs=1.0,
+            verbose=False,
             *args, **kwargs):
         self.dag = dag
         self.dag_id = dag.dag_id
@@ -1963,6 +1964,7 @@ class BackfillJob(BaseJob):
         self.ignore_task_deps = ignore_task_deps
         self.pool = pool
         self.delay_on_limit_secs = delay_on_limit_secs
+        self.verbose = verbose
         super(BackfillJob, self).__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status):
@@ -2257,7 +2259,7 @@ class BackfillJob(BaseJob):
                     if ti.are_dependencies_met(
                             dep_context=backfill_context,
                             session=session,
-                            verbose=True):
+                            verbose=self.verbose):
                         ti.refresh_from_db(lock_for_update=True, 
session=session)
                         if ti.state == State.SCHEDULED or ti.state == 
State.UP_FOR_RETRY:
                             if executor.has_task(ti):
@@ -2364,11 +2366,11 @@ class BackfillJob(BaseJob):
                 t.are_dependencies_met(
                     dep_context=DepContext(ignore_depends_on_past=False),
                     session=session,
-                    verbose=True) !=
+                    verbose=self.verbose) !=
                 t.are_dependencies_met(
                     dep_context=DepContext(ignore_depends_on_past=True),
                     session=session,
-                    verbose=True)
+                    verbose=self.verbose)
                 for t in ti_status.deadlocked)
             if deadlocked_depends_on_past:
                 err += (

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 28138fb..da18ec7 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1272,27 +1272,27 @@ class TaskInstance(Base, LoggingMixin):
         :type dep_context: DepContext
         :param session: database session
         :type session: Session
-        :param verbose: whether or not to print details on failed dependencies
+        :param verbose: whether log details on failed dependencies on
+            info or debug log level
         :type verbose: boolean
         """
         dep_context = dep_context or DepContext()
         failed = False
+        verbose_aware_logger = self.log.info if verbose else self.log.debug
         for dep_status in self.get_failed_dep_statuses(
                 dep_context=dep_context,
                 session=session):
             failed = True
-            if verbose:
-                self.log.info(
-                    "Dependencies not met for %s, dependency '%s' FAILED: %s",
-                    self, dep_status.dep_name, dep_status.reason
-                )
+
+            verbose_aware_logger(
+                "Dependencies not met for %s, dependency '%s' FAILED: %s",
+                self, dep_status.dep_name, dep_status.reason
+            )
 
         if failed:
             return False
 
-        if verbose:
-            self.log.info("Dependencies all met for %s", self)
-
+        verbose_aware_logger("Dependencies all met for %s", self)
         return True
 
     @provide_session
@@ -3961,7 +3961,9 @@ class DAG(BaseDag, LoggingMixin):
             ignore_task_deps=False,
             ignore_first_depends_on_past=False,
             pool=None,
-            delay_on_limit_secs=1.0):
+            delay_on_limit_secs=1.0,
+            verbose=False,
+    ):
         """
         Runs the DAG.
 
@@ -3987,6 +3989,8 @@ class DAG(BaseDag, LoggingMixin):
         :param delay_on_limit_secs: Time in seconds to wait before next 
attempt to run
             dag run when max_active_runs limit has been reached
         :type delay_on_limit_secs: float
+        :param verbose: Make logging output more verbose
+        :type verbose: boolean
         """
         from airflow.jobs import BackfillJob
         if not executor and local:
@@ -4003,7 +4007,9 @@ class DAG(BaseDag, LoggingMixin):
             ignore_task_deps=ignore_task_deps,
             ignore_first_depends_on_past=ignore_first_depends_on_past,
             pool=pool,
-            delay_on_limit_secs=delay_on_limit_secs)
+            delay_on_limit_secs=delay_on_limit_secs,
+            verbose=verbose,
+        )
         job.run()
 
     def cli(self):

Reply via email to