This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eee6919ff64 fix(provider/edge): add back missing method map (#44468)
eee6919ff64 is described below

commit eee6919ff64412156d821dcebc96a58efafd7786
Author: Wei Lee <weilee...@gmail.com>
AuthorDate: Fri Nov 29 13:48:34 2024 +0800

    fix(provider/edge): add back missing method map (#44468)
---
 .../providers/edge/worker_api/routes/rpc_api.py    | 109 ++++++++++++++++++++-
 1 file changed, 104 insertions(+), 5 deletions(-)

diff --git a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
index ab2d3133d3f..171356b8f21 100644
--- a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
+++ b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
@@ -32,9 +32,6 @@ from jwt import (
     InvalidSignatureError,
 )
 
-from airflow.api_internal.endpoints.rpc_api_endpoint import (
-    initialize_method_map,
-)
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.edge.worker_api.datamodels import JsonRpcRequest
@@ -59,13 +56,115 @@ rpc_api_router = AirflowRouter(tags=["JSONRPC"])
 
 @cache
 def _initialize_method_map() -> dict[str, Callable]:
+    from airflow.api.common.trigger_dag import trigger_dag
+    from airflow.assets.manager import AssetManager
+    from airflow.cli.commands.task_command import _get_ti_db_access
+    from airflow.dag_processing.manager import DagFileProcessorManager
+    from airflow.dag_processing.processor import DagFileProcessor
+    from airflow.jobs.job import Job, most_recent_job
+    from airflow.models import Trigger, Variable, XCom
+    from airflow.models.dag import DAG, DagModel
+    from airflow.models.dagrun import DagRun
+    from airflow.models.dagwarning import DagWarning
+    from airflow.models.renderedtifields import RenderedTaskInstanceFields
+    from airflow.models.serialized_dag import SerializedDagModel
+    from airflow.models.skipmixin import SkipMixin
+    from airflow.models.taskinstance import (
+        TaskInstance,
+        _add_log,
+        _defer_task,
+        _get_template_context,
+        _handle_failure,
+        _handle_reschedule,
+        _record_task_map_for_downstreams,
+        _update_rtif,
+        _update_ti_heartbeat,
+        _xcom_pull,
+    )
+    from airflow.models.xcom_arg import _get_task_map_length
     from airflow.providers.edge.models.edge_job import EdgeJob
     from airflow.providers.edge.models.edge_logs import EdgeLogs
     from airflow.providers.edge.models.edge_worker import EdgeWorker
+    from airflow.sdk.definitions.asset import expand_alias_to_assets
+    from airflow.secrets.metastore import MetastoreBackend
+    from airflow.sensors.base import _orig_start_date
+    from airflow.utils.cli_action_loggers import _default_action_log_internal
+    from airflow.utils.log.file_task_handler import FileTaskHandler
 
-    internal_api_functions = initialize_method_map().values()
     functions: list[Callable] = [
-        *internal_api_functions,
+        _default_action_log_internal,
+        _defer_task,
+        _get_template_context,
+        _get_ti_db_access,
+        _get_task_map_length,
+        _update_rtif,
+        _update_ti_heartbeat,
+        _orig_start_date,
+        _handle_failure,
+        _handle_reschedule,
+        _add_log,
+        _xcom_pull,
+        _record_task_map_for_downstreams,
+        trigger_dag,
+        DagModel.deactivate_deleted_dags,
+        DagModel.get_paused_dag_ids,
+        DagModel.get_current,
+        DagFileProcessor._execute_task_callbacks,
+        DagFileProcessor.execute_callbacks,
+        DagFileProcessor.execute_callbacks_without_dag,
+        DagFileProcessor.save_dag_to_db,
+        DagFileProcessor.update_import_errors,
+        DagFileProcessor._validate_task_pools_and_update_dag_warnings,
+        DagFileProcessorManager._fetch_callbacks,
+        DagFileProcessorManager._get_priority_filelocs,
+        DagFileProcessorManager.clear_nonexistent_import_errors,
+        DagFileProcessorManager.deactivate_stale_dags,
+        DagWarning.purge_inactive_dag_warnings,
+        expand_alias_to_assets,
+        AssetManager.register_asset_change,
+        FileTaskHandler._render_filename_db_access,
+        Job._add_to_db,
+        Job._fetch_from_db,
+        Job._kill,
+        Job._update_heartbeat,
+        Job._update_in_db,
+        most_recent_job,
+        MetastoreBackend._fetch_connection,
+        MetastoreBackend._fetch_variable,
+        XCom.get_value,
+        XCom.get_one,
+        # XCom.get_many, # Not supported because it returns query
+        XCom.clear,
+        XCom.set,
+        Variable._set,
+        Variable._update,
+        Variable._delete,
+        DAG.fetch_callback,
+        DAG.fetch_dagrun,
+        DagRun.fetch_task_instances,
+        DagRun.get_previous_dagrun,
+        DagRun.get_previous_scheduled_dagrun,
+        DagRun.get_task_instances,
+        DagRun.fetch_task_instance,
+        DagRun._get_log_template,
+        RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
+        SerializedDagModel.get_serialized_dag,
+        SkipMixin._skip,
+        SkipMixin._skip_all_except,
+        TaskInstance._check_and_change_state_before_execution,
+        TaskInstance.get_task_instance,
+        TaskInstance._get_dagrun,
+        TaskInstance._set_state,
+        TaskInstance.save_to_db,
+        TaskInstance._clear_xcom_data,
+        TaskInstance._register_asset_changes_int,
+        Trigger.from_object,
+        Trigger.bulk_fetch,
+        Trigger.clean_unused,
+        Trigger.submit_event,
+        Trigger.submit_failure,
+        Trigger.ids_for_triggerer,
+        Trigger.assign_unassigned,
         # Additional things from EdgeExecutor
         EdgeJob.reserve_task,
         EdgeJob.set_state,

Reply via email to