jscheffl commented on code in PR #45958:
URL: https://github.com/apache/airflow/pull/45958#discussion_r1937982373


##########
providers/edge/src/airflow/providers/edge/cli/api_client.py:
##########
@@ -103,15 +104,16 @@ def worker_register(
 
 def worker_set_state(
     hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] 
| None, sysinfo: dict
-) -> list[str] | None:
+) -> WorkerSetStateReturn:
     """Update the state of the worker in the central site and thereby 
implicitly heartbeat."""
-    return _make_generic_request(
+    result = _make_generic_request(
         "PATCH",
         f"worker/{quote(hostname)}",
         WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, 
sysinfo=sysinfo).model_dump_json(
             exclude_unset=True
         ),
     )
+    return WorkerSetStateReturn(**result["__data__"])

Review Comment:
   It seems the v2 wrapper "wrongly" serializes the class, therefore this 
compatibility breaks the EdgeWorker in Airflow 3. This generates a:
   ```
   [2025-01-31T20:33:16.309+0000] {before.py:42} WARNING - Starting call to 
'airflow.providers.edge.cli.api_client._make_generic_request', this is the 6th 
time calling it.
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/opt/airflow/airflow/__main__.py", line 58, in main
       args.func(args)
     File "/opt/airflow/airflow/utils/cli.py", line 111, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 
55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/providers/edge/src/airflow/providers/edge/cli/edge_command.py", 
line 437, in worker
       edge_worker.start()
     File 
"/opt/airflow/providers/edge/src/airflow/providers/edge/cli/edge_command.py", 
line 291, in start
       self.worker_state_changed = self.heartbeat()
                                   ^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/providers/edge/src/airflow/providers/edge/cli/edge_command.py", 
line 393, in heartbeat
       worker_info = worker_set_state(self.hostname, state, len(self.jobs), 
self.queues, sysinfo)
                     
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/providers/edge/src/airflow/providers/edge/cli/api_client.py", 
line 116, in worker_set_state
       return WorkerSetStateReturn(**result["__data__"])
                                     ~~~~~~^^^^^^^^^^^^
   KeyError: '__data__'
   root@a788fcdc822a:/opt/airflow#
   ```
   So proposed fix is:
   ```suggestion
       return WorkerSetStateReturn(**result)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to