kiros19 commented on issue #49966:
URL: https://github.com/apache/airflow/issues/49966#issuecomment-3845726832
I've got the same error on my setup:
Airflow 3.1.6 + apache-airflow-providers-fab 3.1.2 (trying to solve the
apiserver UI not opening after some time)
CeleryExecutor + rabbitmq:3-management
Docker
I've seen the error before, but I believe some reloads and version changes made
it disappear somehow.
The problem as I see it:
Celery has some built-in log formatting setting that is failing.
That is causing the worker to close the AMQP connection to RabbitMQ.
I believe that logging config is built in, as my config is the default and has
'taskname' everywhere, not 'task_name'.
And the logs were not modified by scripts or anything; it's a clean setup.
The most suspicious thing is that Airflow jobs still execute OK; nothing is
failing. But how can that be, if the AMQP connection is closed instantly on
opening due to that Celery logging error?
Another suspicious thing: the error does not appear on my prod setup:
Airflow 3.1.5 + apache-airflow-providers-fab 3.0.3
And I'm not sure I've seen it on my previous test setup (not 100% sure here):
Airflow 3.1.6 + apache-airflow-providers-fab 3.1.1
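For what it's worth, the logging half of this is easy to reproduce with plain Python, no Airflow or Celery involved (a minimal sketch, assuming only the stdlib logging module): any handler whose %-style format references a field the LogRecord does not carry fails exactly like this:

    import logging

    handler = logging.StreamHandler()
    # %-style format referencing a field the record does not have:
    handler.setFormatter(logging.Formatter("[%(asctime)s: %(task_name)s] %(message)s"))
    log = logging.getLogger("repro")
    log.addHandler(handler)
    # Prints "--- Logging error ---" to stderr with KeyError: 'task_name',
    # then ValueError: Formatting field not found in record: 'task_name'
    log.warning("hello")

So the log record itself ("Executing workload in Celery") looks fine; the problem is whichever formatter attached to the handler expects task_name.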
Below are my RabbitMQ and Celery worker logs, plus my logging config.
RABBITMQ:
--ok
2026-02-04 06:46:26.518060+00:00 [info] <0.3105.0> accepting AMQP connection <0.3105.0> (172.18.0.4:46058 -> 172.18.0.2:5672)
2026-02-04 06:46:26.520354+00:00 [info] <0.3105.0> connection <0.3105.0> (172.18.0.4:46058 -> 172.18.0.2:5672): user 'airflow' authenticated and granted access to vhost 'airflow_queue'
2026-02-04 06:46:26.528031+00:00 [info] <0.3122.0> accepting AMQP connection <0.3122.0> (172.18.0.4:46064 -> 172.18.0.2:5672)
2026-02-04 06:46:26.529746+00:00 [info] <0.3122.0> connection <0.3122.0> (172.18.0.4:46064 -> 172.18.0.2:5672): user 'airflow' authenticated and granted access to vhost 'airflow_queue'
--instantly closing
2026-02-04 06:46:26.922223+00:00 [warning] <0.3105.0> closing AMQP connection <0.3105.0> (172.18.0.4:46058 -> 172.18.0.2:5672, vhost: 'airflow_queue', user: 'airflow'):
2026-02-04 06:46:26.922223+00:00 [warning] <0.3105.0> client unexpectedly closed TCP connection
2026-02-04 06:46:26.922550+00:00 [warning] <0.3122.0> closing AMQP connection <0.3122.0> (172.18.0.4:46064 -> 172.18.0.2:5672, vhost: 'airflow_queue', user: 'airflow'):
2026-02-04 06:46:26.922550+00:00 [warning] <0.3122.0> client unexpectedly closed TCP connection
WORKER:
--- Logging error ---
Traceback (most recent call last):
  File "/usr/python/lib/python3.12/logging/__init__.py", line 464, in format
    return self._format(record)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/logging/__init__.py", line 460, in _format
    return self._fmt % values
           ~~~~~~~~~~^~~~~~~~
KeyError: 'task_name'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/python/lib/python3.12/logging/__init__.py", line 1160, in emit
    msg = self.format(record)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/logging/__init__.py", line 999, in format
    return fmt.format(record)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/logging/__init__.py", line 706, in format
    s = self.formatMessage(record)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/logging/__init__.py", line 675, in formatMessage
    return self._style.format(record)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/logging/__init__.py", line 466, in format
    raise ValueError('Formatting field not found in record: %s' % e)
ValueError: Formatting field not found in record: 'task_name'

Call stack:
  File "/home/airflow/.local/bin/airflow", line 7, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py", line 66, in wrapper
    providers_configuration_loaded(func)(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py", line 293, in worker
    _run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py", line 52, in _run_command_with_daemon_option
    run_command_with_daemon_option(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py", line 286, in run_celery_worker
    celery_app.worker_main(options)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py", line 487, in worker_main
    self.start(argv=argv)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py", line 467, in start
    celery.main(args=argv, standalone_mode=False)
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1406, in main
    rv = self.invoke(ctx)
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1873, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1269, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 824, in invoke
    return callback(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/click/decorators.py", line 34, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/base.py", line 135, in caller
    return f(ctx, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/worker.py", line 367, in worker
    worker.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py", line 346, in start
    blueprint.start(self)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py", line 788, in start
    c.loop(*c.loop_args())
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/loops.py", line 96, in asynloop
    next(loop)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
    item()
  File "/home/airflow/.local/lib/python3.12/site-packages/vine/promises.py", line 158, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/base.py", line 230, in _read
    drain_events(timeout=0)
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "/home/airflow/.local/lib/python3.12/site-packages/amqp/channel.py", line 1629, in _on_basic_deliver
    fun(msg)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 668, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py", line 694, in on_task_received
    strategy(
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/strategy.py", line 207, in task_message_handler
    [callback(req) for callback in callbacks]
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/worker/autoscale.py", line 96, in maybe_scale
    self.pool.maintain_pool()
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 1351, in maintain_pool
    self._maintain_pool()
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 1343, in _maintain_pool
    self._repopulate_pool(joined)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 1328, in _repopulate_pool
    self._create_worker_process(self._avail_index())
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/concurrency/asynpool.py", line 493, in _create_worker_process
    return super()._create_worker_process(i)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 1158, in _create_worker_process
    w.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 120, in start
    self._popen = self._Popen(self)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/context.py", line 331, in _Popen
    return Popen(process_obj)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/popen_fork.py", line 22, in __init__
    self._launch(process_obj)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 323, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 110, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 292, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", line 362, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 694, in fast_trace_task
    R, I, T, Rstr = tasks[task].__trace__(
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 479, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 779, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 154, in execute_workload
    log.info("[%s] Executing workload in Celery: %s", celery_task_id, workload)
Message: '[%s] Executing workload in Celery: %s'
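For reference, I believe this is the built-in format involved: if I read Celery's defaults correctly (celery/app/defaults.py; worth re-checking against the installed version), the default worker_task_log_format references %(task_name)s, which is exactly the field missing from this record:

    # Celery's default worker_task_log_format (assumed, from Celery's defaults):
    [%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s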
CONFIG:
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
delete_local_logs = False
google_key_path =
remote_base_log_folder =
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = INFO
celery_logging_level = WARNING
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
#log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
#log_format = %(asctime)s [%(levelname)s] %(message)s
callsite_parameters = filename,lineno
#simple_log_format = %(asctime)s %(levelname)s - %(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
dag_processor_child_process_log_directory = /opt/airflow/logs/dag_processor
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
min_length_masked_secret = 5
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number|default(ti.try_number) }}.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser =
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
color_log_error_keywords = error,exception
color_log_warning_keywords = warn
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
celery_app_name = airflow.providers.celery.executors.celery_executor
worker_concurrency = 16
# worker_autoscale =
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = pyamqp://airflow_mcfm:3hJflp5jVSHm@rabbitmq/mcfm
result_backend = db+postgresql://airflow_mcfm:[email protected]:15432/airflow_mcfm
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_acks_late = True
task_track_started = True
task_publish_max_retries = 3
extra_celery_config = {}
#extra_celery_config = {"global_qos": false} -- this should be False, Python-style, not false; but the setting is false by default anyway
#broker_transport_options = {"global_qos": false}
# worker_umask =
[celery_broker_transport_options]
# visibility_timeout =
# sentinel_kwargs =
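If that built-in task format is indeed the culprit, one thing I might try (an untested sketch; worker_task_log_format is a standard Celery setting, and I'm assuming extra_celery_config passes it through to the worker) is overriding it so it no longer references task_name, with %% escaped as elsewhere in airflow.cfg:

    # untested sketch: drop the task_name field from Celery's task log format
    extra_celery_config = {"worker_task_log_format": "[%%(asctime)s: %%(levelname)s/%%(processName)s] %%(message)s"}

I haven't verified whether the Airflow Celery executor replaces these handlers, so treat this as a debugging probe rather than a fix.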
And here is everything I've got related to logging in my docker-compose file
(nothing related to Celery log formatting):
AIRFLOW__LOGGING__BASE_LOG_FOLDER: '/opt/airflow/logs'
AIRFLOW__LOGGING__DAG_PROCESSOR_CHILD_PROCESS_LOG_DIRECTORY: '/opt/airflow/logs/dag_processor_logs'
AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: 's3://dev/logs'
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 'minio_s3_conn'
AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: 'true'