Copilot commented on code in PR #64510:
URL: https://github.com/apache/airflow/pull/64510#discussion_r3025347161
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+ from airflow.providers.elasticsearch.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+ from airflow.utils.log.file_task_handler import LogMessages, LogMetadata,
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
Review Comment:
`upload()` uses `time.time()` to generate per-line timestamps. This is
sensitive to system clock adjustments and can lead to out-of-order log entries.
Prefer the event's own time: parse the structured log's timestamp field when it
is present. When it is absent, capture a single wall-clock reading per file
(e.g. `time.time_ns()` once, before the loop) and add a strictly increasing
per-line offset, rather than taking a fresh wall-clock timestamp for every
line.
##########
providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py:
##########
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import logging
+import json
+import uuid
+import datetime
+from logging import LogRecord
+from unittest.mock import MagicMock
+
+import pytest
+import requests_mock
+
+from airflow.providers.grafana.loki.log.loki_task_handler import
LokiRemoteLogIO, LokiTaskHandler
+
[email protected]
+def mock_runtime_ti():
+ ti = MagicMock()
+ ti.dag_id = "test_dag"
+ ti.task_id = "test_task"
+ ti.run_id = "test_run"
+ ti.try_number = 1
+ ti.map_index = -1
+ return ti
Review Comment:
Tests create `MagicMock()` without a `spec`/`autospec` for the runtime task
instance, which can mask attribute typos and diverge from the protocol expected
by the handler. Use a spec’d mock (or a small stub implementing the needed
attributes) so the test fails if the production code accesses missing/renamed
fields.
##########
pyproject.toml:
##########
@@ -168,7 +168,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-celery>=3.8.3"
]
"cloudant" = [
- "apache-airflow-providers-cloudant>=4.0.1"
+ "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\""
Review Comment:
The Cloudant extra is now conditioned on `python_version != "3.9"`, but the
Cloudant provider itself declares `requires-python >=3.10` (so 3.9 can’t
install it anyway). This marker is misleading/redundant and makes dependency
resolution harder to reason about. Either drop the marker, or (if there’s a
real incompatibility) use the version actually affected and align it with the
provider’s declared `requires-python`.
```suggestion
"apache-airflow-providers-cloudant>=4.0.1"
```
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+ from airflow.providers.elasticsearch.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+ from airflow.utils.log.file_task_handler import LogMessages, LogMetadata,
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
+ values.append([timestamp_ns, log_str])
+
+ # Estimate the byte size of this entry in the payload
+ payload_size += len(timestamp_ns) + len(log_str) + 10 # 10
bytes overhead per value
+
+ if payload_size >= MAX_PAYLOAD_SIZE:
+ if not _push_chunk():
+ has_error = True
+ values.clear()
+ payload_size = 0
+
+ except Exception:
+ pass
+
+ # Push any remaining logs
+ if values:
+ if not _push_chunk():
+ has_error = True
+
+ # Clean up local file just like ElasticsearchRemoteLogIO does if fully
successful
+ if self.delete_local_copy and not has_error:
+ try:
+ import shutil
+ shutil.rmtree(local_loc.parent, ignore_errors=True)
+ except Exception:
+ pass
+
+ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo,
LogMessages]:
+ """Fetch logs from Loki using LogQL for streaming or retrieval."""
+ labels = _render_log_labels(ti)
+
+ # 1. Base stream selector (hits low-cardinality index)
+ stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in
labels.items()]) + "}"
+
+ # 2. Line filters (leveraging Loki Bloom filters)
+ run_id = getattr(ti, "run_id", "")
+ try_num = str(ti.try_number)
+ map_idx = str(getattr(ti, "map_index", -1))
+
+ # Utilizing Loki's `| json` parser and exact match filters for maximum
TSDB optimization
+ logQL = (
+ f"{stream_selector} "
+ f'| json '
+ f'| task_id="{ti.task_id}" '
+ f'| run_id="{run_id}" '
+ f'| try_number="{try_num}" '
+ f'| map_index="{map_idx}"'
+ )
+
+ # Query Loki API using configured reliable session
+ resp = self.session.get(f"{self.host}/loki/api/v1/query_range",
params={"query": logQL}, timeout=(3.0, 15.0))
+
+ message = []
+ if resp.ok:
+ data = resp.json().get("data", {}).get("result", [])
+ for stream in data:
+ for val in stream.get("values", []):
+ # parse the underlying JSON structured log we uploaded
+ log_entry = json.loads(val[1])
+ message.append(json.dumps(log_entry))
+
+ return ["loki-remote"], message
+
+
Review Comment:
`LokiRemoteLogIO.read()` returns `["loki-remote"], message`. Note that
`LogSourceInfo` is `list[str]` in `airflow.utils.log.file_task_handler`, so the
sources list itself is acceptable; the real mismatch is on the `LogMessages`
side, where Airflow 3.x expects `StructuredLogMessage` objects while this
implementation returns re-serialized JSON strings. Also `_read_remote_logs()`
passes `""` for the relative path and discards `try_number`, so mapped
tries/attempt selection may be wrong. Align the return types and use the
provided `try_number`/relative path contract consistent with other RemoteLogIO
implementations.
```suggestion
        # LogSourceInfo is list[str]; include the host for operator visibility
        return [f"Loki remote logs (host: {self.host})"], message
```
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+ from airflow.providers.elasticsearch.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+ from airflow.utils.log.file_task_handler import LogMessages, LogMetadata,
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
+ values.append([timestamp_ns, log_str])
+
+ # Estimate the byte size of this entry in the payload
+ payload_size += len(timestamp_ns) + len(log_str) + 10 # 10
bytes overhead per value
+
+ if payload_size >= MAX_PAYLOAD_SIZE:
+ if not _push_chunk():
+ has_error = True
+ values.clear()
+ payload_size = 0
+
+ except Exception:
+ pass
+
Review Comment:
The `except Exception: pass` inside the log file loop silently drops JSON
parse errors and any other issues while reading/enriching logs. This can result
in missing logs with no visibility. At minimum, catch `json.JSONDecodeError`
separately and log at debug/warning with enough context (e.g., file name + line
number), and for unexpected exceptions log an exception once and mark
`has_error=True` so deletion doesn’t occur.
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+ from airflow.providers.elasticsearch.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
Review Comment:
The version checks import `AIRFLOW_V_3_0_PLUS`/`AIRFLOW_V_3_2_PLUS` from
`airflow.providers.elasticsearch.version_compat`. This creates an undeclared
runtime dependency on the Elasticsearch provider, and the `except ImportError`
fallback sets both flags to `True`, which will incorrectly force Airflow 3.x
code paths when running on Airflow 2.x (likely breaking `_read`/remote log
registration). Copy a `version_compat.py` into this provider (as suggested in
`providers/elasticsearch/version_compat.py`) and compute the flags from
`airflow.__version__` instead of defaulting to `True`.
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+ from airflow.providers.elasticsearch.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+ from airflow.utils.log.file_task_handler import LogMessages, LogMetadata,
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
+ values.append([timestamp_ns, log_str])
+
+ # Estimate the byte size of this entry in the payload
+ payload_size += len(timestamp_ns) + len(log_str) + 10 # 10
bytes overhead per value
+
+ if payload_size >= MAX_PAYLOAD_SIZE:
+ if not _push_chunk():
+ has_error = True
+ values.clear()
+ payload_size = 0
+
+ except Exception:
+ pass
+
+ # Push any remaining logs
+ if values:
+ if not _push_chunk():
+ has_error = True
+
+ # Clean up local file just like ElasticsearchRemoteLogIO does if fully
successful
+ if self.delete_local_copy and not has_error:
+ try:
+ import shutil
+ shutil.rmtree(local_loc.parent, ignore_errors=True)
+ except Exception:
+ pass
Review Comment:
There’s an `import shutil` inside the method body and a broad `except
Exception: pass` around cleanup. Airflow’s style generally keeps imports at
module scope; if this isn’t for circular/lazy-load reasons, move the import to
the top. Also consider logging cleanup failures at least at debug level so
operators can diagnose why local logs weren’t deleted.
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
Review Comment:
Several imports appear unused in this file (e.g., `logging`, `sys`,
`pendulum`, and `StructuredLogMessage` isn’t referenced). Please remove unused
imports to keep the module minimal and avoid unnecessary dependencies/import
time in logging code paths.
```suggestion
import json
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

import attrs
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Attempt to load standard log structures according to Airflow 3 requirements
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
```
(Keep `json` and `Retry` — both are used by `upload()`/`session`; only
`logging`, `sys`, and `pendulum` are actually unused in the visible code.)
##########
pyproject.toml:
##########
@@ -108,7 +108,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-amazon>=9.0.0"
]
"apache.cassandra" = [
- "apache-airflow-providers-apache-cassandra>=3.7.0; python_version
!=\"3.14\""
+ "apache-airflow-providers-apache-cassandra>=3.7.0"
Review Comment:
The `apache.cassandra` extra dropped the `python_version != "3.14"` marker.
If this was intentional, it should be justified by the provider’s own
`requires-python` (and/or lock/test matrix). Otherwise, users on 3.14 may now
attempt to install the extra and hit resolver failures. Consider keeping the
marker at the top-level extras if Airflow supports 3.14 but the provider does
not.
```suggestion
    "apache-airflow-providers-apache-cassandra>=3.7.0; python_version !=\"3.14\""
```
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
+ values.append([timestamp_ns, log_str])
+
+ # Estimate the byte size of this entry in the payload
+ payload_size += len(timestamp_ns) + len(log_str) + 10 # 10
bytes overhead per value
+
+ if payload_size >= MAX_PAYLOAD_SIZE:
+ if not _push_chunk():
+ has_error = True
+ values.clear()
+ payload_size = 0
+
+ except Exception:
+ pass
+
+ # Push any remaining logs
+ if values:
+ if not _push_chunk():
+ has_error = True
+
+ # Clean up local file just like ElasticsearchRemoteLogIO does if fully
successful
+ if self.delete_local_copy and not has_error:
+ try:
+ import shutil
+ shutil.rmtree(local_loc.parent, ignore_errors=True)
+ except Exception:
+ pass
+
+ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo,
LogMessages]:
+ """Fetch logs from Loki using LogQL for streaming or retrieval."""
+ labels = _render_log_labels(ti)
+
+ # 1. Base stream selector (hits low-cardinality index)
+ stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in
labels.items()]) + "}"
+
+ # 2. Line filters (leveraging Loki Bloom filters)
+ run_id = getattr(ti, "run_id", "")
+ try_num = str(ti.try_number)
+ map_idx = str(getattr(ti, "map_index", -1))
+
+ # Utilizing Loki's `| json` parser and exact match filters for maximum
TSDB optimization
+ logQL = (
+ f"{stream_selector} "
+ f'| json '
+ f'| task_id="{ti.task_id}" '
+ f'| run_id="{run_id}" '
+ f'| try_number="{try_num}" '
+ f'| map_index="{map_idx}"'
+ )
+
+ # Query Loki API using configured reliable session
+ resp = self.session.get(f"{self.host}/loki/api/v1/query_range",
params={"query": logQL}, timeout=(3.0, 15.0))
+
+ message = []
+ if resp.ok:
+ data = resp.json().get("data", {}).get("result", [])
+ for stream in data:
+ for val in stream.get("values", []):
+ # parse the underlying JSON structured log we uploaded
+ log_entry = json.loads(val[1])
+ message.append(json.dumps(log_entry))
+
+ return ["loki-remote"], message
+
+
+class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
+ """
+ The main logging handler injected into Airflow configuration.
+ """
+ LOG_NAME = "Loki"
+
+ @property
+ def log_name(self) -> str:
+ return self.LOG_NAME
+
+ def __init__(self, base_log_folder: str, host: str, frontend: str = "",
**kwargs):
+ super().__init__(base_log_folder=base_log_folder, **kwargs)
+ self.host = host
+ self.frontend = frontend
+ self.io = LokiRemoteLogIO(
+ host=self.host,
+ base_log_folder=base_log_folder,
+ delete_local_copy=kwargs.get("delete_local_copy", False),
+ )
+
+ # Register Remote Log IO globally for Airflow 3 Task Supervisor
+ if AIRFLOW_V_3_0_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
+ try:
+ from airflow.logging_config import _ActiveLoggingConfig
+ try:
+ from airflow.logging_config import get_remote_task_log
+ if callable(get_remote_task_log) and
get_remote_task_log() is None:
+ _ActiveLoggingConfig.set(self.io, None)
+ except ImportError:
+ pass
+ except ImportError:
+ pass
+ else:
+ try:
+ import airflow.logging_config as alc
+ if getattr(alc, "REMOTE_TASK_LOG", None) is None:
+ alc.REMOTE_TASK_LOG = self.io
+ except ImportError:
+ pass
+
+ def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata:
dict | None = None) -> tuple[list[str], list[str]]:
+ """
+ Called by Airflow 3.x FileTaskHandler._read to fetch remote logs.
+ Airflow 3 native FileTaskHandler manages interleaving these with
locally streaming worker logs.
+ """
+ return self.io.read("", ti)
+
+ def _read(
+ self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None
= None
+ ) -> tuple[list[Any] | str, dict[str, Any]]:
+ """
+ Implementation of the log read handler invoked by the Web UI.
+ In Airflow 3+, we defer to the super() class so it can serve logs from
the active worker
+ and intelligently interleave them with `_read_remote_logs`.
+ """
+ if AIRFLOW_V_3_0_PLUS:
+ return super()._read(ti, try_number, metadata)
+
+ # Fallback for Airflow 2.x
+ metadata = metadata or {"offset": 0}
+ headers, messages = self.io.read("", ti)
+
+ # Build raw messages (no StructuredLogMessage required in Airflow 2)
+ log_str = "\n".join(messages)
+ metadata["end_of_log"] = True
+
+ return [log_str], metadata
+
+ @property
+ def supports_external_link(self) -> bool:
+ """Let Airflow API Server know if we can return a link to Grafana."""
+ return bool(self.frontend)
+
+ def get_external_log_url(self, task_instance: TaskInstance, try_number:
int) -> str:
+ """
+ Used by `airflow-api-server` when users request the external log URL.
+ Constructs a direct link to Grafana Explorer view for these logs.
+ """
+ if not self.frontend:
+ return ""
+
+ import urllib.parse
+
Review Comment:
`get_external_log_url()` does `import urllib.parse` inside the method.
Unless this is needed to avoid a circular import or for lazy-loading, move it
to the module imports to match the repository’s import style and reduce
per-call overhead.
##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+ StructuredLogMessage = dict # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+ AIRFLOW_V_3_0_PLUS = True
+ AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+ """
+ Helper to extract low-cardinality labels for Loki streams.
+ High-cardinality fields (like task_id, run_id) are omitted here
+ to prevent stream explosion and will be indexed via Bloom filters instead.
+ """
+ return {
+ "job": "airflow_tasks",
+ "dag_id": ti.dag_id,
+ }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+ """
+ Handles the actual communication with Loki API.
+ Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+ """
+ host: str = "http://localhost:3100"
+ base_log_folder: Path = attrs.field(converter=Path)
+ delete_local_copy: bool = False
+ processors: list = attrs.field(factory=list)
+
+ @property
+ def session(self) -> requests.Session:
+ if not hasattr(self, "_session"):
+ self._session = requests.Session()
+ # Implementing Retries, Jitter, and Exponential Backoff via
urllib3's Retry
+ retries = Retry(
+ total=5,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET", "POST"]
+ )
+ # Efficient scaling with TCP connection pooling
+ adapter = HTTPAdapter(max_retries=retries, pool_connections=20,
pool_maxsize=100)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ return self._session
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Called by Airflow Task Supervisor after task finishes (or during)
to push logs."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+
+ if not local_loc.is_file():
+ return
+
+ labels = _render_log_labels(ti)
+ values = []
+ payload_size = 0
+ MAX_PAYLOAD_SIZE = 1048576 # 1 MiB chunking as per Promtail limits
+
+ def _push_chunk():
+ if not values:
+ return True
+ payload = {
+ "streams": [
+ {
+ "stream": labels,
+ "values": values
+ }
+ ]
+ }
+ try:
+ resp = self.session.post(f"{self.host}/loki/api/v1/push",
json=payload, timeout=(3.0, 15.0))
+ resp.raise_for_status()
+ return True
+ except Exception as e:
+ self.log.exception("Failed to upload chunk of logs to Loki:
%s", e)
+ return False
+
+ has_error = False
+
+ with open(local_loc, "r") as f:
+ for line in f:
+ if not line.strip():
+ continue
+
+ try:
+ # Log line content from Task Supervisor
+ log_data = json.loads(line)
+
+ # Inject high-cardinality contextual fields into the JSON
payload.
+ log_data["task_id"] = ti.task_id
+ log_data["run_id"] = getattr(ti, "run_id", "")
+ log_data["try_number"] = str(ti.try_number)
+ log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+ # Loki expects Timestamp in nanoseconds as string
+ timestamp_ns = str(int(time.time() * 1e9))
+ log_str = json.dumps(log_data)
+ values.append([timestamp_ns, log_str])
+
+ # Estimate the byte size of this entry in the payload
+ payload_size += len(timestamp_ns) + len(log_str) + 10 # 10
bytes overhead per value
+
+ if payload_size >= MAX_PAYLOAD_SIZE:
+ if not _push_chunk():
+ has_error = True
+ values.clear()
+ payload_size = 0
+
+ except Exception:
+ pass
+
+ # Push any remaining logs
+ if values:
+ if not _push_chunk():
+ has_error = True
+
+ # Clean up local file just like ElasticsearchRemoteLogIO does if fully
successful
+ if self.delete_local_copy and not has_error:
+ try:
+ import shutil
+ shutil.rmtree(local_loc.parent, ignore_errors=True)
+ except Exception:
+ pass
+
+ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo,
LogMessages]:
+ """Fetch logs from Loki using LogQL for streaming or retrieval."""
+ labels = _render_log_labels(ti)
+
+ # 1. Base stream selector (hits low-cardinality index)
+ stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in
labels.items()]) + "}"
+
+ # 2. Line filters (leveraging Loki Bloom filters)
+ run_id = getattr(ti, "run_id", "")
+ try_num = str(ti.try_number)
+ map_idx = str(getattr(ti, "map_index", -1))
+
+ # Utilizing Loki's `| json` parser and exact match filters for maximum
TSDB optimization
+ logQL = (
+ f"{stream_selector} "
+ f'| json '
+ f'| task_id="{ti.task_id}" '
+ f'| run_id="{run_id}" '
+ f'| try_number="{try_num}" '
+ f'| map_index="{map_idx}"'
+ )
+
+ # Query Loki API using configured reliable session
+ resp = self.session.get(f"{self.host}/loki/api/v1/query_range",
params={"query": logQL}, timeout=(3.0, 15.0))
+
+ message = []
+ if resp.ok:
+ data = resp.json().get("data", {}).get("result", [])
+ for stream in data:
+ for val in stream.get("values", []):
+ # parse the underlying JSON structured log we uploaded
+ log_entry = json.loads(val[1])
+ message.append(json.dumps(log_entry))
+
+ return ["loki-remote"], message
+
+
+class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
+ """
+ The main logging handler injected into Airflow configuration.
+ """
+ LOG_NAME = "Loki"
+
+ @property
+ def log_name(self) -> str:
+ return self.LOG_NAME
+
+ def __init__(self, base_log_folder: str, host: str, frontend: str = "",
**kwargs):
+ super().__init__(base_log_folder=base_log_folder, **kwargs)
+ self.host = host
+ self.frontend = frontend
+ self.io = LokiRemoteLogIO(
+ host=self.host,
+ base_log_folder=base_log_folder,
+ delete_local_copy=kwargs.get("delete_local_copy", False),
+ )
+
+ # Register Remote Log IO globally for Airflow 3 Task Supervisor
+ if AIRFLOW_V_3_0_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
+ try:
+ from airflow.logging_config import _ActiveLoggingConfig
+ try:
+ from airflow.logging_config import get_remote_task_log
+ if callable(get_remote_task_log) and
get_remote_task_log() is None:
+ _ActiveLoggingConfig.set(self.io, None)
+ except ImportError:
+ pass
+ except ImportError:
+ pass
+ else:
+ try:
+ import airflow.logging_config as alc
+ if getattr(alc, "REMOTE_TASK_LOG", None) is None:
+ alc.REMOTE_TASK_LOG = self.io
+ except ImportError:
+ pass
+
+ def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata:
dict | None = None) -> tuple[list[str], list[str]]:
+ """
+ Called by Airflow 3.x FileTaskHandler._read to fetch remote logs.
+ Airflow 3 native FileTaskHandler manages interleaving these with
locally streaming worker logs.
+ """
+ return self.io.read("", ti)
+
Review Comment:
`LokiTaskHandler._read_remote_logs()` ignores the `try_number` argument and
passes an empty relative path to `self.io.read`. This can return logs for the
wrong attempt and breaks the standard remote logging contract where reads are
scoped to the rendered log path/try number. Use the `try_number` (and ideally
the rendered relative path from `FileTaskHandler`) to query Loki for the
correct attempt.
##########
providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py:
##########
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import logging
+import json
+import uuid
+import datetime
+from logging import LogRecord
+from unittest.mock import MagicMock
+
+import pytest
+import requests_mock
+
+from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO, LokiTaskHandler
Review Comment:
Several imports in this test file appear unused (`logging`, `uuid`,
`datetime`, `LogRecord`, and `LokiTaskHandler`). Please remove unused imports
to keep the unit test focused and avoid lint failures.
```suggestion
import json
from unittest.mock import MagicMock
import pytest
import requests_mock
from airflow.providers.grafana.loki.log.loki_task_handler import LokiRemoteLogIO
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]