jason810496 commented on code in PR #49475:
URL: https://github.com/apache/airflow/pull/49475#discussion_r2052383815
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py:
##########
@@ -397,3 +397,60 @@ def
test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self):
)
assert response.status_code == 404
assert response.json()["detail"] == "TaskInstance not found"
+
+ @pytest.mark.parametrize(
+ "supports_external_link, task_id, expected_status, expected_response,
mock_external_url",
+ [
+ (
+ True,
+ "task_for_testing_log_endpoint",
+ 200,
+ {"url": "https://external-logs.example.com/log/123"},
+ True,
+ ),
+ (
+ False,
+ "task_for_testing_log_endpoint",
+ 400,
+ {"detail": "Task log handler does not support external logs."},
+ False,
+ ),
+ (True, "INVALID_TASK", 404, {"detail": "TaskInstance not found"},
False),
+ ],
+ ids=[
+ "external_links_supported_task_exists",
+ "external_links_not_supported",
+ "external_links_supported_task_not_found",
+ ],
+ )
+ def test_get_external_log_url(
+ self, supports_external_link, task_id, expected_status,
expected_response, mock_external_url
+ ):
+ with mock.patch(
+
"airflow.utils.log.log_reader.TaskLogReader.supports_external_link",
+ new_callable=mock.PropertyMock,
+ return_value=supports_external_link,
+ ):
+ if mock_external_url:
+ with
mock.patch("airflow.utils.log.log_reader.TaskLogReader.log_handler") as
mock_log_handler:
+ mock_log_handler.get_external_log_url.return_value = (
+ "https://external-logs.example.com/log/123"
+ )
+
+ response = self.client.get(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{task_id}/externalLogUrl/{self.TRY_NUMBER}",
+ headers={"Accept": "application/json"},
+ )
+
+ assert response.status_code == expected_status
+ assert response.json() == expected_response
+
+ mock_log_handler.get_external_log_url.assert_called_once()
if expected_status == 200 else
mock_log_handler.get_external_log_url.assert_not_called()
+ else:
+ response = self.client.get(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{task_id}/externalLogUrl/{self.TRY_NUMBER}",
+ headers={"Accept": "application/json"},
+ )
+
+ assert response.status_code == expected_status
+ assert response.json() == expected_response
Review Comment:
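Small nit on the if/else block: both branches repeat the same request and
assertions, so they can be collapsed into a single path.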
```suggestion
response = self.client.get(
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{task_id}/externalLogUrl/{self.TRY_NUMBER}"
)
assert response.status_code == expected_status
assert response.json() == expected_response
if mock_external_url and expected_status == 200:
mock_log_handler.get_external_log_url.assert_called_once()
elif mock_external_url:
mock_log_handler.get_external_log_url.assert_not_called()
```
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -151,3 +151,48 @@ def get_log(
"Airflow-Continuation-Token":
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
}
return Response(media_type="application/x-ndjson", content=logs,
headers=headers)
+
+
+@task_instances_log_router.get(
+ "/{task_id}/externalLogUrl/{try_number}",
+ responses={
+ **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ status.HTTP_200_OK: {
+ "description": "External log URL",
+ "content": {
+ "application/json": {"schema": {"type": "object",
"properties": {"url": {"type": "string"}}}}
+ },
+ },
+ },
+ dependencies=[Depends(requires_access_dag("GET",
DagAccessEntity.TASK_INSTANCE))],
+ response_model=ExternalLogUrlResponse,
+)
+def get_external_log_url(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ try_number: PositiveInt,
+ session: SessionDep,
+ map_index: int = -1,
+):
Review Comment:
Nit: The `get_log` endpoint that you used as a reference is a special case (it
returns either `application/json` or `application/x-ndjson` depending on the
`Accept` header). For most routes, we tend to add a return type annotation
instead of passing `response_model`.
```suggestion
@task_instances_log_router.get(
"/{task_id}/externalLogUrl/{try_number}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_dag("GET",
DagAccessEntity.TASK_INSTANCE))],
)
def get_external_log_url(
dag_id: str,
dag_run_id: str,
task_id: str,
try_number: PositiveInt,
session: SessionDep,
map_index: int = -1,
) -> ExternalLogUrlResponse:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]