qamasailer opened a new issue, #56424:
URL: https://github.com/apache/airflow/issues/56424

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   After upgrading to Airflow 3.1.0, one of our DAGs failed, and the failure 
email notification for it crashed the scheduler with the following error 
message.
   ```
       sys.exit(main())
                ^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
       run_command_with_daemon_option(
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1042, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1345, in _run_scheduler_loop
       num_finished_events += self._process_executor_events(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 745, in _process_executor_events
       return SchedulerJobRunner.process_executor_events(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 977, in process_executor_events
       executor.send_callback(email_request)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py", line 586, in send_callback
       self.callback_sink.send(request)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 99, in wrapper
       with create_session() as session:
            ^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/contextlib.py", line 144, in __exit__
       next(self.gen)
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 43, in create_session
       session.commit()
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
       self._prepare_impl()
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
       self.session.flush()
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
       self._flush(objects)
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
       with util.safe_reraise():
            ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
       flush_context.execute()
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
       _emit_insert_statements(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
       result = connection._execute_20(
                ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.DataError: (psycopg2.errors.StringDataRightTruncation) value too long for type character varying(20)
   [SQL: INSERT INTO callback_request (created_at, priority_weight, callback_data, callback_type) VALUES (%(created_at)s, %(priority_weight)s, %(callback_data)s, %(callback_type)s) RETURNING callback_request.id]
   [parameters: {'created_at': datetime.datetime(2025, 10, 6, 7, 43, 34, 612854, tzinfo=Timezone('UTC')), 'priority_weight': 1, 'callback_data': '"{\\"filepath\\":\\"commons/esg_load.py\\",\\"bundle_name\\":\\"dags-folder\\",\\"bundle_version\\":null,\\"msg\\":\\"Executor CeleryExecutor(paralle ... (1361 characters truncated) ... :null,\\"next_method\\":null,\\"next_kwargs\\":null,\\"xcom_keys_to_clear\\":[],\\"should_retry\\":false},\\"type\\":\\"EmailNotificationRequest\\"}"', 'callback_type': 'EmailNotificationRequest'}]
   (Background on this error at: https://sqlalche.me/e/14/9h9h)
   ```
   
   
   
   ### What you think should happen instead?
   
   We applied a quick fix by widening the `callback_type` field to length 30. 
However, I do not think this is a stable solution; it should be fixed in 
Airflow, either by increasing the field length in the models or by changing 
the value for this `callback_type` to something of at most 20 characters.
   
   ### How to reproduce
   
   I tried to reproduce it, but I cannot get it to fail again in the running system.
   I can see why it is happening by going through the code:
   
   
https://github.com/apache/airflow/blob/cc1a0007eb20c6c29f420e8f8867f2eaf56de787/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L977
   
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/base_executor.py#L586
   
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/callbacks/database_callback_sink.py#L38
   
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/db_callback_request.py#L48
   uses the class name (`EmailNotificationRequest`, 24 characters) as the `callback_type`, which is too long for the 20-character field.
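
To illustrate the mismatch without a running Airflow instance, here is a minimal sketch. It assumes only what the error message shows: the column is `character varying(20)` and the stored value is the request's class name. The class below is a hypothetical stand-in, not the real Airflow class, and `fits_callback_type` is a helper invented for this example.

```python
# Column width taken from the error message: character varying(20).
CALLBACK_TYPE_MAX_LEN = 20


class EmailNotificationRequest:
    """Hypothetical stand-in; only the class name matters here."""


def fits_callback_type(cls: type, max_len: int = CALLBACK_TYPE_MAX_LEN) -> bool:
    """Return True if the class name fits into a VARCHAR(max_len) column."""
    return len(cls.__name__) <= max_len


name = EmailNotificationRequest.__name__
# The name is 24 characters long, so it does not fit into 20.
print(name, len(name), fits_callback_type(EmailNotificationRequest))
# With the quick-fix width of 30, the same name fits.
print(fits_callback_type(EmailNotificationRequest, max_len=30))
```

A check like this could also serve as a unit test over all callback request classes, so that a newly added class with a long name fails in CI instead of in the scheduler.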
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
