jedcunningham commented on code in PR #53189: URL: https://github.com/apache/airflow/pull/53189#discussion_r2201729954
########## airflow-core/src/airflow/utils/hitl_shared_links.py: ########## @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Utilities for Human-in-the-Loop (HITL) shared links.""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +from datetime import datetime, timedelta +from typing import Any +from urllib.parse import urlencode + +import structlog + +from airflow.configuration import conf +from airflow.utils import timezone + +log = structlog.get_logger(__name__) + + +class HITLSharedLinkManager: + """Manager for HITL shared links with token generation and verification.""" + + def __init__(self): + self.secret_key = conf.get("api", "hitl_shared_link_secret_key", fallback="") + self.default_expiration_hours = conf.getint("api", "hitl_shared_link_expiration_hours", fallback=24) + + def is_enabled(self) -> bool: + """Check if HITL shared links are enabled.""" + return conf.getboolean("api", "hitl_enable_shared_links", fallback=False) + + def _generate_signature(self, payload: str) -> str: + """Generate HMAC signature for the payload.""" + if not self.secret_key: + raise ValueError("HITL shared link secret key is not configured") + + signature = hmac.new( + self.secret_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 + ).digest() + return base64.urlsafe_b64encode(signature).decode("utf-8") + + def _verify_signature(self, payload: str, signature: str) -> bool: + """Verify HMAC signature for the payload.""" + expected_signature = self._generate_signature(payload) + return hmac.compare_digest(expected_signature, signature) + + def generate_link( + self, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int | None = None, + link_type: str = "action", + action: str | None = None, + expires_in_hours: int | None = None, + base_url: str | None = None, + ) -> dict[str, Any]: + """ + Generate a shared link for HITL task. + + :param dag_id: DAG ID + :param dag_run_id: DAG run ID + :param task_id: Task ID + :param map_index: Map index for mapped tasks + :param link_type: Type of link ('action' or 'redirect') + :param action: Action to perform (for action links) + :param expires_in_hours: Custom expiration time in hours + :param base_url: Base URL for the link + """ + if not self.is_enabled(): + raise ValueError("HITL shared links are not enabled") + + if link_type == "action" and not action: + raise ValueError("Action is required for action-type links") + + expiration_hours = expires_in_hours or self.default_expiration_hours + expires_at = timezone.utcnow() + timedelta(hours=expiration_hours) + + payload_data = { + "dag_id": dag_id, + "dag_run_id": dag_run_id, + "task_id": task_id, + "map_index": map_index, + "link_type": link_type, + "action": action, + "expires_at": expires_at.isoformat(), + } + + payload_str = json.dumps(payload_data, sort_keys=True) + signature = self._generate_signature(payload_str) + + encoded_payload = base64.urlsafe_b64encode(payload_str.encode("utf-8")).decode("utf-8") + + if base_url is None: + base_url = conf.get("api", "base_url", fallback="http://localhost:8080") + + if map_index is not None: + url_path = f"/api/v2/hitl-details/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/{map_index}" + else: + url_path = f"/api/v2/hitl-details/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}" + + query_params = { + "payload": encoded_payload, + "signature": signature, + } + + link_url = f"{base_url.rstrip('/')}{url_path}?{urlencode(query_params)}" + + return { + "task_instance_id": f"{dag_id}.{dag_run_id}.{task_id}.{map_index or -1}", Review Comment: Let's do the actual id, or provide each component of the logical key separately. ########## airflow-core/src/airflow/utils/hitl_shared_links.py: ########## @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Utilities for Human-in-the-Loop (HITL) shared links.""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +from datetime import datetime, timedelta +from typing import Any +from urllib.parse import urlencode + +import structlog + +from airflow.configuration import conf +from airflow.utils import timezone + +log = structlog.get_logger(__name__) + + +class HITLSharedLinkManager: + """Manager for HITL shared links with token generation and verification.""" + + def __init__(self): + self.secret_key = conf.get("api", "hitl_shared_link_secret_key", fallback="") + self.default_expiration_hours = conf.getint("api", "hitl_shared_link_expiration_hours", fallback=24) + + def is_enabled(self) -> bool: + """Check if HITL shared links are enabled.""" + return conf.getboolean("api", "hitl_enable_shared_links", fallback=False) + + def _generate_signature(self, payload: str) -> str: + """Generate HMAC signature for the payload.""" + if not self.secret_key: + raise ValueError("HITL shared link secret key is not configured") + + signature = hmac.new( + self.secret_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 + ).digest() + return base64.urlsafe_b64encode(signature).decode("utf-8") + + def _verify_signature(self, payload: str, signature: str) -> bool: + """Verify HMAC signature for the payload.""" + expected_signature = self._generate_signature(payload) + return hmac.compare_digest(expected_signature, signature) + + def generate_link( + self, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int | None = None, + link_type: str = "action", + action: str | None = None, + expires_in_hours: int | None = None, + base_url: str | None = None, + ) -> dict[str, Any]: + """ + Generate a shared link for HITL task. + + :param dag_id: DAG ID + :param dag_run_id: DAG run ID + :param task_id: Task ID + :param map_index: Map index for mapped tasks + :param link_type: Type of link ('action' or 'redirect') + :param action: Action to perform (for action links) + :param expires_in_hours: Custom expiration time in hours + :param base_url: Base URL for the link + """ + if not self.is_enabled(): + raise ValueError("HITL shared links are not enabled") + + if link_type == "action" and not action: + raise ValueError("Action is required for action-type links") + + expiration_hours = expires_in_hours or self.default_expiration_hours + expires_at = timezone.utcnow() + timedelta(hours=expiration_hours) + + payload_data = { + "dag_id": dag_id, + "dag_run_id": dag_run_id, + "task_id": task_id, + "map_index": map_index, + "link_type": link_type, + "action": action, + "expires_at": expires_at.isoformat(), + } + + payload_str = json.dumps(payload_data, sort_keys=True) + signature = self._generate_signature(payload_str) + + encoded_payload = base64.urlsafe_b64encode(payload_str.encode("utf-8")).decode("utf-8") + + if base_url is None: + base_url = conf.get("api", "base_url", fallback="http://localhost:8080") Review Comment: I don't think we should have a fallback here. If it's not set, we should probably fail? ########## airflow-core/src/airflow/config_templates/config.yml: ########## @@ -1493,6 +1493,32 @@ api: type: boolean example: ~ default: "False" + hitl_enable_shared_links: + description: | + Enable Human-in-the-Loop (HITL) shared links functionality. When enabled, users can generate + shareable links for HITL tasks that can be used to perform actions or redirect to the UI. + This feature must be explicitly enabled for security reasons. + version_added: 3.1.0 + type: boolean + example: ~ + default: "False" + hitl_shared_link_secret_key: Review Comment: Why aren't we just reusing `[api] secret_key`? ########## airflow-core/src/airflow/utils/hitl_shared_links.py: ########## @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Utilities for Human-in-the-Loop (HITL) shared links.""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +from datetime import datetime, timedelta +from typing import Any +from urllib.parse import urlencode + +import structlog + +from airflow.configuration import conf +from airflow.utils import timezone + +log = structlog.get_logger(__name__) + + +class HITLSharedLinkManager: + """Manager for HITL shared links with token generation and verification.""" + + def __init__(self): + self.secret_key = conf.get("api", "hitl_shared_link_secret_key", fallback="") + self.default_expiration_hours = conf.getint("api", "hitl_shared_link_expiration_hours", fallback=24) + + def is_enabled(self) -> bool: + """Check if HITL shared links are enabled.""" + return conf.getboolean("api", "hitl_enable_shared_links", fallback=False) + + def _generate_signature(self, payload: str) -> str: + """Generate HMAC signature for the payload.""" + if not self.secret_key: + raise ValueError("HITL shared link secret key is not configured") + + signature = hmac.new( + self.secret_key.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 + ).digest() + return base64.urlsafe_b64encode(signature).decode("utf-8") + + def _verify_signature(self, payload: str, signature: str) -> bool: + """Verify HMAC signature for the payload.""" + expected_signature = self._generate_signature(payload) + return hmac.compare_digest(expected_signature, signature) + + def generate_link( + self, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int | None = None, + link_type: str = "action", + action: str | None = None, + expires_in_hours: int | None = None, + base_url: str | None = None, + ) -> dict[str, Any]: + """ + Generate a shared link for HITL task. + + :param dag_id: DAG ID + :param dag_run_id: DAG run ID + :param task_id: Task ID + :param map_index: Map index for mapped tasks + :param link_type: Type of link ('action' or 'redirect') + :param action: Action to perform (for action links) + :param expires_in_hours: Custom expiration time in hours + :param base_url: Base URL for the link + """ + if not self.is_enabled(): + raise ValueError("HITL shared links are not enabled") + + if link_type == "action" and not action: + raise ValueError("Action is required for action-type links") + + expiration_hours = expires_in_hours or self.default_expiration_hours + expires_at = timezone.utcnow() + timedelta(hours=expiration_hours) + + payload_data = { + "dag_id": dag_id, + "dag_run_id": dag_run_id, + "task_id": task_id, + "map_index": map_index, + "link_type": link_type, + "action": action, + "expires_at": expires_at.isoformat(), + } + + payload_str = json.dumps(payload_data, sort_keys=True) + signature = self._generate_signature(payload_str) + + encoded_payload = base64.urlsafe_b64encode(payload_str.encode("utf-8")).decode("utf-8") + + if base_url is None: + base_url = conf.get("api", "base_url", fallback="http://localhost:8080") + + if map_index is not None: + url_path = f"/api/v2/hitl-details/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/{map_index}" Review Comment: Why are we doing double `/api/v2`? Also, can't we use `app.url_path_for` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
