jscheffl commented on code in PR #61646:
URL: https://github.com/apache/airflow/pull/61646#discussion_r2795724479
##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -50,17 +49,34 @@
# Task tuple to send to be executed
TaskTuple = tuple[TaskInstanceKey, CommandType, str | None, Any | None]
-PARALLELISM: int = conf.getint("core", "PARALLELISM")
-DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
-
class EdgeExecutor(BaseExecutor):
"""Implementation of the EdgeExecutor to distribute work to Edge Workers
via HTTP."""
- def __init__(self, parallelism: int = PARALLELISM):
- super().__init__(parallelism=parallelism)
+ supports_multi_team: bool = True
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
+ # Check if self has the ExecutorConf set on the self.conf attribute, and if not, set it to the global
+ # configuration object. This allows the changes to be backwards compatible with older versions of
+ # Airflow.
+ # Can be removed when minimum supported provider version is equal to the version of core airflow
+ # which introduces multi-team configuration.
+ if not hasattr(self, "conf"):
+ from airflow.configuration import conf as global_conf
+
+ self.conf = global_conf
+
+ # Track queues managed by this executor instance for multi-team isolation.
+ # In a multi-team setup, each executor should only manage jobs and workers
+ # associated with its own queues, not those of other teams.
+ # Initialize with the default queue from (possibly team-specific) config.
+ self._managed_queues: set[str] = set()
+ default_queue = self.conf.get_mandatory_value("operators", "default_queue")
+ self._managed_queues.add(default_queue)
Review Comment:
So, okay, then it is planned "by concept" to potentially start multiple
"instances" of the same (logical) class. In the "EdgeExecutor" case, each
executor class instance would then fill the same table, just for a different
team. The API would serve the tasks per team to different Edge Worker
instances (each worker needs to run for a specific team or a list(?) of teams).
Then everything in this PR that is about managed queues needs to become
managed team(s) + polishing.
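
To make the idea concrete, here is a minimal sketch of team-scoped (rather than queue-scoped) tracking. All names in it (`EdgeJob`, `TeamScopedExecutor`, `jobs_for_worker`, the `team_name` field) are hypothetical and are not the actual Edge provider API or table schema. The shared job table is modeled as a plain list.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class EdgeJob:
    """Hypothetical row in the job table shared by all executor instances."""

    task_id: str
    team_name: str | None  # assumption: each row records the owning team
    queue: str


@dataclass
class TeamScopedExecutor:
    """Hypothetical stand-in for one executor instance bound to one team."""

    team_name: str | None

    def queue_job(self, task_id: str, queue: str, table: list[EdgeJob]) -> None:
        # Every instance writes to the same table, tagged with its own team.
        table.append(EdgeJob(task_id=task_id, team_name=self.team_name, queue=queue))

    def managed_jobs(self, table: list[EdgeJob]) -> list[EdgeJob]:
        # An instance only looks at rows of its own team, regardless of queue.
        return [job for job in table if job.team_name == self.team_name]


def jobs_for_worker(table: list[EdgeJob], worker_teams: set[str | None]) -> list[EdgeJob]:
    """What the API could hand to a worker that runs for one or more teams."""
    return [job for job in table if job.team_name in worker_teams]
```

With this shape, two teams can both use a queue named `default` without seeing each other's jobs, which tracking by queue name alone would not guarantee.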