jscheffl commented on PR #53035:
URL: https://github.com/apache/airflow/pull/53035#issuecomment-3098222299

   Hi @guan404ming - so now I was able to dive into the PR - and as I am a bit "picky" on the UI journey, sorry for the amount of comments. I assume we did not "design" all UI elements in detail, so when I provide feedback this is "just my personal opinion" and there might be other valid opinions as well.
   
   So feedback from the UX side:
   - I see that HITL task instances are listed as a new tab in the right panel of the Dag view - but only on the Dag global view! This means that I can see there the tasks for all Dag runs. I think this is fine.
     - What jumps out at me: it is called "HITL Task Instances". I think HITL is a good technical name and we understand this as developers - but somebody getting to the UI probably does not understand it. Could we abstract it and call it "Human Tasks"?
     - Is it possible to show the tab only if the Dag has any HITL tasks? Because I assume most Dags do not carry HITL tasks and then the tab is always empty (see the sketch after the captures below).
     - Is it possible to do the same for the Dag run and task instance views - if there are any HITL tasks, display the tab and filter based on the run/task?
     - Is it possible for HITL tasks to show the HITL panel by default instead of Logs (as logs might be very boring for somebody who wants to respond to a Human task)?
   
   Capture 1: Displayed on Dag
   <img width="1302" height="446" alt="image" src="https://github.com/user-attachments/assets/d1c4c37e-2194-4ab6-acff-52337ae6b88d" />
   Capture 2: Missing on Dag run
   <img width="1272" height="617" alt="image" src="https://github.com/user-attachments/assets/2f4aa28f-51e0-46da-8d90-fce5b2064f9f" />
   Capture 3: Missing on Task
   <img width="1099" height="622" alt="image" src="https://github.com/user-attachments/assets/50aeb32f-6104-45a5-bced-506e36ec399f" />
   Capture 4: Missing on Task instance
   <img width="1632" height="574" alt="image" src="https://github.com/user-attachments/assets/d1711cba-1dc8-4081-820d-254ebc1f9f49" />
   Capture 5: Displayed also on a Dag with no HITL tasks
   <img width="1302" height="516" alt="image" src="https://github.com/user-attachments/assets/ff742054-fd77-4b77-b254-5a49b27f71f1" />
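   For the conditional tab visibility, a minimal sketch of what I have in mind (assuming the UI stays React/TypeScript; `dagTabs`, the `Tab` shape and the tab values are hypothetical placeholders, not code from this PR - the count would come from the HITL list filtered by dag_id):
   ```tsx
   // Hypothetical sketch: only offer the "Human Tasks" tab when the Dag
   // actually has HITL task instances; otherwise the tab is always empty.
   type Tab = { label: string; value: string };

   const baseTabs: Tab[] = [
     { label: "Overview", value: "overview" },
     { label: "Runs", value: "runs" },
     { label: "Tasks", value: "tasks" },
   ];

   // hitlTaskCount would be fetched per Dag (and analogously per Dag run /
   // task instance, where the list would also be filtered to that scope).
   export function dagTabs(hitlTaskCount: number): Tab[] {
     return hitlTaskCount > 0
       ? [...baseTabs, { label: "Human Tasks", value: "human_tasks" }]
       : baseTabs;
   }
   ```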
   
   - The "HITL" view is displayed in the "Admin" menu - which I think is _not_ 
the right place. It is not something admin but mostly something for "normal" 
users. There migth be different opinions where it is "right" to be placed I 
assume... I'd still assume a top level menu entry would be good. Alternatively 
a panel on the "Start" dashboard. Or within Browse... but probably nobody will 
be able. So wishlist from my side would be a top level menu entry, second best 
would be browse menu and third best "user" menu.
   - Also here I'd propose to rename to "Human Tasks" (as HITL would be not 
known for a non power-user)
   - A panel on the dashboard might be another PR as well...
   <img width="538" height="448" alt="image" 
src="https://github.com/user-attachments/assets/f9867b76-b93f-46ae-8ab3-b8adaa2c75a1";
 />
   
   - Content of the HITL list:
       - With the "response" icon it is maybe not obvious that you need to click on it.
       - Could you therefore add the "subject" and make it a clickable link?
       - On the other hand, the link on the "Task ID" is misleading, as it points to the logs of the task. I'd point this to the task form as well.
   <img width="1397" height="262" alt="image" src="https://github.com/user-attachments/assets/bbaf880c-ff62-4b8f-83a9-a310f8955d05" />
   
   - HITL dialog:
      - Instead of showing a pop-up as a modal, could we embed the HITL form within a tab on the task directly (like the logs)? That would save the click needed to open it. Similar to how a link to a task instance "normally" points to the logs, it could directly point to the entry panel as the selected tab, where you could enter data and "respond".
      - I see that the response option is made to be a drop-down and then the user needs to click "Respond". A funny layout bug is produced by this (see below), as there are more options than screen space and the accordion hides some options... Instead of having a "Respond" button for the options, could we render the options as buttons? Like it was in the AIP (capture 2 below) - this would be better from a UX perspective, especially for the cases where the user only has the "OK" option, which currently renders a drop-down with a single element. A rough sketch follows after the captures.
   Capture 1: Options dialog with layout glitch
   <img width="1019" height="409" alt="image" src="https://github.com/user-attachments/assets/b9e1af84-6f76-4340-9817-c28c3588ac7d" />
   Capture 2: Options as buttons as in AIP
   <img width="548" height="202" alt="image" src="https://github.com/user-attachments/assets/c4f46330-409f-4183-8780-1ea72e8bb316" />
   Capture 3: Information gathering with only "OK" option
   <img width="958" height="463" alt="image" src="https://github.com/user-attachments/assets/a69b1708-a216-4d06-ae3b-02c006f24496" />
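   To illustrate the options-as-buttons idea, a minimal sketch (component and prop names are hypothetical, not taken from this PR):
   ```tsx
   import React from "react";

   // Hypothetical sketch: render each HITL option as its own button, so one
   // click both selects and responds - no drop-down, no separate "Respond"
   // button, and the "OK"-only case becomes a single button instead of a
   // one-element drop-down.
   type HITLOptionsProps = {
     options: string[];
     onRespond: (option: string) => void;
   };

   export function HITLOptions({ options, onRespond }: HITLOptionsProps) {
     return (
       <div role="group" aria-label="Respond">
         {options.map((option) => (
           <button key={option} type="button" onClick={() => onRespond(option)}>
             {option}
           </button>
         ))}
       </div>
     );
   }
   ```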
   
   - In the logs I can see when the request for input started and when / at what time the user responded. Can you also add the options that were offered and the option the user selected to the logs?
   - Also, once a user has responded, there is no other way to see the response. Even if the dialog is re-opened, the previously entered information is not displayed (a sketch of how this could look follows after the captures).
   Capture 1: Logs showing that admin responded but no details
   <img width="1112" height="202" alt="image" src="https://github.com/user-attachments/assets/43afbb8d-68c7-40ff-aa22-08420f648807" />
   Capture 2: Missing input on re-opening the dialog after entry
   <img width="1049" height="449" alt="image" src="https://github.com/user-attachments/assets/1800227f-6395-4d63-8dbe-91754526773a" />
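   A sketch of how the recorded response could be shown when re-opening (the `HITLResponse` shape is my guess from what the dialog collects, not the PR's actual model):
   ```tsx
   import React from "react";

   // Hypothetical sketch: if a response was already recorded, render it
   // read-only instead of an empty form, so the entered information stays
   // visible after responding.
   type HITLResponse = {
     respondedBy: string;
     respondedAt: string; // ISO timestamp, as shown in the logs
     chosenOption: string;
     inputValues?: Record<string, unknown>;
   };

   export function HITLDetailPanel({ response }: { response?: HITLResponse }) {
     if (!response) {
       // No response yet: the normal entry form would be rendered here.
       return <p>Awaiting response...</p>;
     }
     return (
       <dl>
         <dt>Responded by</dt>
         <dd>{`${response.respondedBy} at ${response.respondedAt}`}</dd>
         <dt>Selected option</dt>
         <dd>{response.chosenOption}</dd>
         <dt>Input</dt>
         <dd>{JSON.stringify(response.inputValues ?? {})}</dd>
       </dl>
     );
   }
   ```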
   
   - I used a "choose a branch to run" and select "task 2" but the execution 
failed. I assume this is a bug un-related to UI but rather a problem in the Dag 
or logic?
   <img width="1182" height="638" alt="image" 
src="https://github.com/user-attachments/assets/1c690c24-10ca-445d-a266-fe7251cbc69f";
 />
   
   
   - Found a bug with the EdgeExecutor: when the deferred task runs into a timeout, an INSERT into the DB fails. Never tested deferred tasks before :-D - will file a separate bug. From the log below, the scheduler tries to queue `valid_input_and_options` with a (dag_id, task_id, run_id, map_index, try_number) key that already exists in `edge_job`, violating the `edge_job_pkey` unique constraint:
   ```
   root@f2921a52480b:/opt/airflow# airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44
 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is 
deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
   [2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting 
the scheduler
   [2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded 
executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor
   [2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - 
Adopting or resetting orphaned tasks for active dag runs
   [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks 
up for execution:
           <TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.wait_for_option 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
   [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 0/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 1/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 0/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.wait_for_option 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
   [2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying 
to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: 
example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 
[scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: 
EdgeExecutor(parallelism=32)
   [2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1002, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1283, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1430, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 
401, in commit
       self.session.commit()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 832, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 811, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
            ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
456, in execute
       rec.execute(self)
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
245, in save_obj
       _emit_insert_statements(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
1097, in _emit_insert_statements
       c = connection._execute_20(
           ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   [SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, 
state, queue, concurrency_slots, command, queued_dttm, edge_worker, 
last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, 
%(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, 
%(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
   [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 
'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 
'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 
'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2
 ... (675 characters truncated) ... 
,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 
'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND
 ... (677 characters truncated) ... 
"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 
'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj
 ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963
 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 
'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   [2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down 
EdgeExecutor
   [2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited 
execute loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, 
in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in 
wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
52, in scheduler
       run_command_with_daemon_option(
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
       callback()
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, 
in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in 
run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in 
execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1002, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1283, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1430, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 
401, in commit
       self.session.commit()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 832, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 811, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
            ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
456, in execute
       rec.execute(self)
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
245, in save_obj
       _emit_insert_statements(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
1097, in _emit_insert_statements
       c = connection._execute_20(
           ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   [SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, 
state, queue, concurrency_slots, command, queued_dttm, edge_worker, 
last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, 
%(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, 
%(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
   [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 
'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 
'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 
'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2
 ... (675 characters truncated) ... 
,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 
'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND
 ... (677 characters truncated) ... 
"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 
'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj
 ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963
 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 
'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   ```
   
   P.S.: I hope you are not worried about all this feedback :-D

