Repository: incubator-airflow Updated Branches: refs/heads/master 5747f5849 -> 972086aeb
[AIRFLOW-2520] CLI - make backfill less verbose I recently used backfill, and at every tick it logged a large number of messages listing every task that was not yet ready to run. These messages are not useful and should be muted by default. I understand they may be helpful for `airflow run` when dependencies aren't met, so I added a flag to control them instead of simply downgrading the call to `logging.debug`. Closes #3414 from mistercrunch/backfill_less_verbose Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/972086ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/972086ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/972086ae Branch: refs/heads/master Commit: 972086aeba4616843005b25210ba3b2596963d57 Parents: 5747f58 Author: Maxime Beauchemin <maximebeauche...@gmail.com> Authored: Thu May 24 21:08:35 2018 +0100 Committer: Kaxil Naik <kaxiln...@apache.org> Committed: Thu May 24 21:08:35 2018 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 10 ++++++++-- airflow/jobs.py | 8 +++++--- airflow/models.py | 28 +++++++++++++++++----------- 3 files changed, 30 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 61d83a8..55201dd 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -198,7 +198,9 @@ def backfill(args, dag=None): ignore_first_depends_on_past=args.ignore_first_depends_on_past, ignore_task_deps=args.ignore_dependencies, pool=args.pool, - delay_on_limit_secs=args.delay_on_limit) + delay_on_limit_secs=args.delay_on_limit, + verbose=args.verbose, + ) @cli_utils.action_logging @@ -1322,6
+1324,9 @@ class CLIFactory(object): 'mark_success': Arg( ("-m", "--mark_success"), "Mark jobs as succeeded without running them", "store_true"), + 'verbose': Arg( + ("-v", "--verbose"), + "Make logging output more verbose", "store_true"), 'local': Arg( ("-l", "--local"), "Run the task using the LocalExecutor", "store_true"), @@ -1673,7 +1678,8 @@ class CLIFactory(object): 'dag_id', 'task_regex', 'start_date', 'end_date', 'mark_success', 'local', 'donot_pickle', 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', - 'subdir', 'pool', 'delay_on_limit', 'dry_run') + 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', + ) }, { 'func': list_tasks, 'help': "List the tasks within a DAG", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 9d80a79..0391750 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1952,6 +1952,7 @@ class BackfillJob(BaseJob): ignore_task_deps=False, pool=None, delay_on_limit_secs=1.0, + verbose=False, *args, **kwargs): self.dag = dag self.dag_id = dag.dag_id @@ -1963,6 +1964,7 @@ class BackfillJob(BaseJob): self.ignore_task_deps = ignore_task_deps self.pool = pool self.delay_on_limit_secs = delay_on_limit_secs + self.verbose = verbose super(BackfillJob, self).__init__(*args, **kwargs) def _update_counters(self, ti_status): @@ -2257,7 +2259,7 @@ class BackfillJob(BaseJob): if ti.are_dependencies_met( dep_context=backfill_context, session=session, - verbose=True): + verbose=self.verbose): ti.refresh_from_db(lock_for_update=True, session=session) if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY: if executor.has_task(ti): @@ -2364,11 +2366,11 @@ class BackfillJob(BaseJob): t.are_dependencies_met( dep_context=DepContext(ignore_depends_on_past=False), session=session, - verbose=True) != + verbose=self.verbose) != t.are_dependencies_met( 
dep_context=DepContext(ignore_depends_on_past=True), session=session, - verbose=True) + verbose=self.verbose) for t in ti_status.deadlocked) if deadlocked_depends_on_past: err += ( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/972086ae/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 28138fb..da18ec7 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1272,27 +1272,27 @@ class TaskInstance(Base, LoggingMixin): :type dep_context: DepContext :param session: database session :type session: Session - :param verbose: whether or not to print details on failed dependencies + :param verbose: whether log details on failed dependencies on + info or debug log level :type verbose: boolean """ dep_context = dep_context or DepContext() failed = False + verbose_aware_logger = self.log.info if verbose else self.log.debug for dep_status in self.get_failed_dep_statuses( dep_context=dep_context, session=session): failed = True - if verbose: - self.log.info( - "Dependencies not met for %s, dependency '%s' FAILED: %s", - self, dep_status.dep_name, dep_status.reason - ) + + verbose_aware_logger( + "Dependencies not met for %s, dependency '%s' FAILED: %s", + self, dep_status.dep_name, dep_status.reason + ) if failed: return False - if verbose: - self.log.info("Dependencies all met for %s", self) - + verbose_aware_logger("Dependencies all met for %s", self) return True @provide_session @@ -3961,7 +3961,9 @@ class DAG(BaseDag, LoggingMixin): ignore_task_deps=False, ignore_first_depends_on_past=False, pool=None, - delay_on_limit_secs=1.0): + delay_on_limit_secs=1.0, + verbose=False, + ): """ Runs the DAG. 
@@ -3987,6 +3989,8 @@ class DAG(BaseDag, LoggingMixin): :param delay_on_limit_secs: Time in seconds to wait before next attempt to run dag run when max_active_runs limit has been reached :type delay_on_limit_secs: float + :param verbose: Make logging output more verbose + :type verbose: boolean """ from airflow.jobs import BackfillJob if not executor and local: @@ -4003,7 +4007,9 @@ class DAG(BaseDag, LoggingMixin): ignore_task_deps=ignore_task_deps, ignore_first_depends_on_past=ignore_first_depends_on_past, pool=pool, - delay_on_limit_secs=delay_on_limit_secs) + delay_on_limit_secs=delay_on_limit_secs, + verbose=verbose, + ) job.run() def cli(self):