This is an automated email from the ASF dual-hosted git repository. jscheffl pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new eee6919ff64 fix(provider/edge): add back mising method map (#44468) eee6919ff64 is described below commit eee6919ff64412156d821dcebc96a58efafd7786 Author: Wei Lee <weilee...@gmail.com> AuthorDate: Fri Nov 29 13:48:34 2024 +0800 fix(provider/edge): add back mising method map (#44468) --- .../providers/edge/worker_api/routes/rpc_api.py | 109 ++++++++++++++++++++- 1 file changed, 104 insertions(+), 5 deletions(-) diff --git a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py index ab2d3133d3f..171356b8f21 100644 --- a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py +++ b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py @@ -32,9 +32,6 @@ from jwt import ( InvalidSignatureError, ) -from airflow.api_internal.endpoints.rpc_api_endpoint import ( - initialize_method_map, -) from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.providers.edge.worker_api.datamodels import JsonRpcRequest @@ -59,13 +56,115 @@ rpc_api_router = AirflowRouter(tags=["JSONRPC"]) @cache def _initialize_method_map() -> dict[str, Callable]: + from airflow.api.common.trigger_dag import trigger_dag + from airflow.assets.manager import AssetManager + from airflow.cli.commands.task_command import _get_ti_db_access + from airflow.dag_processing.manager import DagFileProcessorManager + from airflow.dag_processing.processor import DagFileProcessor + from airflow.jobs.job import Job, most_recent_job + from airflow.models import Trigger, Variable, XCom + from airflow.models.dag import DAG, DagModel + from airflow.models.dagrun import DagRun + from airflow.models.dagwarning import DagWarning + from airflow.models.renderedtifields import RenderedTaskInstanceFields + from airflow.models.serialized_dag import SerializedDagModel + from airflow.models.skipmixin import SkipMixin + from airflow.models.taskinstance import ( + TaskInstance, + _add_log, + _defer_task, + _get_template_context, + _handle_failure, + _handle_reschedule, + _record_task_map_for_downstreams, + _update_rtif, + _update_ti_heartbeat, + _xcom_pull, + ) + from airflow.models.xcom_arg import _get_task_map_length from airflow.providers.edge.models.edge_job import EdgeJob from airflow.providers.edge.models.edge_logs import EdgeLogs from airflow.providers.edge.models.edge_worker import EdgeWorker + from airflow.sdk.definitions.asset import expand_alias_to_assets + from airflow.secrets.metastore import MetastoreBackend + from airflow.sensors.base import _orig_start_date + from airflow.utils.cli_action_loggers import _default_action_log_internal + from airflow.utils.log.file_task_handler import FileTaskHandler - internal_api_functions = initialize_method_map().values() functions: list[Callable] = [ - *internal_api_functions, + _default_action_log_internal, + _defer_task, + _get_template_context, + _get_ti_db_access, + _get_task_map_length, + _update_rtif, + _update_ti_heartbeat, + _orig_start_date, + _handle_failure, + _handle_reschedule, + _add_log, + _xcom_pull, + _record_task_map_for_downstreams, + trigger_dag, + DagModel.deactivate_deleted_dags, + DagModel.get_paused_dag_ids, + DagModel.get_current, + DagFileProcessor._execute_task_callbacks, + DagFileProcessor.execute_callbacks, + DagFileProcessor.execute_callbacks_without_dag, + DagFileProcessor.save_dag_to_db, + DagFileProcessor.update_import_errors, + DagFileProcessor._validate_task_pools_and_update_dag_warnings, + DagFileProcessorManager._fetch_callbacks, + DagFileProcessorManager._get_priority_filelocs, + DagFileProcessorManager.clear_nonexistent_import_errors, + DagFileProcessorManager.deactivate_stale_dags, + DagWarning.purge_inactive_dag_warnings, + expand_alias_to_assets, + AssetManager.register_asset_change, + FileTaskHandler._render_filename_db_access, + Job._add_to_db, + Job._fetch_from_db, + Job._kill, + Job._update_heartbeat, + Job._update_in_db, + most_recent_job, + MetastoreBackend._fetch_connection, + MetastoreBackend._fetch_variable, + XCom.get_value, + XCom.get_one, + # XCom.get_many, # Not supported because it returns query + XCom.clear, + XCom.set, + Variable._set, + Variable._update, + Variable._delete, + DAG.fetch_callback, + DAG.fetch_dagrun, + DagRun.fetch_task_instances, + DagRun.get_previous_dagrun, + DagRun.get_previous_scheduled_dagrun, + DagRun.get_task_instances, + DagRun.fetch_task_instance, + DagRun._get_log_template, + RenderedTaskInstanceFields._update_runtime_evaluated_template_fields, + SerializedDagModel.get_serialized_dag, + SkipMixin._skip, + SkipMixin._skip_all_except, + TaskInstance._check_and_change_state_before_execution, + TaskInstance.get_task_instance, + TaskInstance._get_dagrun, + TaskInstance._set_state, + TaskInstance.save_to_db, + TaskInstance._clear_xcom_data, + TaskInstance._register_asset_changes_int, + Trigger.from_object, + Trigger.bulk_fetch, + Trigger.clean_unused, + Trigger.submit_event, + Trigger.submit_failure, + Trigger.ids_for_triggerer, + Trigger.assign_unassigned, # Additional things from EdgeExecutor EdgeJob.reserve_task, EdgeJob.set_state,