Copilot commented on code in PR #64510:
URL: https://github.com/apache/airflow/pull/64510#discussion_r3025347161


##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+    """
+    Helper to extract low-cardinality labels for Loki streams.
+    High-cardinality fields (like task_id, run_id) are omitted here
+    to prevent stream explosion and will be indexed via Bloom filters instead.
+    """
+    return {
+        "job": "airflow_tasks",
+        "dag_id": ti.dag_id,
+    }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+    """
+    Handles the actual communication with Loki API.
+    Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+    """
+    host: str = "http://localhost:3100"
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool = False
+    processors: list = attrs.field(factory=list)
+    
+    @property
+    def session(self) -> requests.Session:
+        if not hasattr(self, "_session"):
+            self._session = requests.Session()
+            # Implementing Retries, Jitter, and Exponential Backoff via 
urllib3's Retry
+            retries = Retry(
+                total=5,
+                backoff_factor=1,
+                status_forcelist=[429, 500, 502, 503, 504],
+                allowed_methods=["GET", "POST"]
+            )
+            # Efficient scaling with TCP connection pooling 
+            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, 
pool_maxsize=100)
+            self._session.mount("http://", adapter)
+            self._session.mount("https://", adapter)
+        return self._session
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Called by Airflow Task Supervisor after task finishes (or during) 
to push logs."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+
+        if not local_loc.is_file():
+            return
+
+        labels = _render_log_labels(ti)
+        values = []
+        payload_size = 0
+        MAX_PAYLOAD_SIZE = 1048576  # 1 MiB chunking as per Promtail limits
+
+        def _push_chunk():
+            if not values:
+                return True
+            payload = {
+                "streams": [
+                    {
+                        "stream": labels,
+                        "values": values
+                    }
+                ]
+            }
+            try:
+                resp = self.session.post(f"{self.host}/loki/api/v1/push", 
json=payload, timeout=(3.0, 15.0))
+                resp.raise_for_status()
+                return True
+            except Exception as e:
+                self.log.exception("Failed to upload chunk of logs to Loki: 
%s", e)
+                return False
+
+        has_error = False
+
+        with open(local_loc, "r") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    # Log line content from Task Supervisor
+                    log_data = json.loads(line)
+                    
+                    # Inject high-cardinality contextual fields into the JSON 
payload.
+                    log_data["task_id"] = ti.task_id
+                    log_data["run_id"] = getattr(ti, "run_id", "")
+                    log_data["try_number"] = str(ti.try_number)
+                    log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+                    # Loki expects Timestamp in nanoseconds as string
+                    timestamp_ns = str(int(time.time() * 1e9)) 
+                    log_str = json.dumps(log_data)

Review Comment:
   `upload()` uses `time.time()` to generate per-line timestamps. This is 
sensitive to system clock adjustments and can lead to out-of-order log entries. 
Use a stable time source (e.g., parse the structured log's timestamp field if 
present, or use `time.time_ns()` / a consistent wall-clock timestamp captured 
once per file) and avoid generating a new wall-clock timestamp per line when 
the original event time is available.



##########
providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py:
##########
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import logging
+import json
+import uuid
+import datetime
+from logging import LogRecord
+from unittest.mock import MagicMock
+
+import pytest
+import requests_mock
+
+from airflow.providers.grafana.loki.log.loki_task_handler import 
LokiRemoteLogIO, LokiTaskHandler
+
[email protected]
+def mock_runtime_ti():
+    ti = MagicMock()
+    ti.dag_id = "test_dag"
+    ti.task_id = "test_task"
+    ti.run_id = "test_run"
+    ti.try_number = 1
+    ti.map_index = -1
+    return ti

Review Comment:
   Tests create `MagicMock()` without a `spec`/`autospec` for the runtime task 
instance, which can mask attribute typos and diverge from the protocol expected 
by the handler. Use a spec’d mock (or a small stub implementing the needed 
attributes) so the test fails if the production code accesses missing/renamed 
fields.



##########
pyproject.toml:
##########
@@ -168,7 +168,7 @@ apache-airflow = "airflow.__main__:main"
     "apache-airflow-providers-celery>=3.8.3"
 ]
 "cloudant" = [
-    "apache-airflow-providers-cloudant>=4.0.1"
+    "apache-airflow-providers-cloudant>=4.0.1; python_version !=\"3.9\""

Review Comment:
   The Cloudant extra is now conditioned on `python_version != "3.9"`, but the 
Cloudant provider itself declares `requires-python >=3.10` (so 3.9 can’t 
install it anyway). This marker is misleading/redundant and makes dependency 
resolution harder to reason about. Either drop the marker, or (if there’s a 
real incompatibility) use the version actually affected and align it with the 
provider’s declared `requires-python`.
   ```suggestion
       "apache-airflow-providers-cloudant>=4.0.1"
   ```



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+    """
+    Helper to extract low-cardinality labels for Loki streams.
+    High-cardinality fields (like task_id, run_id) are omitted here
+    to prevent stream explosion and will be indexed via Bloom filters instead.
+    """
+    return {
+        "job": "airflow_tasks",
+        "dag_id": ti.dag_id,
+    }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+    """
+    Handles the actual communication with Loki API.
+    Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+    """
+    host: str = "http://localhost:3100"
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool = False
+    processors: list = attrs.field(factory=list)
+    
+    @property
+    def session(self) -> requests.Session:
+        if not hasattr(self, "_session"):
+            self._session = requests.Session()
+            # Implementing Retries, Jitter, and Exponential Backoff via 
urllib3's Retry
+            retries = Retry(
+                total=5,
+                backoff_factor=1,
+                status_forcelist=[429, 500, 502, 503, 504],
+                allowed_methods=["GET", "POST"]
+            )
+            # Efficient scaling with TCP connection pooling 
+            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, 
pool_maxsize=100)
+            self._session.mount("http://", adapter)
+            self._session.mount("https://", adapter)
+        return self._session
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Called by Airflow Task Supervisor after task finishes (or during) 
to push logs."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+
+        if not local_loc.is_file():
+            return
+
+        labels = _render_log_labels(ti)
+        values = []
+        payload_size = 0
+        MAX_PAYLOAD_SIZE = 1048576  # 1 MiB chunking as per Promtail limits
+
+        def _push_chunk():
+            if not values:
+                return True
+            payload = {
+                "streams": [
+                    {
+                        "stream": labels,
+                        "values": values
+                    }
+                ]
+            }
+            try:
+                resp = self.session.post(f"{self.host}/loki/api/v1/push", 
json=payload, timeout=(3.0, 15.0))
+                resp.raise_for_status()
+                return True
+            except Exception as e:
+                self.log.exception("Failed to upload chunk of logs to Loki: 
%s", e)
+                return False
+
+        has_error = False
+
+        with open(local_loc, "r") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    # Log line content from Task Supervisor
+                    log_data = json.loads(line)
+                    
+                    # Inject high-cardinality contextual fields into the JSON 
payload.
+                    log_data["task_id"] = ti.task_id
+                    log_data["run_id"] = getattr(ti, "run_id", "")
+                    log_data["try_number"] = str(ti.try_number)
+                    log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+                    # Loki expects Timestamp in nanoseconds as string
+                    timestamp_ns = str(int(time.time() * 1e9)) 
+                    log_str = json.dumps(log_data)
+                    values.append([timestamp_ns, log_str])
+                    
+                    # Estimate the byte size of this entry in the payload
+                    payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 
bytes overhead per value
+
+                    if payload_size >= MAX_PAYLOAD_SIZE:
+                        if not _push_chunk():
+                            has_error = True
+                        values.clear()
+                        payload_size = 0
+
+                except Exception:
+                    pass
+
+        # Push any remaining logs
+        if values:
+            if not _push_chunk():
+                has_error = True
+
+        # Clean up local file just like ElasticsearchRemoteLogIO does if fully 
successful
+        if self.delete_local_copy and not has_error:
+            try:
+                import shutil
+                shutil.rmtree(local_loc.parent, ignore_errors=True)
+            except Exception:
+                pass
+
+    def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, 
LogMessages]:
+        """Fetch logs from Loki using LogQL for streaming or retrieval."""
+        labels = _render_log_labels(ti)
+        
+        # 1. Base stream selector (hits low-cardinality index)
+        stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in 
labels.items()]) + "}"
+        
+        # 2. Line filters (leveraging Loki Bloom filters)
+        run_id = getattr(ti, "run_id", "")
+        try_num = str(ti.try_number)
+        map_idx = str(getattr(ti, "map_index", -1))
+        
+        # Utilizing Loki's `| json` parser and exact match filters for maximum 
TSDB optimization
+        logQL = (
+            f"{stream_selector} "
+            f'| json '
+            f'| task_id="{ti.task_id}" '
+            f'| run_id="{run_id}" '
+            f'| try_number="{try_num}" '
+            f'| map_index="{map_idx}"'
+        )
+        
+        # Query Loki API using configured reliable session
+        resp = self.session.get(f"{self.host}/loki/api/v1/query_range", 
params={"query": logQL}, timeout=(3.0, 15.0))
+        
+        message = []
+        if resp.ok:
+            data = resp.json().get("data", {}).get("result", [])
+            for stream in data:
+                for val in stream.get("values", []):
+                    # parse the underlying JSON structured log we uploaded
+                    log_entry = json.loads(val[1])
+                    message.append(json.dumps(log_entry))
+        
+        return ["loki-remote"], message
+
+

Review Comment:
   `LokiRemoteLogIO.read()` returns `return ["loki-remote"], message` which 
doesn’t match the declared return type `tuple[LogSourceInfo, LogMessages]` nor 
the shape expected by `FileTaskHandler` (it typically expects a `LogSourceInfo` 
object/tuple and `LogMessages` as a list of `StructuredLogMessage`/strings). 
Also `_read_remote_logs()` passes `""` for the relative path and discards 
`try_number`, so mapped tries/attempt selection may be wrong. Align the return 
types and use the provided `try_number`/relative path contract consistent with 
other RemoteLogIO implementations.
   ```suggestion
   
           # Return structured LogSourceInfo and the list of log messages
           return {"source": "loki-remote"}, message
   ```



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+    """
+    Helper to extract low-cardinality labels for Loki streams.
+    High-cardinality fields (like task_id, run_id) are omitted here
+    to prevent stream explosion and will be indexed via Bloom filters instead.
+    """
+    return {
+        "job": "airflow_tasks",
+        "dag_id": ti.dag_id,
+    }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+    """
+    Handles the actual communication with Loki API.
+    Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+    """
+    host: str = "http://localhost:3100"
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool = False
+    processors: list = attrs.field(factory=list)
+    
+    @property
+    def session(self) -> requests.Session:
+        if not hasattr(self, "_session"):
+            self._session = requests.Session()
+            # Implementing Retries, Jitter, and Exponential Backoff via 
urllib3's Retry
+            retries = Retry(
+                total=5,
+                backoff_factor=1,
+                status_forcelist=[429, 500, 502, 503, 504],
+                allowed_methods=["GET", "POST"]
+            )
+            # Efficient scaling with TCP connection pooling 
+            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, 
pool_maxsize=100)
+            self._session.mount("http://", adapter)
+            self._session.mount("https://", adapter)
+        return self._session
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Called by Airflow Task Supervisor after task finishes (or during) 
to push logs."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+
+        if not local_loc.is_file():
+            return
+
+        labels = _render_log_labels(ti)
+        values = []
+        payload_size = 0
+        MAX_PAYLOAD_SIZE = 1048576  # 1 MiB chunking as per Promtail limits
+
+        def _push_chunk():
+            if not values:
+                return True
+            payload = {
+                "streams": [
+                    {
+                        "stream": labels,
+                        "values": values
+                    }
+                ]
+            }
+            try:
+                resp = self.session.post(f"{self.host}/loki/api/v1/push", 
json=payload, timeout=(3.0, 15.0))
+                resp.raise_for_status()
+                return True
+            except Exception as e:
+                self.log.exception("Failed to upload chunk of logs to Loki: 
%s", e)
+                return False
+
+        has_error = False
+
+        with open(local_loc, "r") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    # Log line content from Task Supervisor
+                    log_data = json.loads(line)
+                    
+                    # Inject high-cardinality contextual fields into the JSON 
payload.
+                    log_data["task_id"] = ti.task_id
+                    log_data["run_id"] = getattr(ti, "run_id", "")
+                    log_data["try_number"] = str(ti.try_number)
+                    log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+                    # Loki expects Timestamp in nanoseconds as string
+                    timestamp_ns = str(int(time.time() * 1e9)) 
+                    log_str = json.dumps(log_data)
+                    values.append([timestamp_ns, log_str])
+                    
+                    # Estimate the byte size of this entry in the payload
+                    payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 
bytes overhead per value
+
+                    if payload_size >= MAX_PAYLOAD_SIZE:
+                        if not _push_chunk():
+                            has_error = True
+                        values.clear()
+                        payload_size = 0
+
+                except Exception:
+                    pass
+

Review Comment:
   The `except Exception: pass` inside the log file loop silently drops JSON 
parse errors and any other issues while reading/enriching logs. This can result 
in missing logs with no visibility. At minimum, catch `json.JSONDecodeError` 
separately and log at debug/warning with enough context (e.g., file name + line 
number), and for unexpected exceptions log an exception once and mark 
`has_error=True` so deletion doesn’t occur.



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+

Review Comment:
   The version checks import `AIRFLOW_V_3_0_PLUS`/`AIRFLOW_V_3_2_PLUS` from 
`airflow.providers.elasticsearch.version_compat`. This creates an undeclared 
runtime dependency on the Elasticsearch provider, and the `except ImportError` 
fallback sets both flags to `True`, which will incorrectly force Airflow 3.x 
code paths when running on Airflow 2.x (likely breaking `_read`/remote log 
registration). Copy a `version_compat.py` into this provider (as suggested in 
`providers/elasticsearch/version_compat.py`) and compute the flags from 
`airflow.__version__` instead of defaulting to `True`.



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+    """
+    Helper to extract low-cardinality labels for Loki streams.
+    High-cardinality fields (like task_id, run_id) are omitted here
+    to prevent stream explosion and will be indexed via Bloom filters instead.
+    """
+    return {
+        "job": "airflow_tasks",
+        "dag_id": ti.dag_id,
+    }
+
[email protected](kw_only=True)
+class LokiRemoteLogIO(LoggingMixin):
+    """
+    Handles the actual communication with Loki API.
+    Used by Task Supervisor to bulk-upload logs and by UI to read remote logs.
+    """
+    host: str = "http://localhost:3100"
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool = False
+    processors: list = attrs.field(factory=list)
+    
+    @property
+    def session(self) -> requests.Session:
+        if not hasattr(self, "_session"):
+            self._session = requests.Session()
+            # Implementing Retries, Jitter, and Exponential Backoff via 
urllib3's Retry
+            retries = Retry(
+                total=5,
+                backoff_factor=1,
+                status_forcelist=[429, 500, 502, 503, 504],
+                allowed_methods=["GET", "POST"]
+            )
+            # Efficient scaling with TCP connection pooling 
+            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, 
pool_maxsize=100)
+            self._session.mount("http://", adapter)
+            self._session.mount("https://", adapter)
+        return self._session
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Called by Airflow Task Supervisor after task finishes (or during) 
to push logs."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+
+        if not local_loc.is_file():
+            return
+
+        labels = _render_log_labels(ti)
+        values = []
+        payload_size = 0
+        MAX_PAYLOAD_SIZE = 1048576  # 1 MiB chunking as per Promtail limits
+
+        def _push_chunk():
+            if not values:
+                return True
+            payload = {
+                "streams": [
+                    {
+                        "stream": labels,
+                        "values": values
+                    }
+                ]
+            }
+            try:
+                resp = self.session.post(f"{self.host}/loki/api/v1/push", 
json=payload, timeout=(3.0, 15.0))
+                resp.raise_for_status()
+                return True
+            except Exception as e:
+                self.log.exception("Failed to upload chunk of logs to Loki: 
%s", e)
+                return False
+
+        has_error = False
+
+        with open(local_loc, "r") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+
+                try:
+                    # Log line content from Task Supervisor
+                    log_data = json.loads(line)
+                    
+                    # Inject high-cardinality contextual fields into the JSON 
payload.
+                    log_data["task_id"] = ti.task_id
+                    log_data["run_id"] = getattr(ti, "run_id", "")
+                    log_data["try_number"] = str(ti.try_number)
+                    log_data["map_index"] = str(getattr(ti, "map_index", -1))
+
+                    # Loki expects Timestamp in nanoseconds as string
+                    timestamp_ns = str(int(time.time() * 1e9)) 
+                    log_str = json.dumps(log_data)
+                    values.append([timestamp_ns, log_str])
+                    
+                    # Estimate the byte size of this entry in the payload
+                    payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 
bytes overhead per value
+
+                    if payload_size >= MAX_PAYLOAD_SIZE:
+                        if not _push_chunk():
+                            has_error = True
+                        values.clear()
+                        payload_size = 0
+
+                except Exception:
+                    pass
+
+        # Push any remaining logs
+        if values:
+            if not _push_chunk():
+                has_error = True
+
+        # Clean up local file just like ElasticsearchRemoteLogIO does if fully 
successful
+        if self.delete_local_copy and not has_error:
+            try:
+                import shutil
+                shutil.rmtree(local_loc.parent, ignore_errors=True)
+            except Exception:
+                pass

Review Comment:
   There’s an `import shutil` inside the method body and a broad `except 
Exception: pass` around cleanup. Airflow’s style generally keeps imports at 
module scope; if this isn’t for circular/lazy-load reasons, move the import to 
the top. Also consider logging cleanup failures at least at debug level so 
operators can diagnose why local logs weren’t deleted.



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+

Review Comment:
   Several imports appear unused in this file (e.g., `logging`, `sys`, 
`pendulum`, and `StructuredLogMessage` isn’t referenced). Please remove unused 
imports to keep the module minimal and avoid unnecessary dependencies/import 
time in logging code paths.
   ```suggestion
   import os
   import time
   from pathlib import Path
   from typing import TYPE_CHECKING, Any
   
   import attrs
   import requests
   from requests.adapters import HTTPAdapter
   
   # Attempt to load standard log structures according to Airflow 3 requirements
   from airflow.utils.log.file_task_handler import FileTaskHandler 
   from airflow.utils.log.logging_mixin import ExternalLoggingMixin, 
LoggingMixin
   ```



##########
pyproject.toml:
##########
@@ -108,7 +108,7 @@ apache-airflow = "airflow.__main__:main"
     "apache-airflow-providers-amazon>=9.0.0"
 ]
 "apache.cassandra" = [
-    "apache-airflow-providers-apache-cassandra>=3.7.0; python_version 
!=\"3.14\""
+    "apache-airflow-providers-apache-cassandra>=3.7.0"

Review Comment:
   The `apache.cassandra` extra dropped the `python_version != "3.14"` marker. 
If this was intentional, it should be justified by the provider’s own 
`requires-python` (and/or lock/test matrix). Otherwise, users on 3.14 may now 
attempt to install the extra and hit resolver failures. Consider keeping the 
marker at the top-level extras if Airflow supports 3.14 but the provider does 
not.
   ```suggestion
       "apache-airflow-providers-apache-cassandra>=3.7.0; python_version != 
'3.14'"
   ```



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
+def _render_log_labels(ti) -> dict[str, str]:
+    """
+    Helper to extract low-cardinality labels for Loki streams.
+    High-cardinality fields (like task_id, run_id) are omitted here
+    to prevent stream explosion and will be indexed via Bloom filters instead.
+    """
+    return {
+        "job": "airflow_tasks",
+        "dag_id": ti.dag_id,
+    }
+
@attrs.define(kw_only=True)
class LokiRemoteLogIO(LoggingMixin):
    """
    Handles the actual communication with the Loki API.

    Used by the Task Supervisor to bulk-upload a task's local log file and
    by the UI/API server to read remote logs back. High-cardinality fields
    (task_id, run_id, try_number, map_index) are embedded in each JSON log
    line rather than used as stream labels; reads filter on them via LogQL.
    """
    # Base URL of the Loki instance, e.g. ``http://loki:3100``.
    host: str = "http://localhost:3100"
    # Local folder the worker writes task logs into before upload.
    base_log_folder: Path = attrs.field(converter=Path)
    # When True, remove the local log directory after a fully successful upload.
    delete_local_copy: bool = False
    # NOTE(review): processors is never referenced in this class — confirm
    # whether a caller requires it or it can be dropped.
    processors: list = attrs.field(factory=list)

    @property
    def session(self) -> requests.Session:
        # Lazily create and cache one Session per instance, configured with
        # retries and connection pooling suitable for bulk log pushes.
        if not hasattr(self, "_session"):
            self._session = requests.Session()
            # Retries with exponential backoff (urllib3 Retry) for transient
            # server errors and rate limiting.
            retries = Retry(
                total=5,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET", "POST"]
            )
            # TCP connection pooling so repeated requests reuse connections.
            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=100)
            self._session.mount("http://", adapter)
            self._session.mount("https://", adapter)
        return self._session

    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
        """
        Push the local log file at *path* to Loki in size-bounded chunks.

        Called by the Airflow Task Supervisor after the task finishes (or
        during the run) to push logs.

        :param path: absolute path, or path relative to ``base_log_folder``
        :param ti: task instance the log file belongs to
        """
        path = Path(path)
        local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path)

        # Nothing to do when the worker produced no log file.
        if not local_loc.is_file():
            return

        labels = _render_log_labels(ti)
        values = []
        payload_size = 0
        MAX_PAYLOAD_SIZE = 1048576  # 1 MiB chunking as per Promtail limits

        def _push_chunk():
            # POST the accumulated values as a single Loki stream; returns
            # True on success (or when there is nothing to send).
            if not values:
                return True
            payload = {
                "streams": [
                    {
                        "stream": labels,
                        "values": values
                    }
                ]
            }
            try:
                resp = self.session.post(f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0))
                resp.raise_for_status()
                return True
            except Exception as e:
                self.log.exception("Failed to upload chunk of logs to Loki: %s", e)
                return False

        has_error = False

        with open(local_loc, "r") as f:
            for line in f:
                if not line.strip():
                    continue

                try:
                    # Log line content from Task Supervisor (one JSON object
                    # per line is expected).
                    log_data = json.loads(line)

                    # Inject high-cardinality contextual fields into the JSON
                    # payload so reads can filter on them without labels.
                    log_data["task_id"] = ti.task_id
                    log_data["run_id"] = getattr(ti, "run_id", "")
                    log_data["try_number"] = str(ti.try_number)
                    log_data["map_index"] = str(getattr(ti, "map_index", -1))

                    # Loki expects the timestamp in nanoseconds as a string.
                    # NOTE(review): this is upload time, not the original
                    # event time — confirm ordering requirements allow that.
                    timestamp_ns = str(int(time.time() * 1e9))
                    log_str = json.dumps(log_data)
                    values.append([timestamp_ns, log_str])

                    # Estimate the byte size of this entry in the payload
                    payload_size += len(timestamp_ns) + len(log_str) + 10 # 10 bytes overhead per value

                    # Flush once the estimated payload reaches the limit.
                    if payload_size >= MAX_PAYLOAD_SIZE:
                        if not _push_chunk():
                            has_error = True
                        values.clear()
                        payload_size = 0

                except Exception:
                    # NOTE(review): non-JSON (or otherwise unprocessable)
                    # lines are silently dropped here — consider uploading
                    # them as plain text instead of losing them.
                    pass

        # Push any remaining logs
        if values:
            if not _push_chunk():
                has_error = True

        # Clean up the local file (like ElasticsearchRemoteLogIO), but only
        # when every chunk uploaded successfully.
        if self.delete_local_copy and not has_error:
            try:
                import shutil
                shutil.rmtree(local_loc.parent, ignore_errors=True)
            except Exception:
                # NOTE(review): cleanup failures are swallowed — consider
                # logging at debug level so operators can diagnose why
                # local logs were not deleted.
                pass

    def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
        """
        Fetch this task instance's logs from Loki using a LogQL range query.

        :param _relative_path: unused; reads are scoped purely by the
            task-instance fields embedded in each uploaded log line
        :param ti: task instance whose logs are requested
        :return: (source descriptions, list of JSON log message strings)
        """
        labels = _render_log_labels(ti)

        # 1. Base stream selector (hits low-cardinality index)
        stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}"

        # 2. Line filters on the injected high-cardinality JSON fields.
        run_id = getattr(ti, "run_id", "")
        try_num = str(ti.try_number)
        map_idx = str(getattr(ti, "map_index", -1))

        # Use Loki's `| json` parser plus exact-match filters.
        logQL = (
            f"{stream_selector} "
            f'| json '
            f'| task_id="{ti.task_id}" '
            f'| run_id="{run_id}" '
            f'| try_number="{try_num}" '
            f'| map_index="{map_idx}"'
        )

        # Query the Loki API using the configured retrying session.
        resp = self.session.get(f"{self.host}/loki/api/v1/query_range", params={"query": logQL}, timeout=(3.0, 15.0))

        message = []
        if resp.ok:
            data = resp.json().get("data", {}).get("result", [])
            for stream in data:
                for val in stream.get("values", []):
                    # val is [timestamp_ns, line]; re-serialize the
                    # structured JSON log we uploaded.
                    log_entry = json.loads(val[1])
                    message.append(json.dumps(log_entry))
        # NOTE(review): a failed query currently yields an empty message
        # list with no error surfaced to the caller.

        return ["loki-remote"], message
+
+
+class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
+    """
+    The main logging handler injected into Airflow configuration.
+    """
+    LOG_NAME = "Loki"
+
+    @property
+    def log_name(self) -> str:
+        return self.LOG_NAME
+
    def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs):
        """
        Set up the handler and register the remote log IO with Airflow.

        :param base_log_folder: local folder where task logs are written first
        :param host: base URL of the Loki instance, e.g. ``http://loki:3100``
        :param frontend: optional Grafana base URL used for external log links
        :param kwargs: forwarded to ``FileTaskHandler``; ``delete_local_copy``
            (if present) is also forwarded to the remote IO helper
        """
        super().__init__(base_log_folder=base_log_folder, **kwargs)
        self.host = host
        self.frontend = frontend
        self.io = LokiRemoteLogIO(
            host=self.host,
            base_log_folder=base_log_folder,
            delete_local_copy=kwargs.get("delete_local_copy", False),
        )

        # Register Remote Log IO globally for Airflow 3 Task Supervisor
        if AIRFLOW_V_3_0_PLUS:
            if AIRFLOW_V_3_2_PLUS:
                # NOTE(review): _ActiveLoggingConfig is a private Airflow
                # API — confirm this registration hook is stable in 3.2.x.
                try:
                    from airflow.logging_config import _ActiveLoggingConfig
                    try:
                        from airflow.logging_config import get_remote_task_log
                        # Only register when no other remote log IO is active.
                        if callable(get_remote_task_log) and get_remote_task_log() is None:
                            _ActiveLoggingConfig.set(self.io, None)
                    except ImportError:
                        pass
                except ImportError:
                    pass
            else:
                # Airflow 3.0/3.1 expose a module-level REMOTE_TASK_LOG slot.
                try:
                    import airflow.logging_config as alc
                    if getattr(alc, "REMOTE_TASK_LOG", None) is None:
                        alc.REMOTE_TASK_LOG = self.io
                except ImportError:
                    pass
+                    
+    def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata: 
dict | None = None) -> tuple[list[str], list[str]]:
+        """
+        Called by Airflow 3.x FileTaskHandler._read to fetch remote logs.
+        Airflow 3 native FileTaskHandler manages interleaving these with 
locally streaming worker logs.
+        """
+        return self.io.read("", ti)
+
    def _read(
        self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None = None
    ) -> tuple[list[Any] | str, dict[str, Any]]:
        """
        Serve logs to the Web UI.

        On Airflow 3+ this defers to ``FileTaskHandler._read``, which serves
        logs from the active worker and interleaves them with the remote logs
        returned by ``_read_remote_logs``. On Airflow 2.x it reads directly
        from Loki and returns a single joined string.

        :param ti: task instance whose logs are requested
        :param try_number: attempt number to read
        :param metadata: pagination/continuation metadata from the UI
        :return: (log content, updated metadata)
        """
        if AIRFLOW_V_3_0_PLUS:
            return super()._read(ti, try_number, metadata)

        # Fallback for Airflow 2.x
        metadata = metadata or {"offset": 0}
        # NOTE(review): the source-info headers from io.read are unused here,
        # and try_number is not forwarded — verify 2.x reads target the
        # correct attempt.
        headers, messages = self.io.read("", ti)

        # Build raw messages (no StructuredLogMessage required in Airflow 2)
        log_str = "\n".join(messages)
        # Loki returns the full result in one query, so signal completion.
        metadata["end_of_log"] = True

        return [log_str], metadata
+
+    @property
+    def supports_external_link(self) -> bool:
+        """Let Airflow API Server know if we can return a link to Grafana."""
+        return bool(self.frontend)
+
+    def get_external_log_url(self, task_instance: TaskInstance, try_number: 
int) -> str:
+        """
+        Used by `airflow-api-server` when users request the external log URL.
+        Constructs a direct link to Grafana Explorer view for these logs.
+        """
+        if not self.frontend:
+            return ""
+            
+        import urllib.parse
+        

Review Comment:
   `get_external_log_url()` does `import urllib.parse` inside the method. 
Unless this is needed to avoid a circular import or for lazy-loading, move it 
to the module imports to match the repository’s import style and reduce 
per-call overhead.



##########
providers/grafana/src/airflow/providers/grafana/loki/log/loki_task_handler.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import pendulum
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+# Attempt to load standard log structures according to Airflow 3 requirements
+from airflow.utils.log.file_task_handler import FileTaskHandler 
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+
+# Try mapping for StructuredLogMessage available in 3.x
+try:
+    from airflow.utils.log.file_task_handler import StructuredLogMessage
+except ImportError:
+    StructuredLogMessage = dict  # Fallback for compilation matching
+
+# Try loading version compat constants
+try:
+    from airflow.providers.elasticsearch.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+except ImportError:
+    AIRFLOW_V_3_0_PLUS = True
+    AIRFLOW_V_3_2_PLUS = True
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
+
+
def _render_log_labels(ti) -> dict[str, str]:
    """
    Helper to extract low-cardinality labels for Loki streams.

    High-cardinality fields (like task_id, run_id) are omitted here to
    prevent stream explosion; reads filter on them via LogQL line filters
    over the log content instead.

    :param ti: task instance (only ``dag_id`` is read)
    :return: label set identifying the Loki stream for this DAG's task logs
    """
    return {
        # Constant job label shared by all Airflow task-log streams.
        "job": "airflow_tasks",
        # dag_id is bounded by the number of DAGs, so it is safe as a label.
        "dag_id": ti.dag_id,
    }
+
@attrs.define(kw_only=True)
class LokiRemoteLogIO(LoggingMixin):
    """
    Handles the actual communication with the Loki API.

    Used by the Task Supervisor to bulk-upload a task's local log file
    (``upload``) and by the UI/API server to read remote logs back
    (``read``). High-cardinality identifiers (task_id, run_id, try_number,
    map_index) are embedded in each JSON log line rather than used as stream
    labels; reads filter on them with LogQL ``| json`` line filters.
    """

    # Base URL of the Loki instance, e.g. ``http://loki:3100``.
    host: str = "http://localhost:3100"
    # Local folder the worker writes task logs into before upload.
    base_log_folder: Path = attrs.field(converter=Path)
    # When True, remove the local log directory after a fully successful upload.
    delete_local_copy: bool = False
    processors: list = attrs.field(factory=list)
    # Flush a push request once the accumulated payload reaches this many
    # bytes (default 1 MiB, matching common Promtail/Loki limits).
    max_payload_size: int = 1048576

    @property
    def session(self) -> requests.Session:
        """Lazily built per-instance ``requests.Session`` with retries and pooling."""
        if not hasattr(self, "_session"):
            self._session = requests.Session()
            # Retries with exponential backoff for transient server errors
            # and rate limiting (429).
            retries = Retry(
                total=5,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET", "POST"],
            )
            # TCP connection pooling so repeated pushes reuse connections.
            adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=100)
            self._session.mount("http://", adapter)
            self._session.mount("https://", adapter)
        return self._session

    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
        """
        Push the local log file at *path* to Loki in size-bounded chunks.

        Called by the Airflow Task Supervisor after the task finishes (or
        during the run). Non-JSON lines are uploaded as ``{"event": <line>}``
        instead of being silently dropped. The local copy is deleted only
        when ``delete_local_copy`` is set and every chunk uploaded
        successfully.

        :param path: absolute path, or path relative to ``base_log_folder``
        :param ti: task instance the log file belongs to
        """
        path = Path(path)
        local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path)

        # Nothing to do when the worker produced no log file.
        if not local_loc.is_file():
            return

        labels = _render_log_labels(ti)
        values: list[list[str]] = []
        payload_size = 0
        has_error = False

        def _push_chunk() -> bool:
            # POST the accumulated values as a single Loki stream; returns
            # True on success (or when there is nothing to send).
            if not values:
                return True
            payload = {"streams": [{"stream": labels, "values": values}]}
            try:
                resp = self.session.post(
                    f"{self.host}/loki/api/v1/push", json=payload, timeout=(3.0, 15.0)
                )
                resp.raise_for_status()
                return True
            except Exception as e:
                self.log.exception("Failed to upload chunk of logs to Loki: %s", e)
                return False

        with open(local_loc, "r") as f:
            for line in f:
                if not line.strip():
                    continue

                try:
                    # Structured log line written by the Task Supervisor.
                    log_data = json.loads(line)
                    if not isinstance(log_data, dict):
                        # Valid JSON but not an object (e.g. bare string).
                        log_data = {"event": log_data}
                except ValueError:
                    # Plain-text line: keep it rather than dropping it.
                    log_data = {"event": line.rstrip("\n")}

                # Inject high-cardinality contextual fields into the JSON
                # payload so reads can filter on them without stream labels.
                log_data["task_id"] = ti.task_id
                log_data["run_id"] = getattr(ti, "run_id", "")
                log_data["try_number"] = str(ti.try_number)
                log_data["map_index"] = str(getattr(ti, "map_index", -1))

                # Loki expects the timestamp in nanoseconds as a string.
                # NOTE: this is upload time, not the original event time.
                timestamp_ns = str(int(time.time() * 1e9))
                log_str = json.dumps(log_data)
                values.append([timestamp_ns, log_str])

                # Rough payload-size estimate (+10 bytes framing per value).
                payload_size += len(timestamp_ns) + len(log_str) + 10

                if payload_size >= self.max_payload_size:
                    if not _push_chunk():
                        has_error = True
                    values.clear()
                    payload_size = 0

        # Push any remaining logs.
        if values:
            if not _push_chunk():
                has_error = True

        # Clean up the local file (like ElasticsearchRemoteLogIO), but only
        # when every chunk uploaded successfully.
        if self.delete_local_copy and not has_error:
            import shutil

            try:
                shutil.rmtree(local_loc.parent)
            except OSError:
                # Best-effort cleanup, but leave a trace so operators can
                # diagnose why local logs were not deleted.
                self.log.debug(
                    "Could not delete local log copy at %s", local_loc.parent, exc_info=True
                )

    def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
        """
        Fetch this task instance's logs from Loki using a LogQL range query.

        :param _relative_path: unused; reads are scoped by the task-instance
            fields embedded in each uploaded log line
        :param ti: task instance whose logs are requested
        :return: (source descriptions, list of JSON log message strings)
        """
        labels = _render_log_labels(ti)

        # 1. Base stream selector (hits the low-cardinality label index).
        stream_selector = "{" + ",".join([f'{k}="{v}"' for k, v in labels.items()]) + "}"

        # 2. Exact-match line filters on the injected JSON fields.
        run_id = getattr(ti, "run_id", "")
        try_num = str(ti.try_number)
        map_idx = str(getattr(ti, "map_index", -1))

        logQL = (
            f"{stream_selector} "
            f"| json "
            f'| task_id="{ti.task_id}" '
            f'| run_id="{run_id}" '
            f'| try_number="{try_num}" '
            f'| map_index="{map_idx}"'
        )

        # Query the Loki API using the configured retrying session.
        resp = self.session.get(
            f"{self.host}/loki/api/v1/query_range", params={"query": logQL}, timeout=(3.0, 15.0)
        )

        message: list[str] = []
        if resp.ok:
            data = resp.json().get("data", {}).get("result", [])
            for stream in data:
                for val in stream.get("values", []):
                    # val is [timestamp_ns, line]; re-serialize the
                    # structured JSON log we uploaded.
                    raw = val[1]
                    try:
                        message.append(json.dumps(json.loads(raw)))
                    except ValueError:
                        # Tolerate lines that are not valid JSON.
                        message.append(raw)
        else:
            # Surface query failures instead of silently returning nothing.
            self.log.warning(
                "Loki query failed with status %s: %s", resp.status_code, resp.text[:200]
            )

        return ["loki-remote"], message
+
+
+class LokiTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
+    """
+    The main logging handler injected into Airflow configuration.
+    """
+    LOG_NAME = "Loki"
+
    @property
    def log_name(self) -> str:
        """Human-readable name of this external log backend (shown in the UI)."""
        return self.LOG_NAME
+
    def __init__(self, base_log_folder: str, host: str, frontend: str = "", **kwargs):
        """
        Set up the handler and register the remote log IO with Airflow.

        :param base_log_folder: local folder where task logs are written first
        :param host: base URL of the Loki instance, e.g. ``http://loki:3100``
        :param frontend: optional Grafana base URL used for external log links
        :param kwargs: forwarded to ``FileTaskHandler``; ``delete_local_copy``
            (if present) is also forwarded to the remote IO helper
        """
        super().__init__(base_log_folder=base_log_folder, **kwargs)
        self.host = host
        self.frontend = frontend
        self.io = LokiRemoteLogIO(
            host=self.host,
            base_log_folder=base_log_folder,
            delete_local_copy=kwargs.get("delete_local_copy", False),
        )

        # Register Remote Log IO globally for Airflow 3 Task Supervisor
        if AIRFLOW_V_3_0_PLUS:
            if AIRFLOW_V_3_2_PLUS:
                # NOTE(review): _ActiveLoggingConfig is a private Airflow
                # API — confirm this registration hook is stable in 3.2.x.
                try:
                    from airflow.logging_config import _ActiveLoggingConfig
                    try:
                        from airflow.logging_config import get_remote_task_log
                        # Only register when no other remote log IO is active.
                        if callable(get_remote_task_log) and get_remote_task_log() is None:
                            _ActiveLoggingConfig.set(self.io, None)
                    except ImportError:
                        pass
                except ImportError:
                    pass
            else:
                # Airflow 3.0/3.1 expose a module-level REMOTE_TASK_LOG slot.
                try:
                    import airflow.logging_config as alc
                    if getattr(alc, "REMOTE_TASK_LOG", None) is None:
                        alc.REMOTE_TASK_LOG = self.io
                except ImportError:
                    pass
+                    
+    def _read_remote_logs(self, ti: TaskInstance, try_number: int, metadata: 
dict | None = None) -> tuple[list[str], list[str]]:
+        """
+        Called by Airflow 3.x FileTaskHandler._read to fetch remote logs.
+        Airflow 3 native FileTaskHandler manages interleaving these with 
locally streaming worker logs.
+        """
+        return self.io.read("", ti)
+

Review Comment:
   `LokiTaskHandler._read_remote_logs()` ignores the `try_number` argument and 
passes an empty relative path to `self.io.read`. This can return logs for the 
wrong attempt and breaks the standard remote logging contract where reads are 
scoped to the rendered log path/try number. Use the `try_number` (and ideally 
the rendered relative path from `FileTaskHandler`) to query Loki for the 
correct attempt.



##########
providers/grafana/tests/unit/grafana/loki/log/test_loki_task_handler.py:
##########
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import logging
+import json
+import uuid
+import datetime
+from logging import LogRecord
+from unittest.mock import MagicMock
+
+import pytest
+import requests_mock
+
+from airflow.providers.grafana.loki.log.loki_task_handler import 
LokiRemoteLogIO, LokiTaskHandler

Review Comment:
   Several imports in this test file appear unused (`logging`, `uuid`, 
`datetime`, `LogRecord`, and `LokiTaskHandler`). Please remove unused imports 
to keep the unit test focused and avoid lint failures.
   ```suggestion
   import json
   from unittest.mock import MagicMock
   
   import pytest
   import requests_mock
   
   from airflow.providers.grafana.loki.log.loki_task_handler import 
LokiRemoteLogIO
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to