jscheffl commented on PR #53035: URL: https://github.com/apache/airflow/pull/53035#issuecomment-3098222299
Hi @guan404ming - I have now been able to dive into the PR, and as I am a bit "picky" about the UI journey, sorry for the number of comments. I assume we did not "design" all UI elements in detail, so when I provide feedback it is "just my personal opinion" and other views might be just as valid.

So, feedback from the UX side:

- I see that HITL task instances are listed as a new tab in the right-hand panel of the Dag view, but only on the Dag global view! This means that I can see the tasks for all Dag runs there. I think this is fine.
  - What jumps out at me: it is called "HITL Task Instances". I think HITL is a good technical name and we understand it as developers, but somebody coming to the UI probably does not. Could we abstract it and call it "Human Tasks"?
  - Is it possible to show the tab only if the Dag has any HITL tasks? I assume most Dags do not carry HITL tasks, and then the tab is always empty.
  - Is the same possible for Dag run and task instance: if there are any HITL tasks, display the tab and filter based on run/task?
  - Is it possible for HITL tasks to show the HITL panel by default instead of Logs (as logs might be very boring for somebody who wants to report a Human task)?

  Capture 1: Displayed on Dag
  <img width="1302" height="446" alt="image" src="https://github.com/user-attachments/assets/d1c4c37e-2194-4ab6-acff-52337ae6b88d" />

  Capture 2: Missing on Dag run
  <img width="1272" height="617" alt="image" src="https://github.com/user-attachments/assets/2f4aa28f-51e0-46da-8d90-fce5b2064f9f" />

  Capture 3: Missing on Task
  <img width="1099" height="622" alt="image" src="https://github.com/user-attachments/assets/50aeb32f-6104-45a5-bced-506e36ec399f" />

  Capture 4: Missing on Task instance
  <img width="1632" height="574" alt="image" src="https://github.com/user-attachments/assets/d1711cba-1dc8-4081-820d-254ebc1f9f49" />

  Capture 5: Displayed also on a Dag with no HITL tasks
  <img width="1302" height="516" alt="image" src="https://github.com/user-attachments/assets/ff742054-fd77-4b77-b254-5a49b27f71f1" />

- The "HITL" view is displayed in the "Admin" menu, which I think is _not_ the right place. It is not an admin feature but mostly something for "normal" users. There might be different opinions on where the "right" place is, I assume... I'd still assume a top-level menu entry would be good. Alternatively a panel on the "Start" dashboard. Or within Browse... but probably nobody will be able. So the wishlist from my side would be a top-level menu entry; second best would be the Browse menu and third best the "user" menu.
  - Here, too, I'd propose renaming it to "Human Tasks" (as HITL would not be known to a non-power user).
  - A panel on the dashboard might be another PR as well...

  <img width="538" height="448" alt="image" src="https://github.com/user-attachments/assets/f9867b76-b93f-46ae-8ab3-b8adaa2c75a1" />

- Content of the HITL list
  - The "response" icon may not make it obvious that you need to click on it.
  - Can you therefore add the "subject" and make it a clickable link?
  - On the other hand, the link on the "Task ID" is misleading, as it points to the logs of the task. I'd point it to the task form as well.

  <img width="1397" height="262" alt="image" src="https://github.com/user-attachments/assets/bbaf880c-ff62-4b8f-83a9-a310f8955d05" />

- HITL dialog:
  - Instead of showing a modal pop-up, could we embed the HITL form in a tab on the task directly (like the logs)? That would save the click needed to open it. Just as a link to a task instance "normally" points to the logs, it could point directly to the entry panel as the selected tab, where you could enter your input and "respond".
  - I see that the response option is a drop-down and the user then needs to click "Respond". This produces a funny layout bug (see below), as there are more options than screen space and the accordion hides some options... Instead of having a "Respond" button for the options, could we render the options as buttons, like in the AIP (capture 2 below)? This would be better from a UX perspective, especially for cases where the user only has the "OK" option, which renders a drop-down with only one element.

  Capture 1: Options dialog with layout glitch
  <img width="1019" height="409" alt="image" src="https://github.com/user-attachments/assets/b9e1af84-6f76-4340-9817-c28c3588ac7d" />

  Capture 2: Options as buttons as in AIP
  <img width="548" height="202" alt="image" src="https://github.com/user-attachments/assets/c4f46330-409f-4183-8780-1ea72e8bb316" />

  Capture 3: Information gathering with only "OK" option
  <img width="958" height="463" alt="image" src="https://github.com/user-attachments/assets/a69b1708-a216-4d06-ae3b-02c006f24496" />

- In the logs I see when the request for input starts and at which time the user responded. Can you also put the options the user responded with and the selected option into the logs?
- Also, once a user has responded there is no other way to see the response. Even if the dialog is re-opened, the previously entered information is not displayed.

  Capture 1: Logs showing that admin responded but no details
  <img width="1112" height="202" alt="image" src="https://github.com/user-attachments/assets/43afbb8d-68c7-40ff-aa22-08420f648807" />

  Capture 2: Missing input on re-opening the dialog after entry
  <img width="1049" height="449" alt="image" src="https://github.com/user-attachments/assets/1800227f-6395-4d63-8dbe-91754526773a" />

- I used "choose a branch to run" and selected "task 2", but the execution failed. I assume this is a bug unrelated to the UI and rather a problem in the Dag or logic?

  <img width="1182" height="638" alt="image" src="https://github.com/user-attachments/assets/1c690c24-10ca-445d-a266-fe7251cbc69f" />

- Found a bug with EdgeExecutor: when the deferred task runs into a timeout, an INSERT INTO the DB fails. Never tested deferred tasks before :-D - will file a separate bug

```
root@f2921a52480b:/opt/airflow# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
[2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting the scheduler
[2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor
[2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - Adopting or resetting orphaned tasks for active dag runs
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks up for execution:
    <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
    <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
    <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 1/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting the following tasks to queued state:
    <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
    <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
    <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: EdgeExecutor(parallelism=32)
[2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
    guard.commit()
  File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
    self.session.commit()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
         ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.

[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down EdgeExecutor
[2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
    guard.commit()
  File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
    self.session.commit()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
         ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.

[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
```

P.S.: I hope you are not worried about all this feedback :-D

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org