jscheffl commented on code in PR #45958:
URL: https://github.com/apache/airflow/pull/45958#discussion_r1929597555
##########
providers/edge/src/airflow/providers/edge/models/edge_worker.py:
##########
@@ -52,6 +52,14 @@ class EdgeWorkerState(str, Enum):
"""Edge Worker was shut down."""
UNKNOWN = "unknown"
"""No heartbeat signal from worker for some time, Edge Worker probably down."""
+ MAINTENANCE_REQUEST = "maintenance request"
+ """Maintenance mode was requested by user."""
Review Comment:
I would rephrase this. The term "user" might not be clear: it will mainly be Admin or Ops staff, but the request can also be automated via the API.
```suggestion
"""Worker was requested to enter maintenance mode. Once the worker receives this, it will pause fetching jobs."""
```
##########
providers/edge/src/airflow/providers/edge/models/edge_worker.py:
##########
@@ -52,6 +52,14 @@ class EdgeWorkerState(str, Enum):
"""Edge Worker was shut down."""
UNKNOWN = "unknown"
"""No heartbeat signal from worker for some time, Edge Worker probably down."""
+ MAINTENANCE_REQUEST = "maintenance request"
+ """Maintenance mode was requested by user."""
+ MAINTENANCE_PENDING = "maintenance pending"
+ """Edge worker received the request for maintenance, waiting for jobs to finish."""
+ MAINTENANCE_MODE = "maintenance mode"
+ """Edge worker is in maintenance mode."""
Review Comment:
```suggestion
"""Edge worker is in maintenance mode. It is online but pauses fetching jobs."""
```
##########
providers/edge/src/airflow/providers/edge/plugins/templates/edge_worker_hosts.html:
##########
@@ -97,6 +100,25 @@ <h2>Edge Worker Hosts</h2>
{% endfor %}
</ul>
</td>
+ <td>
+ {%- if host.state == "idle" or host.state == "running" -%}
Review Comment:
This can be a bit simpler
```suggestion
{%- if host.state in ["idle", "running"] -%}
```
##########
providers/edge/src/airflow/providers/edge/plugins/templates/edge_worker_hosts.html:
##########
@@ -97,6 +100,25 @@ <h2>Edge Worker Hosts</h2>
{% endfor %}
</ul>
</td>
+ <td>
+ {%- if host.state == "idle" or host.state == "running" -%}
+ <form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/on" method="GET">
+ <button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
+ Enable
+ </button>
+ </form>
+ {%- elif host.state == "maintenance pending" or host.state == "maintenance mode" or host.state == "maintenance request" -%}
Review Comment:
Same here
```suggestion
{%- elif host.state in ["maintenance pending", "maintenance mode", "maintenance request"] -%}
```
##########
providers/edge/src/airflow/providers/edge/models/edge_worker.py:
##########
@@ -52,6 +52,14 @@ class EdgeWorkerState(str, Enum):
"""Edge Worker was shut down."""
UNKNOWN = "unknown"
"""No heartbeat signal from worker for some time, Edge Worker probably down."""
+ MAINTENANCE_REQUEST = "maintenance request"
+ """Maintenance mode was requested by user."""
+ MAINTENANCE_PENDING = "maintenance pending"
+ """Edge worker received the request for maintenance, waiting for jobs to finish."""
+ MAINTENANCE_MODE = "maintenance mode"
+ """Edge worker is in maintenance mode."""
+ MAINTENANCE_EXIT = "maintenance exit"
+ """Request worker to exit maintenance mode."""
Review Comment:
```suggestion
"""Request worker to exit maintenance mode. Once the worker receives this state, it will un-pause and fetch new jobs."""
```
##########
providers/edge/docs/changelog.rst:
##########
@@ -27,6 +27,14 @@
Changelog
---------
+0.11.0pre0
+..........
+
+Misc
+~~~~
+
+* ``Enable maintenance mode for edge worker.``
Review Comment:
I'd rephrase this a bit. Proposal:
```suggestion
* ``Add the option to set edge workers to maintenance mode via UI plugin and API.``
```
##########
providers/edge/src/airflow/providers/edge/worker_api/routes/worker.py:
##########
@@ -137,11 +156,11 @@ def set_state(
worker_name: Annotated[str, _worker_name_doc],
body: Annotated[WorkerStateBody, _worker_state_doc],
session: SessionDep,
-) -> list[str] | None:
+) -> dict[str, str | list[str] | None]:
Review Comment:
Instead of returning a dict, can you please model this properly via a Pydantic class?
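A minimal sketch of such a model, assuming Pydantic (the route already takes a `WorkerStateBody` request body). The class name `WorkerSetStateReturn` is hypothetical, and the enum below is a trimmed copy holding only the values visible in this review.

```python
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel


class EdgeWorkerState(str, Enum):
    """Trimmed copy of the enum: only the values that appear in this review."""

    IDLE = "idle"
    RUNNING = "running"
    UNKNOWN = "unknown"
    MAINTENANCE_REQUEST = "maintenance request"
    MAINTENANCE_PENDING = "maintenance pending"
    MAINTENANCE_MODE = "maintenance mode"
    MAINTENANCE_EXIT = "maintenance exit"


class WorkerSetStateReturn(BaseModel):
    """Hypothetical response model for set_state() instead of a plain dict."""

    # State of the worker as stored centrally; plain strings are validated
    # into EdgeWorkerState members.
    state: EdgeWorkerState
    # Queues assigned to the worker, or None (same semantics as the old return value).
    queues: Optional[List[str]] = None
```

The route would then declare `-> WorkerSetStateReturn` and return `WorkerSetStateReturn(state=worker.state, queues=worker.queues)`, and the client could parse the JSON into the same model instead of indexing `worker_info["queues"]` and checking its type by hand.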
##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -365,14 +367,34 @@ def check_running_jobs(self) -> None:
def heartbeat(self) -> None:
"""Report liveness state of worker to central site with stats."""
- state = (
- (EdgeWorkerState.TERMINATING if _EdgeWorkerCli.drain else EdgeWorkerState.RUNNING)
- if self.jobs
- else EdgeWorkerState.IDLE
- )
+ if _EdgeWorkerCli.drain:
+ state = EdgeWorkerState.TERMINATING
+ elif self.jobs:
+ if _EdgeWorkerCli.maintenance_mode:
+ state = EdgeWorkerState.MAINTENANCE_PENDING
+ else:
+ state = EdgeWorkerState.RUNNING
+ else:
+ if _EdgeWorkerCli.maintenance_mode:
+ state = EdgeWorkerState.MAINTENANCE_MODE
+ else:
+ state = EdgeWorkerState.IDLE
sysinfo = self._get_sysinfo()
try:
- self.queues = worker_set_state(self.hostname, state, len(self.jobs), self.queues, sysinfo)
+ worker_info = worker_set_state(self.hostname, state, len(self.jobs), self.queues, sysinfo)
+ self.queues = worker_info["queues"] if isinstance(worker_info["queues"], list) else None
+ if worker_info["state"] in (
+ EdgeWorkerState.MAINTENANCE_REQUEST,
+ EdgeWorkerState.MAINTENANCE_PENDING,
+ EdgeWorkerState.MAINTENANCE_MODE,
+ ):
+ if not _EdgeWorkerCli.maintenance_mode:
+ logger.info("Maintenance mode requested!")
+ _EdgeWorkerCli.maintenance_mode = True
+ else:
+ if _EdgeWorkerCli.maintenance_mode:
+ logger.info("Exit Maintenance mode requested!")
+ _EdgeWorkerCli.maintenance_mode = False
Review Comment:
I see how this works, and I think it will work. But it took me a couple of minutes to re-think the ping-pong of state communication, so I think it is a bit too complex.
I'd propose decoupling the state that is reported from the maintenance request that is passed back. How about just sending a boolean flag in the response, like "Please be in maintenance", when maintenance is requested (request/pending/in maintenance), and sending "false" in all other cases (running, idle, exit maintenance, ...)? Then you can map and report the state, but you don't need to mix and convert state enums on the interface. In particular, `redefine_state_if_maintenance()` is then no longer needed.
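A minimal sketch of the proposed split, assuming a hypothetical boolean field in the heartbeat response: the server derives the flag from the stored state, and the worker derives the reported state from local facts only. `maintenance_requested()` and `report_state()` are hypothetical helpers, and the `TERMINATING` value is assumed because it is not shown in the diff.

```python
from enum import Enum


class EdgeWorkerState(str, Enum):
    """Trimmed copy; the TERMINATING value is an assumption (not shown in the diff)."""

    IDLE = "idle"
    RUNNING = "running"
    TERMINATING = "terminating"
    MAINTENANCE_REQUEST = "maintenance request"
    MAINTENANCE_PENDING = "maintenance pending"
    MAINTENANCE_MODE = "maintenance mode"
    MAINTENANCE_EXIT = "maintenance exit"


# Stored states for which the server answers "please be in maintenance".
_MAINTENANCE_STATES = (
    EdgeWorkerState.MAINTENANCE_REQUEST,
    EdgeWorkerState.MAINTENANCE_PENDING,
    EdgeWorkerState.MAINTENANCE_MODE,
)


def maintenance_requested(stored_state: str) -> bool:
    """Server side: boolean flag sent back in the heartbeat response.

    Tuple membership compares with ==, so both enum members and their plain
    string values (as read from the DB) work here.
    """
    return stored_state in _MAINTENANCE_STATES


def report_state(drain: bool, has_jobs: bool, maintenance: bool) -> EdgeWorkerState:
    """Worker side: map local facts to the state reported in the next heartbeat."""
    if drain:
        return EdgeWorkerState.TERMINATING
    if has_jobs:
        return EdgeWorkerState.MAINTENANCE_PENDING if maintenance else EdgeWorkerState.RUNNING
    return EdgeWorkerState.MAINTENANCE_MODE if maintenance else EdgeWorkerState.IDLE
```

In `heartbeat()` the worker would then just assign `_EdgeWorkerCli.maintenance_mode` from the flag and feed it into the state mapping on the next heartbeat, so no state enum has to travel back from the server.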
##########
providers/edge/src/airflow/providers/edge/models/edge_worker.py:
##########
@@ -52,6 +52,14 @@ class EdgeWorkerState(str, Enum):
"""Edge Worker was shut down."""
UNKNOWN = "unknown"
"""No heartbeat signal from worker for some time, Edge Worker probably down."""
+ MAINTENANCE_REQUEST = "maintenance request"
+ """Maintenance mode was requested by user."""
+ MAINTENANCE_PENDING = "maintenance pending"
+ """Edge worker received the request for maintenance, waiting for jobs to finish."""
Review Comment:
I also propose a bit of rephrasing.
```suggestion
"""Edge worker received the request for maintenance, waiting for jobs to finish. Once the jobs are finished, it will move to 'maintenance mode'."""
```
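Reading the suggested docstrings together with the Enable/Disable buttons in the template, the maintenance lifecycle appears to be as follows. This is a sketch of one interpretation, not the PR's implementation: `MAINTENANCE_TRANSITIONS` and `is_allowed()` are hypothetical, and non-maintenance states (terminating, unknown, ...) are left out.

```python
from enum import Enum
from typing import Dict, FrozenSet


class EdgeWorkerState(str, Enum):
    """Trimmed copy holding only the states involved in maintenance."""

    IDLE = "idle"
    RUNNING = "running"
    MAINTENANCE_REQUEST = "maintenance request"
    MAINTENANCE_PENDING = "maintenance pending"
    MAINTENANCE_MODE = "maintenance mode"
    MAINTENANCE_EXIT = "maintenance exit"


S = EdgeWorkerState  # short alias for the table below

# Hypothetical maintenance-related transitions only.
MAINTENANCE_TRANSITIONS: Dict[EdgeWorkerState, FrozenSet[EdgeWorkerState]] = {
    # "Enable" in the UI (or via API) while the worker is online.
    S.IDLE: frozenset({S.MAINTENANCE_REQUEST}),
    S.RUNNING: frozenset({S.MAINTENANCE_REQUEST}),
    # Worker picked up the request: pending while jobs still run, otherwise
    # straight to maintenance mode; "Disable" is also offered here.
    S.MAINTENANCE_REQUEST: frozenset({S.MAINTENANCE_PENDING, S.MAINTENANCE_MODE, S.MAINTENANCE_EXIT}),
    # Jobs finished, or "Disable" was pressed before they finished.
    S.MAINTENANCE_PENDING: frozenset({S.MAINTENANCE_MODE, S.MAINTENANCE_EXIT}),
    S.MAINTENANCE_MODE: frozenset({S.MAINTENANCE_EXIT}),
    # Worker picked up the exit request, un-pauses and reports normally again.
    S.MAINTENANCE_EXIT: frozenset({S.IDLE, S.RUNNING}),
}


def is_allowed(current: EdgeWorkerState, new: EdgeWorkerState) -> bool:
    """Return True if this sketch's table permits moving from current to new."""
    return new in MAINTENANCE_TRANSITIONS.get(current, frozenset())
```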
##########
providers/edge/src/airflow/providers/edge/plugins/templates/edge_worker_hosts.html:
##########
@@ -97,6 +100,25 @@ <h2>Edge Worker Hosts</h2>
{% endfor %}
</ul>
</td>
+ <td>
+ {%- if host.state == "idle" or host.state == "running" -%}
+ <form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/on" method="GET">
+ <button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
+ Enable
+ </button>
+ </form>
+ {%- elif host.state == "maintenance pending" or host.state == "maintenance mode" or host.state == "maintenance request" -%}
+ <form action="../edgeworkerhosts/status/maintenance/{{ host.worker_name }}/off" method="GET">
+ <button type="submit" style="padding: 10px 20px; background-color: blue; color: white; border: none; border-radius: 5px;">
+ Disable
+ </button>
+ </form>
+ {% else %}
+ <button type="button" style="padding: 10px 20px; background-color: gray; color: white; border: none; border-radius: 5px;" disabled>
+ Unavailable
+ </button>
Review Comment:
I would just render nothing if there is no option.
```suggestion
```
##########
providers/edge/src/airflow/providers/edge/worker_api/routes/worker.py:
##########
@@ -157,7 +176,7 @@ def set_state(
queues=worker.queues,
)
_assert_version(body.sysinfo) # Exception only after worker state is in the DB
- return worker.queues
+ return {"state": worker.state, "queues": worker.queues}
Review Comment:
This is a breaking change. Maybe it is not too critical, as this is not a stable release (yet), but existing clients will fail when they receive the new response from the server, because its structure changed. At the very least, this needs to be added to the release notes.
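Already-deployed workers cannot be fixed after the fact, so the release note is still needed. During a mixed-version rollout, though, a newer worker talking to an older server could accept both response shapes. A minimal sketch; `parse_set_state_response()` is a hypothetical helper, not part of the provider.

```python
from typing import Any, List, Optional, Tuple


def parse_set_state_response(payload: Any) -> Tuple[Optional[str], Optional[List[str]]]:
    """Return (state, queues) from either shape of the set_state() response.

    Older servers return only the queue list (or None); newer servers return
    {"state": ..., "queues": ...}. state is None when the server did not send it.
    """
    if payload is None or isinstance(payload, list):
        # Old shape: the body is the queue list itself.
        return None, payload
    if isinstance(payload, dict):
        # New shape: keep queues only if they really are a list.
        queues = payload.get("queues")
        return payload.get("state"), queues if isinstance(queues, list) else None
    raise TypeError(f"Unexpected set_state response: {payload!r}")
```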
##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -365,14 +367,34 @@ def check_running_jobs(self) -> None:
def heartbeat(self) -> None:
"""Report liveness state of worker to central site with stats."""
- state = (
- (EdgeWorkerState.TERMINATING if _EdgeWorkerCli.drain else EdgeWorkerState.RUNNING)
- if self.jobs
- else EdgeWorkerState.IDLE
- )
+ if _EdgeWorkerCli.drain:
+ state = EdgeWorkerState.TERMINATING
+ elif self.jobs:
+ if _EdgeWorkerCli.maintenance_mode:
+ state = EdgeWorkerState.MAINTENANCE_PENDING
+ else:
+ state = EdgeWorkerState.RUNNING
+ else:
+ if _EdgeWorkerCli.maintenance_mode:
+ state = EdgeWorkerState.MAINTENANCE_MODE
+ else:
+ state = EdgeWorkerState.IDLE
sysinfo = self._get_sysinfo()
try:
- self.queues = worker_set_state(self.hostname, state, len(self.jobs), self.queues, sysinfo)
+ worker_info = worker_set_state(self.hostname, state, len(self.jobs), self.queues, sysinfo)
+ self.queues = worker_info["queues"] if isinstance(worker_info["queues"], list) else None
+ if worker_info["state"] in (
+ EdgeWorkerState.MAINTENANCE_REQUEST,
+ EdgeWorkerState.MAINTENANCE_PENDING,
+ EdgeWorkerState.MAINTENANCE_MODE,
+ ):
+ if not _EdgeWorkerCli.maintenance_mode:
+ logger.info("Maintenance mode requested!")
+ _EdgeWorkerCli.maintenance_mode = True
Review Comment:
As the maintenance mode is passed via the heartbeat, it takes some time until it is picked up __and__ also some time until the current state is reported back. Does it make sense to change the heartbeat timing so that a confirmation/change in maintenance is signalled immediately and not in the next cycle?
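One way to do this, sketched under assumptions: the worker loop keeps a scheduler that normally waits a full interval, but when the server's maintenance flag flips the local mode, it forces the next heartbeat on the following loop pass, so the new state is confirmed right away. `HeartbeatScheduler` and `heartbeat_tick()` are hypothetical; the real worker loop is not shown in this diff, and the reported state is reduced to the maintenance flag to keep the example short.

```python
from typing import Callable


class HeartbeatScheduler:
    """Hypothetical helper deciding when the next heartbeat is due."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self.last_sent = float("-inf")  # first heartbeat is due immediately
        self.force_next = False

    def due(self, now: float) -> bool:
        return self.force_next or now - self.last_sent >= self.interval

    def mark_sent(self, now: float) -> None:
        self.last_sent = now
        self.force_next = False

    def request_immediate(self) -> None:
        self.force_next = True


def heartbeat_tick(
    scheduler: HeartbeatScheduler,
    now: float,
    maintenance_mode: bool,
    send_heartbeat: Callable[[bool], bool],
) -> bool:
    """Send a heartbeat if due and return the maintenance flag to use from now on.

    send_heartbeat reports the current local mode and returns the server's
    "please be in maintenance" flag.
    """
    if not scheduler.due(now):
        return maintenance_mode
    requested = send_heartbeat(maintenance_mode)
    scheduler.mark_sent(now)
    if requested != maintenance_mode:
        # The mode just flipped: confirm the new state on the next loop pass
        # instead of waiting a full heartbeat interval.
        scheduler.request_immediate()
    return requested
```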