This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 624211f33f Add Executors discovery and documentation (#32532)
624211f33f is described below

commit 624211f33f30d0147b9daeb5913d2eb01861a842
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jul 12 22:14:09 2023 +0200

    Add Executors discovery and documentation (#32532)
    
    * Add Executors discovery and documentation
    
    The Executors can now be added via providers. This PR adds a
    mechanism for discovering the executors via Providers Manager,
    exposing them via CLI and documenting them in core-extensions.
    
    * Update scripts/in_container/verify_providers.py
---
 airflow/cli/cli_config.py                          |  6 ++++
 airflow/cli/commands/provider_command.py           | 12 ++++++++
 airflow/provider.yaml.schema.json                  |  9 +++++-
 airflow/provider_info.schema.json                  | 11 ++++++--
 airflow/providers/celery/provider.yaml             |  4 +++
 airflow/providers_manager.py                       | 32 +++++++++++++++++----
 .../core-extensions/executors.rst                  | 33 ++++++++++++++++++++++
 docs/exts/executors.rst.jinja2                     | 27 ++++++++++++++++++
 docs/exts/operators_and_hooks_ref.py               | 29 +++++++++++++++++++
 scripts/in_container/verify_providers.py           |  7 ++++-
 10 files changed, 160 insertions(+), 10 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 92ba87c3de..97aa83c100 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -1814,6 +1814,12 @@ PROVIDERS_COMMANDS = (
         
func=lazy_load_command("airflow.cli.commands.provider_command.auth_backend_list"),
         args=(ARG_OUTPUT, ARG_VERBOSE),
     ),
+    ActionCommand(
+        name="executors",
+        help="Get information about executors provided",
+        
func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
+        args=(ARG_OUTPUT, ARG_VERBOSE),
+    ),
 )
 
 USERS_COMMANDS = (
diff --git a/airflow/cli/commands/provider_command.py 
b/airflow/cli/commands/provider_command.py
index 876905c1c2..67c78c57fe 100644
--- a/airflow/cli/commands/provider_command.py
+++ b/airflow/cli/commands/provider_command.py
@@ -167,3 +167,15 @@ def auth_backend_list(args):
             "api_auth_backand_module": x,
         },
     )
+
+
+@suppress_logs_and_warning
+def executors_list(args):
+    """Lists all executors at the command line."""
+    AirflowConsole().print_as(
+        data=list(ProvidersManager().executor_class_names),
+        output=args.output,
+        mapper=lambda x: {
+            "executor_class_names": x,
+        },
+    )
diff --git a/airflow/provider.yaml.schema.json 
b/airflow/provider.yaml.schema.json
index 447098c655..a1d0822af6 100644
--- a/airflow/provider.yaml.schema.json
+++ b/airflow/provider.yaml.schema.json
@@ -306,13 +306,20 @@
           "type": "string"
       }
     },
-      "notifications": {
+    "notifications": {
           "type": "array",
           "description": "Notification class names",
           "items": {
               "type": "string"
           }
     },
+    "executors": {
+          "type": "array",
+          "description": "Executor class names",
+          "items": {
+              "type": "string"
+          }
+      },
     "plugins": {
       "type": "array",
       "description": "Plugins exposed by the provider",
diff --git a/airflow/provider_info.schema.json 
b/airflow/provider_info.schema.json
index a8bd9408cb..86b0430cc3 100644
--- a/airflow/provider_info.schema.json
+++ b/airflow/provider_info.schema.json
@@ -71,13 +71,20 @@
           "type": "string"
       }
     },
-      "notifications": {
+    "notifications": {
           "type": "array",
           "description": "Notification class names",
           "items": {
               "type": "string"
           }
-      },
+    },
+    "executors": {
+          "type": "array",
+          "description": "Executor class names",
+          "items": {
+              "type": "string"
+          }
+    },
     "task-decorators": {
         "type": "array",
         "description": "Apply custom decorators to the TaskFlow API. Can be 
accessed by users via '@task.<name>'",
diff --git a/airflow/providers/celery/provider.yaml 
b/airflow/providers/celery/provider.yaml
index b6b1e7e4e5..32682e51e5 100644
--- a/airflow/providers/celery/provider.yaml
+++ b/airflow/providers/celery/provider.yaml
@@ -56,3 +56,7 @@ sensors:
   - integration-name: Celery
     python-modules:
       - airflow.providers.celery.sensors.celery_queue
+
+executors:
+  - airflow.providers.celery.executors.celery_executor.CeleryExecutor
+  - 
airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 668d79a0bc..abd14d27b5 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -273,7 +273,7 @@ def log_import_warning(class_name, e, provider_package):
 KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", 
"No module named 'paramiko'")]
 
 
-def _sanity_check(
+def _correctness_check(
     provider_package: str, class_name: str, provider_info: ProviderInfo
 ) -> type[BaseHook] | None:
     """
@@ -389,6 +389,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         self._extra_link_class_name_set: set[str] = set()
         self._logging_class_name_set: set[str] = set()
         self._secrets_backend_class_name_set: set[str] = set()
+        self._executor_class_name_set: set[str] = set()
         self._api_auth_backend_module_names: set[str] = set()
         self._trigger_info_set: set[TriggerInfo] = set()
         self._provider_schema_validator = 
_create_provider_info_schema_validator()
@@ -455,6 +456,12 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         self.initialize_providers_list()
         self._discover_secrets_backends()
 
+    @provider_info_cache("executors")
+    def initialize_providers_executors(self):
+        """Lazy initialization of providers executors information."""
+        self.initialize_providers_list()
+        self._discover_executors()
+
     @provider_info_cache("auth_backends")
     def initialize_providers_auth_backends(self):
         """Lazy initialization of providers API auth_backends information."""
@@ -797,7 +804,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
                     f"Provider package name is not set when hook_class_name 
({hook_class_name}) is used"
                 )
         allowed_field_classes = [IntegerField, PasswordField, StringField, 
BooleanField]
-        hook_class = _sanity_check(package_name, hook_class_name, 
provider_info)
+        hook_class = _correctness_check(package_name, hook_class_name, 
provider_info)
         if hook_class is None:
             return None
         try:
@@ -923,7 +930,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("extra-links"):
                 for extra_link_class_name in provider.data["extra-links"]:
-                    if _sanity_check(provider_package, extra_link_class_name, 
provider):
+                    if _correctness_check(provider_package, 
extra_link_class_name, provider):
                         
self._extra_link_class_name_set.add(extra_link_class_name)
 
     def _discover_logging(self) -> None:
@@ -931,7 +938,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("logging"):
                 for logging_class_name in provider.data["logging"]:
-                    if _sanity_check(provider_package, logging_class_name, 
provider):
+                    if _correctness_check(provider_package, 
logging_class_name, provider):
                         self._logging_class_name_set.add(logging_class_name)
 
     def _discover_secrets_backends(self) -> None:
@@ -939,7 +946,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("secrets-backends"):
                 for secrets_backends_class_name in 
provider.data["secrets-backends"]:
-                    if _sanity_check(provider_package, 
secrets_backends_class_name, provider):
+                    if _correctness_check(provider_package, 
secrets_backends_class_name, provider):
                         
self._secrets_backend_class_name_set.add(secrets_backends_class_name)
 
     def _discover_auth_backends(self) -> None:
@@ -947,9 +954,17 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("auth-backends"):
                 for auth_backend_module_name in provider.data["auth-backends"]:
-                    if _sanity_check(provider_package, 
auth_backend_module_name + ".init_app", provider):
+                    if _correctness_check(provider_package, 
auth_backend_module_name + ".init_app", provider):
                         
self._api_auth_backend_module_names.add(auth_backend_module_name)
 
+    def _discover_executors(self) -> None:
+        """Retrieve all executors defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            if provider.data.get("executors"):
+                for executors_class_name in provider.data["executors"]:
+                    if _correctness_check(provider_package, 
executors_class_name, provider):
+                        self._executor_class_name_set.add(executors_class_name)
+
     @provider_info_cache("triggers")
     def initialize_providers_triggers(self):
         """Initialization of providers triggers."""
@@ -1033,3 +1048,8 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
         """Returns set of API auth backend class names."""
         self.initialize_providers_auth_backends()
         return sorted(self._api_auth_backend_module_names)
+
+    @property
+    def executor_class_names(self) -> list[str]:
+        self.initialize_providers_executors()
+        return sorted(self._executor_class_name_set)
diff --git a/docs/apache-airflow-providers/core-extensions/executors.rst 
b/docs/apache-airflow-providers/core-extensions/executors.rst
new file mode 100644
index 0000000000..f5411beec7
--- /dev/null
+++ b/docs/apache-airflow-providers/core-extensions/executors.rst
@@ -0,0 +1,33 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Executors
+---------
+
+This is a summary of all Apache Airflow Community provided implementations of 
Executors
+exposed via community-managed providers.
+
+Airflow can be extended by providers with Executors. Each provider can define 
its own Executors,
+that can be configured to handle executing tasks.
+
+The executors are explained in
+:doc:`apache-airflow:core-concepts/executor/index` and you can also see those
+provided by the community-managed providers:
+
+.. airflow-executors::
+   :tags: None
+   :header-separator: "
diff --git a/docs/exts/executors.rst.jinja2 b/docs/exts/executors.rst.jinja2
new file mode 100644
index 0000000000..bce6d8c3b8
--- /dev/null
+++ b/docs/exts/executors.rst.jinja2
@@ -0,0 +1,27 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+#}
+{%for provider, provider_dict in items.items() %}
+{{ provider_dict['name'] }}
+{{ header_separator * (provider_dict['name']|length) }}
+
+{% for executor in provider_dict['executors'] -%}
+- :class:`~{{ executor }}`
+{% endfor -%}
+
+{% endfor %}
diff --git a/docs/exts/operators_and_hooks_ref.py 
b/docs/exts/operators_and_hooks_ref.py
index 309259ae1d..9b78674b9d 100644
--- a/docs/exts/operators_and_hooks_ref.py
+++ b/docs/exts/operators_and_hooks_ref.py
@@ -284,12 +284,31 @@ def _prepare_notifications_data():
     return all_notifiers
 
 
+def _prepare_executors_data():
+    package_data = load_package_data()
+    all_executors = {}
+    for provider in package_data:
+        if executors := provider.get("executors"):
+            package_name = provider["package-name"]
+            all_executors[package_name] = {
+                "name": provider["name"],
+                "executors": executors,
+            }
+    return all_executors
+
+
 def _render_notification_content(*, header_separator: str):
     tabular_data = _prepare_notifications_data()
 
     return _render_template("notifications.rst.jinja2", items=tabular_data, 
header_separator=header_separator)
 
 
+def _render_executors_content(*, header_separator: str):
+    tabular_data = _prepare_executors_data()
+
+    return _render_template("executors.rst.jinja2", items=tabular_data, 
header_separator=header_separator)
+
+
 class BaseJinjaReferenceDirective(Directive):
     """The base directive for OperatorsHooksReferenceDirective and 
TransfersReferenceDirective"""
 
@@ -396,6 +415,15 @@ class NotificationsDirective(BaseJinjaReferenceDirective):
         )
 
 
+class ExecutorsDirective(BaseJinjaReferenceDirective):
+    """Generate list of executors"""
+
+    def render_content(self, *, tags: set[str] | None, header_separator: str = 
DEFAULT_HEADER_SEPARATOR):
+        return _render_executors_content(
+            header_separator=header_separator,
+        )
+
+
 def setup(app):
     """Setup plugin"""
     app.add_directive("operators-hooks-ref", OperatorsHooksReferenceDirective)
@@ -406,6 +434,7 @@ def setup(app):
     app.add_directive("airflow-connections", ConnectionsDirective)
     app.add_directive("airflow-extra-links", ExtraLinksDirective)
     app.add_directive("airflow-notifications", NotificationsDirective)
+    app.add_directive("airflow-executors", ExecutorsDirective)
 
     return {"parallel_read_safe": True, "parallel_write_safe": True}
 
diff --git a/scripts/in_container/verify_providers.py 
b/scripts/in_container/verify_providers.py
index d5bff21582..08193a62db 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -714,12 +714,17 @@ def run_provider_discovery():
     subprocess.run(["airflow", "providers", "secrets"], check=True)
     console.print("[bright_blue]List all auth backends[/]\n")
     subprocess.run(["airflow", "providers", "auth"], check=True)
-    if packaging.version.parse(airflow.version.version) >= 
packaging.version.parse("2.7.0.dev0"):
+    if packaging.version.parse(airflow.version.version) >= 
packaging.version.parse("2.6.0.dev0"):
         # CI also check if our providers are installable and discoverable in 
airflow older versions
         # But the triggers command is not available till airflow-2-6-0
         # TODO: Remove this block once airflow dependency in providers are > 
2-6-0
         console.print("[bright_blue]List all triggers[/]\n")
         subprocess.run(["airflow", "providers", "triggers"], check=True)
+    if packaging.version.parse(airflow.version.version) >= 
packaging.version.parse("2.7.0.dev0"):
+        # CI also checks if our providers are installable and discoverable in 
older airflow versions
+        # But the executors command is not available till airflow-2-7-0
+        console.print("[bright_blue]List all executors[/]\n")
+        subprocess.run(["airflow", "providers", "executors"], check=True)
 
 
 AIRFLOW_LOCAL_SETTINGS_PATH = Path("/opt/airflow") / 
"airflow_local_settings.py"

Reply via email to