This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eee6919ff64 fix(provider/edge): add back missing method map (#44468)
eee6919ff64 is described below
commit eee6919ff64412156d821dcebc96a58efafd7786
Author: Wei Lee <[email protected]>
AuthorDate: Fri Nov 29 13:48:34 2024 +0800
fix(provider/edge): add back missing method map (#44468)
---
.../providers/edge/worker_api/routes/rpc_api.py | 109 ++++++++++++++++++++-
1 file changed, 104 insertions(+), 5 deletions(-)
diff --git a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
index ab2d3133d3f..171356b8f21 100644
--- a/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
+++ b/providers/src/airflow/providers/edge/worker_api/routes/rpc_api.py
@@ -32,9 +32,6 @@ from jwt import (
InvalidSignatureError,
)
-from airflow.api_internal.endpoints.rpc_api_endpoint import (
- initialize_method_map,
-)
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.edge.worker_api.datamodels import JsonRpcRequest
@@ -59,13 +56,115 @@ rpc_api_router = AirflowRouter(tags=["JSONRPC"])
@cache
def _initialize_method_map() -> dict[str, Callable]:
+ from airflow.api.common.trigger_dag import trigger_dag
+ from airflow.assets.manager import AssetManager
+ from airflow.cli.commands.task_command import _get_ti_db_access
+ from airflow.dag_processing.manager import DagFileProcessorManager
+ from airflow.dag_processing.processor import DagFileProcessor
+ from airflow.jobs.job import Job, most_recent_job
+ from airflow.models import Trigger, Variable, XCom
+ from airflow.models.dag import DAG, DagModel
+ from airflow.models.dagrun import DagRun
+ from airflow.models.dagwarning import DagWarning
+ from airflow.models.renderedtifields import RenderedTaskInstanceFields
+ from airflow.models.serialized_dag import SerializedDagModel
+ from airflow.models.skipmixin import SkipMixin
+ from airflow.models.taskinstance import (
+ TaskInstance,
+ _add_log,
+ _defer_task,
+ _get_template_context,
+ _handle_failure,
+ _handle_reschedule,
+ _record_task_map_for_downstreams,
+ _update_rtif,
+ _update_ti_heartbeat,
+ _xcom_pull,
+ )
+ from airflow.models.xcom_arg import _get_task_map_length
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker
+ from airflow.sdk.definitions.asset import expand_alias_to_assets
+ from airflow.secrets.metastore import MetastoreBackend
+ from airflow.sensors.base import _orig_start_date
+ from airflow.utils.cli_action_loggers import _default_action_log_internal
+ from airflow.utils.log.file_task_handler import FileTaskHandler
- internal_api_functions = initialize_method_map().values()
functions: list[Callable] = [
- *internal_api_functions,
+ _default_action_log_internal,
+ _defer_task,
+ _get_template_context,
+ _get_ti_db_access,
+ _get_task_map_length,
+ _update_rtif,
+ _update_ti_heartbeat,
+ _orig_start_date,
+ _handle_failure,
+ _handle_reschedule,
+ _add_log,
+ _xcom_pull,
+ _record_task_map_for_downstreams,
+ trigger_dag,
+ DagModel.deactivate_deleted_dags,
+ DagModel.get_paused_dag_ids,
+ DagModel.get_current,
+ DagFileProcessor._execute_task_callbacks,
+ DagFileProcessor.execute_callbacks,
+ DagFileProcessor.execute_callbacks_without_dag,
+ DagFileProcessor.save_dag_to_db,
+ DagFileProcessor.update_import_errors,
+ DagFileProcessor._validate_task_pools_and_update_dag_warnings,
+ DagFileProcessorManager._fetch_callbacks,
+ DagFileProcessorManager._get_priority_filelocs,
+ DagFileProcessorManager.clear_nonexistent_import_errors,
+ DagFileProcessorManager.deactivate_stale_dags,
+ DagWarning.purge_inactive_dag_warnings,
+ expand_alias_to_assets,
+ AssetManager.register_asset_change,
+ FileTaskHandler._render_filename_db_access,
+ Job._add_to_db,
+ Job._fetch_from_db,
+ Job._kill,
+ Job._update_heartbeat,
+ Job._update_in_db,
+ most_recent_job,
+ MetastoreBackend._fetch_connection,
+ MetastoreBackend._fetch_variable,
+ XCom.get_value,
+ XCom.get_one,
+ # XCom.get_many, # Not supported because it returns query
+ XCom.clear,
+ XCom.set,
+ Variable._set,
+ Variable._update,
+ Variable._delete,
+ DAG.fetch_callback,
+ DAG.fetch_dagrun,
+ DagRun.fetch_task_instances,
+ DagRun.get_previous_dagrun,
+ DagRun.get_previous_scheduled_dagrun,
+ DagRun.get_task_instances,
+ DagRun.fetch_task_instance,
+ DagRun._get_log_template,
+ RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
+ SerializedDagModel.get_serialized_dag,
+ SkipMixin._skip,
+ SkipMixin._skip_all_except,
+ TaskInstance._check_and_change_state_before_execution,
+ TaskInstance.get_task_instance,
+ TaskInstance._get_dagrun,
+ TaskInstance._set_state,
+ TaskInstance.save_to_db,
+ TaskInstance._clear_xcom_data,
+ TaskInstance._register_asset_changes_int,
+ Trigger.from_object,
+ Trigger.bulk_fetch,
+ Trigger.clean_unused,
+ Trigger.submit_event,
+ Trigger.submit_failure,
+ Trigger.ids_for_triggerer,
+ Trigger.assign_unassigned,
# Additional things from EdgeExecutor
EdgeJob.reserve_task,
EdgeJob.set_state,