ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205117226
> What was the crash you experienced?
Here's the log:
```
[2022-08-04 11:12:57,341] {process_utils.py:76} INFO - Process
psutil.Process(pid=22647, status='terminated', exitcode=0, started='11:12:49')
(22647) terminated with exit code 0
[2022-08-04 11:12:57,342] {scheduler_job.py:779} INFO - Exited execute loop
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 1803, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line
719, in do_execute
[2022-08-04 11:12:57 +0000] [22509] [INFO] Handling signal: term
cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique
constraint "task_instance_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug,
download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/airflow/models/dagrun.py", line 1143, in
_create_task_instances
[2022-08-04 11:12:57 +0000] [22510] [INFO] Worker exiting (pid: 22510)
[2022-08-04 11:12:57 +0000] [22563] [INFO] Worker exiting (pid: 22563)
session.bulk_insert_mappings(TI, tasks)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 3711, in bulk_insert_mappings
render_nulls,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 3811, in _bulk_save_mappings
transaction.rollback(_capture_exception=True)
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line
72, in __exit__
with_traceback=exc_tb,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 3805, in _bulk_save_mappings
render_nulls,
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line
112, in _bulk_insert
bookkeeping=return_defaults,
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line
1097, in _emit_insert_statements
statement, multiparams, execution_options=execution_options
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 1614, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py",
line 326, in _execute_on_connection
self, multiparams, params, execution_options
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 1491, in _execute_clauseelement
cache_hit=cache_hit,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 1846, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 2027, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py",
line 1803, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line
719, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate
key value violates unique constraint "task_instance_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug,
download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
[SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index,
try_number, max_tries, hostname, unixname, pool, pool_slots, queue,
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s,
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s,
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s,
%(operator)s, %(executor_config)s)]
[parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug',
'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number':
0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool',
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator':
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary
object at 0xffff7a066630>}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 33, in <module>
sys.exit(load_entry_point('apache-airflow', 'console_scripts',
'airflow')())
File "/opt/airflow/airflow/__main__.py", line 38, in main
args.func(args)
File "/opt/airflow/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 94, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 84, in
scheduler
_run_scheduler_job(args=args)
File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 50, in
_run_scheduler_job
job.run()
File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 750, in _execute
self._run_scheduler_loop()
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 859, in
_run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 941, in
_do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1177, in
_schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session,
execute_callbacks=False)
File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dagrun.py", line 527, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dagrun.py", line 741, in
task_instance_scheduling_decisions
self.verify_integrity(missing_indexes=missing_indexes, session=session)
File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dagrun.py", line 950, in verify_integrity
self._create_task_instances(dag.dag_id, tasks, created_counts,
hook_is_noop, session=session)
File "/opt/airflow/airflow/models/dagrun.py", line 1154, in
_create_task_instances
self.run_id,
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line
481, in __get__
return self.impl.get(state, dict_)
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line
926, in get
value = self._fire_loader_callables(state, key, passive)
File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line
957, in _fire_loader_callables
return state._load_expired(state, passive)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py",
line 710, in _load_expired
self.manager.expired_attribute_loader(self, toload, passive)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py",
line 1459, in load_scalar_attributes
no_autoflush=no_autoflush,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py",
line 418, in load_on_ident
execution_options=execution_options,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py",
line 534, in load_on_pk_identity
bind_arguments=bind_arguments,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 1688, in execute
conn = self._connection_for_bind(bind)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 1530, in _connection_for_bind
engine, execution_options
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 721, in _connection_for_bind
self._assert_active()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 608, in _assert_active
code="7s2a",
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates
unique constraint "task_instance_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug,
download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
[SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index,
try_number, max_tries, hostname, unixname, pool, pool_slots, queue,
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s,
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s,
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s,
%(operator)s, %(executor_config)s)]
[parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug',
'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number':
0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool',
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator':
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary
object at 0xffff7a066630>}]
(Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on
this error at: https://sqlalche.me/e/14/7s2a)
[2022-08-04 11:12:57 +0000] [22509] [INFO] Shutting down: Master
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]