[ https://issues.apache.org/jira/browse/AIRFLOW-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358187#comment-16358187 ]
ASF subversion and git services commented on AIRFLOW-1157:
----------------------------------------------------------

Commit e6973b1596914e5d62567e065223e7b169d1c26c in incubator-airflow's branch refs/heads/master from Ian Suvak
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e6973b1 ]

[AIRFLOW-1157] Fix missing pools crashing the scheduler

Throw a warning when a pool associated with a Task Instance doesn't exist, instead of crashing the scheduler. Use a default of 0 open slots for non-existent pools.

Closes #3002 from iansuvak/1157_nonexistent_pool

> Assigning a task to a pool that doesn't exist crashes the scheduler
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1157
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1157
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>            Reporter: John Culver
>            Assignee: David Klosowski
>            Priority: Critical
>
> If a DAG containing a task assigned to a pool that doesn't exist is run, the scheduler will crash.
> Manually triggering a run of the DAG below, on an environment without a pool named 'a_non_existent_pool', crashes the scheduler:
> {code}
> from datetime import datetime
>
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> dag = DAG(dag_id='crash_scheduler',
>           start_date=datetime(2017, 1, 1),
>           schedule_interval=None)
>
> t1 = DummyOperator(task_id='crash',
>                    pool='a_non_existent_pool',
>                    dag=dag)
> {code}
> Here is the relevant log output on the scheduler:
> {noformat}
> [2017-04-27 19:31:24,816] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test-3.py finished
> [2017-04-27 19:31:24,817] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/test_s3_file_move.py finished
> [2017-04-27 19:31:24,819] {dag_processing.py:627} INFO - Started a process (PID: 124) to generate tasks for /opt/airflow/dags/crash_scheduler.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/crash_scheduler.py.log
> [2017-04-27 19:31:24,822] {dag_processing.py:627} INFO - Started a process (PID: 125) to generate tasks for /opt/airflow/dags/configuration/constants.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/configuration/constants.py.log
> [2017-04-27 19:31:24,847] {jobs.py:1007} INFO - Tasks up for execution: <TaskInstance: move_s3_file_test.move_files 2017-04-27 19:31:22.298893 [scheduled]>
> [2017-04-27 19:31:24,849] {jobs.py:1030} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 1 task instances in queue
> [2017-04-27 19:31:24,856] {jobs.py:1078} INFO - DAG move_s3_file_test has 0/16 running tasks
> [2017-04-27 19:31:24,856] {jobs.py:1105} INFO - Sending to executor (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) with priority 1 and queue MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:24,859] {jobs.py:1116} INFO - Setting state of (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) to queued
> [2017-04-27 19:31:24,867] {base_executor.py:50} INFO - Adding to queue: airflow run move_s3_file_test move_files 2017-04-27T19:31:22.298893 --local -sd /opt/airflow/dags/test_s3_file_move.py
> [2017-04-27 19:31:24,867] {jobs.py:1440} INFO - Heartbeating the executor
> [2017-04-27 19:31:24,872] {celery_executor.py:78} INFO - [celery] queuing (u'move_s3_file_test', u'move_files', datetime.datetime(2017, 4, 27, 19, 31, 22, 298893)) through celery, queue=MVSANDBOX-airflow-DEV-dev
> [2017-04-27 19:31:25,974] {jobs.py:1404} INFO - Heartbeating the process manager
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/crash_scheduler.py finished
> [2017-04-27 19:31:25,975] {dag_processing.py:559} INFO - Processor for /opt/airflow/dags/configuration/constants.py finished
> [2017-04-27 19:31:25,977] {dag_processing.py:627} INFO - Started a process (PID: 128) to generate tasks for /opt/airflow/dags/example_s3_sensor.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/example_s3_sensor.py.log
> [2017-04-27 19:31:25,980] {dag_processing.py:627} INFO - Started a process (PID: 129) to generate tasks for /opt/airflow/dags/test-4.py - logging into /tmp/airflow/scheduler/logs/2017-04-27/test-4.py.log
> [2017-04-27 19:31:26,004] {jobs.py:1007} INFO - Tasks up for execution: <TaskInstance: crash_scheduler.crash 2017-04-27 19:30:51.948542 [scheduled]>
> [2017-04-27 19:31:26,006] {jobs.py:1311} INFO - Exited execute loop
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 128
> [2017-04-27 19:31:26,008] {jobs.py:1325} INFO - Terminating child PID: 129
> [2017-04-27 19:31:26,008] {jobs.py:1329} INFO - Waiting up to 5s for processes to exit...
> Traceback (most recent call last):
>   File "/usr/bin/airflow", line 28, in <module>
>     args.func(args)
>   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
>     job.run()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
>     self._execute()
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
>     self._execute_helper(processor_manager)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1437, in _execute_helper
>     (State.SCHEDULED,))
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1025, in _execute_task_instances
>     open_slots = pools[pool].open_slots(session=session)
> KeyError: u'a_non_existant_pool'
> {noformat}
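The KeyError in the traceback comes from the scheduler indexing its pools dict directly (open_slots = pools[pool].open_slots(session=session) in jobs.py). Below is a minimal, self-contained sketch of the guard the commit message describes: warn and fall back to 0 open slots when the pool is missing. This illustrates the pattern rather than reproducing the literal diff from e6973b1; FakePool and open_slots_for are hypothetical stand-ins for the real Pool model and scheduler code.
{code}
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("airflow.jobs")


class FakePool(object):
    """Hypothetical stand-in for airflow.models.Pool."""

    def __init__(self, slots):
        self.slots = slots

    def open_slots(self, session=None):
        # The real Pool subtracts running/queued task slots;
        # a fixed number is enough to show the guard.
        return self.slots


def open_slots_for(pool_name, pools, session=None):
    """Return open slots for pool_name, tolerating missing pools.

    Indexing pools[pool_name] directly raises KeyError and kills the
    scheduler loop; checking membership first lets us warn and report
    0 open slots instead, so the task simply stays unscheduled.
    """
    if pool_name not in pools:
        log.warning("Tasks using non-existent pool '%s' will not be "
                    "scheduled", pool_name)
        return 0
    return pools[pool_name].open_slots(session=session)


pools = {"default": FakePool(128)}
print(open_slots_for("default", pools))              # 128
print(open_slots_for("a_non_existent_pool", pools))  # warns, prints 0
{code}
Reporting 0 open slots, rather than skipping the pool entirely, keeps the rest of the scheduling loop unchanged: task instances bound to the missing pool are treated exactly like tasks waiting on a full pool.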
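Until the fix lands in a release, the practical workaround is to make sure the pool exists before the scheduler evaluates the task, either via the UI (Admin -> Pools) or programmatically. The sketch below assumes Airflow 1.8's model layer (Pool in airflow.models, settings.Session as the session factory); the pool name comes from the repro above, and the slot count is an arbitrary illustrative value.
{code}
from airflow import settings
from airflow.models import Pool

session = settings.Session()
# Create the pool only if it isn't already registered.
existing = (session.query(Pool)
            .filter(Pool.pool == 'a_non_existent_pool')
            .first())
if existing is None:
    session.add(Pool(pool='a_non_existent_pool',
                     slots=4,  # arbitrary illustrative capacity
                     description='created so the scheduler stops crashing'))
    session.commit()
session.close()
{code}
Once the pool exists, restarting the scheduler lets the queued task instances be considered normally on the next loop.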