Anton Cherkasov created AIRFLOW-4298:
----------------------------------------
             Summary: Scheduler can't communicate with Postgres as Metadata DB after upgrade from 1.10.2 to 1.10.3
                 Key: AIRFLOW-4298
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4298
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.10.3
         Environment: Main host with airflow services: CentOS 7, Python 3.6.1. DB: official Docker image with PostgreSQL 11.2-alpine
            Reporter: Anton Cherkasov


I have a strange issue with the scheduler after upgrading from 1.10.2 to 1.10.3. DAG tasks run only once; after that, the scheduler log looks like this:

{noformat}
Apr 11 09:21:33 airflow.infra airflow[32739]: [2019-04-11 09:21:33,094] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:21:44 airflow.infra airflow[32739]: [2019-04-11 09:21:44,105] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:21:55 airflow.infra airflow[32739]: [2019-04-11 09:21:55,114] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:06 airflow.infra airflow[32739]: [2019-04-11 09:22:06,123] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:17 airflow.infra airflow[32739]: [2019-04-11 09:22:17,131] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:22:28 airflow.infra airflow[32739]: [2019-04-11 09:22:28,143] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
{noformat}
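For reference, the {{sqlalchemy.py:81}} warning comes from Airflow's pessimistic disconnect handling: an event listener pings the database when a connection is about to be used and, if the ping fails on a dead connection, invalidates the pool and reconnects. A minimal sketch of that pattern, following the standard SQLAlchemy 1.x recipe rather than the exact Airflow 1.10.3 source (the engine URL is a placeholder):

{code:python}
from sqlalchemy import create_engine, event, exc, select

# Placeholder engine; Airflow builds the real one in settings.configure_orm()
# from sql_alchemy_conn.
engine = create_engine("postgresql+psycopg2://user:pass@host:12411/airflow")

@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
    # "Branch" connections are sub-connections of one that already passed
    # the ping; skip those.
    if branch:
        return
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False
    try:
        # Cheap liveness check before the connection is handed out.
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        if err.connection_invalidated:
            # The pool has been invalidated; this is the point where Airflow
            # logs "DB connection invalidated. Reconnecting..." and retries,
            # which transparently re-establishes the connection.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        connection.should_close_with_result = save_should_close_with_result
{code}

The warning repeating every ~11 seconds suggests this ping fails on essentially every scheduler loop, i.e. the pooled connections keep turning up dead between checkouts.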
Logs from Scheduler with *DEBUG* level:

{noformat}
Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:154}} DEBUG - Setting up DB connection pool (PID 17447)
Apr 11 09:00:47 airflow.infra airflow[17403]: [2019-04-11 09:00:47,720] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=100, pool_recycle=3600, pid=17447
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,450] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17449)
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,535] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 17448)
Apr 11 09:00:48 airflow.infra airflow[17403]: [2019-04-11 09:00:48,706] {{jobs.py:1663}} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
. . .
Apr 11 09:01:29 airflow.infra airflow[17403]: [2019-04-11 09:01:29,884] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21866)
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,492] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21865)
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,785] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor73-Process, stopped)>
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor74-Process, stopped)>
Apr 11 09:01:30 airflow.infra airflow[17403]: [2019-04-11 09:01:30,790] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,765] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21910)
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,786] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor76-Process, stopped)>
Apr 11 09:01:31 airflow.infra airflow[17403]: [2019-04-11 09:01:31,866] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21909)
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,468] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21921)
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor75-Process, stopped)>
Apr 11 09:01:32 airflow.infra airflow[17403]: [2019-04-11 09:01:32,787] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor77-Process, stopped)>
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,358] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21933)
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,521] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21934)
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,788] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor78-Process, stopped)>
Apr 11 09:01:33 airflow.infra airflow[17403]: [2019-04-11 09:01:33,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor79-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,456] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21949)
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,679] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21950)
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,789] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor80-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor81-Process, stopped)>
Apr 11 09:01:34 airflow.infra airflow[17403]: [2019-04-11 09:01:34,892] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21964)
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,405] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21963)
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,790] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor82-Process, stopped)>
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,791] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor83-Process, stopped)>
Apr 11 09:01:35 airflow.infra airflow[17403]: [2019-04-11 09:01:35,834] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21994)
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,462] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 21993)
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,792] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor84-Process, stopped)>
Apr 11 09:01:36 airflow.infra airflow[17403]: [2019-04-11 09:01:36,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor85-Process, stopped)>
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,509] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22003)
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,672] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22004)
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,793] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor86-Process, stopped)>
Apr 11 09:01:37 airflow.infra airflow[17403]: [2019-04-11 09:01:37,794] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor87-Process, stopped)>
Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,472] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22010)
Apr 11 09:01:38 airflow.infra airflow[17403]: [2019-04-11 09:01:38,795] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor88-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,522] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22054)
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,588] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22053)
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor89-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,796] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor90-Process, stopped)>
Apr 11 09:01:39 airflow.infra airflow[17403]: [2019-04-11 09:01:39,888] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22060)
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,460] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22061)
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,798] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor91-Process, stopped)>
Apr 11 09:01:40 airflow.infra airflow[17403]: [2019-04-11 09:01:40,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor92-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,459] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22088)
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,478] {{settings.py:206}} DEBUG - Disposing DB connection pool (PID 22089)
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor93-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,799] {{jobs.py:496}} DEBUG - Waiting for <Process(DagFileProcessor94-Process, stopped)>
Apr 11 09:01:41 airflow.infra airflow[17403]: [2019-04-11 09:01:41,804] {{sqlalchemy.py:81}} WARNING - DB connection invalidated. Reconnecting...
{noformat}

The webserver and the workers work fine, and a task succeeds if I run it by hand. I'm using PostgreSQL in Docker as the metadata DB, with CeleryExecutor and RabbitMQ as the broker.

Related parts of airflow.cfg:

{noformat}
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
sql_alchemy_conn = postgresql+psycopg2://{{airflow_db_user}}:{{airflow_db_pass}}@{{airflow_db_host}}:12411/airflow

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 100

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 3600

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 116

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 4
{noformat}
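For context, settings.configure_orm() turns the pool settings above into SQLAlchemy engine arguments. A simplified sketch of the equivalent (not the actual Airflow code, which also wires up the reconnect handlers; the URL is a placeholder for the templated sql_alchemy_conn):

{code:python}
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Rough equivalent of what settings.configure_orm() builds from the
# [core] section above.
engine = create_engine(
    "postgresql+psycopg2://user:pass@host:12411/airflow",  # sql_alchemy_conn
    pool_size=100,      # sql_alchemy_pool_size
    pool_recycle=3600,  # sql_alchemy_pool_recycle, in seconds
)
Session = scoped_session(sessionmaker(bind=engine, autoflush=False))
{code}

Note that, per the DEBUG log, each short-lived DagFileProcessor child process sets up its own pool with these same settings and disposes it on exit ("Setting up/Disposing DB connection pool"), so with min_file_process_interval = 0 (see the [scheduler] section below) pools are created and torn down continuously.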
{noformat}
[scheduler]
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# After how much time the scheduler should terminate, in seconds.
# -1 indicates to run continuously (see also num_runs)
run_duration = -1

# After how much time (in seconds) new DAGs should be picked up from the filesystem
min_file_process_interval = 0

# How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
dag_dir_list_interval = 300

# How often stats should be printed to the logs
print_stats_interval = 30

child_process_log_directory = {{ airflow_child_scheduler_logs_folder }}

# Local task jobs periodically heartbeat to the DB. If the job has
# not heartbeat in this many seconds, the scheduler will mark the
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

# Turn off scheduler catchup by setting this to False.
# Default behavior is unchanged and Command Line Backfills still work,
# but the scheduler will not do scheduler catchup if this is False;
# however, it can be set on a per-DAG basis in the DAG definition (catchup)
catchup_by_default = True

# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by one
# or more of the following:
# - reversion to full table scan
# - complexity of query predicate
# - excessive locking
#
# Additionally, you may hit the maximum allowable query length for your db.
#
# Set this to 0 for no limit (not advised)
max_tis_per_query = 0

# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
max_threads = 2

authenticate = False
{noformat}
DB configuration:

{noformat}
-c max_connections=1000
-c shared_buffers=1GB
-c effective_cache_size=3GB
-c maintenance_work_mem=256MB
-c checkpoint_completion_target=0.7
-c wal_buffers=16MB
-c default_statistics_target=100
-c random_page_cost=1.1
-c max_wal_size=2GB
-c work_mem=104857kB
-c min_wal_size=1GB
-c max_wal_senders=5
-c max_worker_processes=2
-c max_parallel_workers_per_gather=1
-c max_parallel_workers=2
{noformat}
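Given max_connections=1000 on the server and a 100-connection pool per Airflow process, one quick check is how many connections Postgres actually sees while the scheduler loops. A diagnostic sketch (hypothetical DSN; substitute the real host and credentials from sql_alchemy_conn):

{code:python}
import psycopg2

# Hypothetical DSN: point it at the same instance as sql_alchemy_conn.
conn = psycopg2.connect("host=localhost port=12411 dbname=airflow user=airflow")
with conn, conn.cursor() as cur:
    # Connections to the airflow database, grouped by state
    # (active, idle, idle in transaction, ...).
    cur.execute("""
        SELECT state, count(*)
        FROM pg_stat_activity
        WHERE datname = 'airflow'
        GROUP BY state
        ORDER BY count(*) DESC
    """)
    for state, n in cur.fetchall():
        print(state, n)
{code}

If the total stays far below max_connections while the warnings keep firing, the invalidations are more likely caused by connections being dropped in between (e.g. by Docker networking or container restarts) than by connection exhaustion on the server.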