This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3bec5d61818 AIP-99: Add LLMFileAnalysisOperator and
@task.llm_file_analysis (#64077)
3bec5d61818 is described below
commit 3bec5d61818bb49d3d706ef99ca71507c466ae6a
Author: GPK <[email protected]>
AuthorDate: Thu Mar 26 17:21:30 2026 +0000
AIP-99: Add LLMFileAnalysisOperator and @task.llm_file_analysis (#64077)
* Add LLMFileAnalysisOperator and @task.llm_file_analysis to the common-ai
provider
# Conflicts:
# uv.lock
* Fix mypy issues
* Update utils
* Update return model
* Fix spells
* Fix up read
* Document prefix lookup operation
---
docs/spelling_wordlist.txt | 2 +
providers/common/ai/docs/operators/index.rst | 12 +-
.../common/ai/docs/operators/llm_file_analysis.rst | 141 +++++
providers/common/ai/provider.yaml | 4 +
providers/common/ai/pyproject.toml | 8 +
.../common/ai/decorators/llm_file_analysis.py | 96 +++
.../ai/example_dags/example_llm_file_analysis.py | 133 ++++
.../src/airflow/providers/common/ai/exceptions.py | 16 +
.../providers/common/ai/get_provider_info.py | 6 +
.../common/ai/operators/llm_file_analysis.py | 165 +++++
.../providers/common/ai/utils/file_analysis.py | 670 +++++++++++++++++++++
.../unit/common/ai/assets/__init__.py} | 7 -
.../unit/common/ai/assets/airflow-3-task-sdk.png | Bin 0 -> 126152 bytes
.../common/ai/decorators/test_llm_file_analysis.py | 118 ++++
.../common/ai/operators/test_llm_file_analysis.py | 303 ++++++++++
.../unit/common/ai/utils/test_file_analysis.py | 525 ++++++++++++++++
uv.lock | 12 +-
17 files changed, 2208 insertions(+), 10 deletions(-)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index bee00082c44..df7bea5521c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -256,6 +256,7 @@ cncf
cnf
cnt
codebase
+codec
codecov
codepoints
Colour
@@ -1156,6 +1157,7 @@ passwd
pathlib
pathType
Paxos
+PDFs
PEM
Pem
pem
diff --git a/providers/common/ai/docs/operators/index.rst
b/providers/common/ai/docs/operators/index.rst
index e961931925a..89ba5d15e6c 100644
--- a/providers/common/ai/docs/operators/index.rst
+++ b/providers/common/ai/docs/operators/index.rst
@@ -21,7 +21,7 @@ Common AI Operators
Choosing the right operator
---------------------------
-The common-ai provider ships four operators (and matching ``@task``
decorators). Use this table
+The common-ai provider ships five operators (and matching ``@task``
decorators). Use this table
to pick the one that fits your use case:
.. list-table::
@@ -34,6 +34,9 @@ to pick the one that fits your use case:
* - Single prompt → text or structured output
- :class:`~airflow.providers.common.ai.operators.llm.LLMOperator`
- ``@task.llm``
+ * - Analyze files, prefixes, images, or PDFs with one prompt
+ -
:class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`
+ - ``@task.llm_file_analysis``
* - LLM picks which downstream task runs
-
:class:`~airflow.providers.common.ai.operators.llm_branch.LLMBranchOperator`
- ``@task.llm_branch``
@@ -46,7 +49,12 @@ to pick the one that fits your use case:
**LLMOperator / @task.llm** — stateless, single-turn calls. Use this for
classification,
summarization, extraction, or any prompt that produces one response. Supports
structured output
-via a ``response_format`` Pydantic model.
+via an ``output_type`` Pydantic model.
+
+**LLMFileAnalysisOperator / @task.llm_file_analysis** — stateless, single-turn
file analysis.
+Use this when the prompt should reason over file contents or multimodal
attachments already chosen
+by the DAG author. The operator resolves files via ``ObjectStoragePath`` and
keeps the interaction
+read-only.
**AgentOperator / @task.agent** — multi-turn tool-calling loop. The model
decides which tools to
invoke and when to stop. Use this when the LLM needs to take actions (query
databases, call APIs,
diff --git a/providers/common/ai/docs/operators/llm_file_analysis.rst
b/providers/common/ai/docs/operators/llm_file_analysis.rst
new file mode 100644
index 00000000000..17d49593b3c
--- /dev/null
+++ b/providers/common/ai/docs/operators/llm_file_analysis.rst
@@ -0,0 +1,141 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/operator:llm_file_analysis:
+
+``LLMFileAnalysisOperator`` & ``@task.llm_file_analysis``
+=========================================================
+
+Use
:class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`
+or the ``@task.llm_file_analysis`` decorator to analyze files from object
storage
+or local storage with a single prompt.
+
+The operator resolves ``file_path`` through
+:class:`~airflow.providers.common.compat.sdk.ObjectStoragePath`, reads
supported
+formats in a read-only manner, injects file metadata and normalized content
into
+the prompt, and optionally attaches images or PDFs as multimodal inputs.
+
+.. seealso::
+ :ref:`Connection configuration <howto/connection:pydanticai>`
+
+Basic Usage
+-----------
+
+Analyze a text-like file or prefix with one prompt:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+ :language: python
+ :start-after: [START howto_operator_llm_file_analysis_basic]
+ :end-before: [END howto_operator_llm_file_analysis_basic]
+
+Directory / Prefix Analysis
+---------------------------
+
+Use a directory or object-storage prefix when you want the operator to analyze
+multiple files in one request. ``max_files`` bounds how many resolved files are
+included in the request, while the size and text limits keep the request safe:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+ :language: python
+ :start-after: [START howto_operator_llm_file_analysis_prefix]
+ :end-before: [END howto_operator_llm_file_analysis_prefix]
+
+.. note::
+
+ Prefix resolution enumerates objects under the supplied path and checks
each
+ candidate to find files before ``max_files`` is applied. For very large
+ object-store prefixes, prefer a more specific path or a narrower prefix to
+ avoid expensive listing and stat calls.
+
+Multimodal Analysis
+-------------------
+
+Set ``multi_modal=True`` for PNG/JPG/PDF inputs so they are sent as binary
+attachments to a vision-capable model:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+ :language: python
+ :start-after: [START howto_operator_llm_file_analysis_multimodal]
+ :end-before: [END howto_operator_llm_file_analysis_multimodal]
+
+Structured Output
+-----------------
+
+Set ``output_type`` to a Pydantic ``BaseModel`` when you want a typed response
+back from the LLM instead of a plain string:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+ :language: python
+ :start-after: [START howto_operator_llm_file_analysis_structured]
+ :end-before: [END howto_operator_llm_file_analysis_structured]
+
+TaskFlow Decorator
+------------------
+
+The ``@task.llm_file_analysis`` decorator wraps the operator. The function
+returns the prompt string; file settings are passed to the decorator:
+
+.. exampleinclude::
/../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
+ :language: python
+ :start-after: [START howto_decorator_llm_file_analysis]
+ :end-before: [END howto_decorator_llm_file_analysis]
+
+Parameters
+----------
+
+- ``prompt``: The analysis request to send to the LLM (operator) or the return
+ value of the decorated function (decorator).
+- ``llm_conn_id``: Airflow connection ID for the LLM provider.
+- ``file_path``: File or prefix to analyze.
+- ``file_conn_id``: Optional connection ID for the storage backend. Overrides a
+ connection embedded in ``file_path``.
+- ``multi_modal``: Allow PNG/JPG/PDF inputs as binary attachments. Default
``False``.
+- ``max_files``: Maximum number of files included from a prefix. Extra files
are
+ omitted and noted in the prompt. Default ``20``.
+- ``max_file_size_bytes``: Maximum size of any single input file. Default ``5
MiB``.
+- ``max_total_size_bytes``: Maximum cumulative size across all resolved files.
+ Default ``20 MiB``.
+- ``max_text_chars``: Maximum normalized text context sent to the LLM after
+ sampling and truncation. Default ``100000``.
+- ``sample_rows``: Maximum number of sampled rows or records included for CSV,
+ Parquet, and Avro inputs. This controls structural preview depth, while
+ ``max_file_size_bytes`` and ``max_total_size_bytes`` are byte-level read
+ guards and ``max_text_chars`` is the final prompt-text budget. Default
``10``.
+- ``model_id``: Model identifier (e.g. ``"openai:gpt-5"``). Overrides the
+ connection's extra field.
+- ``system_prompt``: System-level instructions appended to the operator's
+ built-in read-only guidance.
+- ``output_type``: Expected output type (default: ``str``). Set to a Pydantic
+ ``BaseModel`` for structured output.
+- ``agent_params``: Additional keyword arguments passed to the pydantic-ai
+ ``Agent`` constructor (e.g. ``retries``, ``model_settings``).
+
+Supported Formats
+-----------------
+
+- Text-like: ``.log``, ``.json``, ``.csv``, ``.parquet``, ``.avro``
+- Multimodal: ``.png``, ``.jpg``, ``.jpeg``, ``.pdf`` when ``multi_modal=True``
+- Gzip-compressed text inputs are supported for ``.log.gz``, ``.json.gz``, and
+ ``.csv.gz``.
+- Gzip is not supported for ``.parquet``, ``.avro``, image, or PDF inputs.
+
+Parquet and Avro readers require their corresponding optional extras:
+
+.. code-block:: bash
+
+ pip install apache-airflow-providers-common-ai[parquet]
+ pip install apache-airflow-providers-common-ai[avro]
diff --git a/providers/common/ai/provider.yaml
b/providers/common/ai/provider.yaml
index 43a98af32a3..e267e2ec800 100644
--- a/providers/common/ai/provider.yaml
+++ b/providers/common/ai/provider.yaml
@@ -35,6 +35,7 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-common-ai/operators/agent.rst
- /docs/apache-airflow-providers-common-ai/operators/llm.rst
+ -
/docs/apache-airflow-providers-common-ai/operators/llm_file_analysis.rst
- /docs/apache-airflow-providers-common-ai/operators/llm_branch.rst
- /docs/apache-airflow-providers-common-ai/operators/llm_sql.rst
-
/docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst
@@ -288,6 +289,7 @@ operators:
python-modules:
- airflow.providers.common.ai.operators.agent
- airflow.providers.common.ai.operators.llm
+ - airflow.providers.common.ai.operators.llm_file_analysis
- airflow.providers.common.ai.operators.llm_branch
- airflow.providers.common.ai.operators.llm_sql
- airflow.providers.common.ai.operators.llm_schema_compare
@@ -297,6 +299,8 @@ task-decorators:
name: agent
- class-name: airflow.providers.common.ai.decorators.llm.llm_task
name: llm
+ - class-name:
airflow.providers.common.ai.decorators.llm_file_analysis.llm_file_analysis_task
+ name: llm_file_analysis
- class-name:
airflow.providers.common.ai.decorators.llm_branch.llm_branch_task
name: llm_branch
- class-name: airflow.providers.common.ai.decorators.llm_sql.llm_sql_task
diff --git a/providers/common/ai/pyproject.toml
b/providers/common/ai/pyproject.toml
index ec23da07b82..fe5ed3d6b69 100644
--- a/providers/common/ai/pyproject.toml
+++ b/providers/common/ai/pyproject.toml
@@ -80,6 +80,14 @@ dependencies = [
"google" = ["pydantic-ai-slim[google]"]
"openai" = ["pydantic-ai-slim[openai]"]
"mcp" = ["pydantic-ai-slim[mcp]"]
+"avro" = [
+ 'fastavro>=1.10.0; python_version < "3.14"',
+ 'fastavro>=1.12.1; python_version >= "3.14"',
+]
+"parquet" = [
+ "pyarrow>=18.0.0; python_version < '3.14'",
+ "pyarrow>=22.0.0; python_version >= '3.14'",
+]
"sql" = [
"apache-airflow-providers-common-sql",
"sqlglot>=30.0.0",
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py
new file mode 100644
index 00000000000..c9451b3fbee
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_file_analysis.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""TaskFlow decorator for LLM-backed file analysis."""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Collection, Mapping, Sequence
+from typing import TYPE_CHECKING, Any, ClassVar
+
+from airflow.providers.common.ai.operators.llm_file_analysis import
LLMFileAnalysisOperator
+from airflow.providers.common.compat.sdk import (
+ DecoratedOperator,
+ TaskDecorator,
+ context_merge,
+ determine_kwargs,
+ task_decorator_factory,
+)
+from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
+
+if TYPE_CHECKING:
+ from airflow.sdk import Context
+
+
+class _LLMFileAnalysisDecoratedOperator(DecoratedOperator,
LLMFileAnalysisOperator):
+ """Wrap a callable that returns the prompt string for file analysis."""
+
+ template_fields: Sequence[str] = (
+ *DecoratedOperator.template_fields,
+ *LLMFileAnalysisOperator.template_fields,
+ )
+ template_fields_renderers: ClassVar[dict[str, str]] = {
+ **DecoratedOperator.template_fields_renderers,
+ }
+
+ custom_operator_name: str = "@task.llm_file_analysis"
+
+ def __init__(
+ self,
+ *,
+ python_callable: Callable,
+ op_args: Collection[Any] | None = None,
+ op_kwargs: Mapping[str, Any] | None = None,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(
+ python_callable=python_callable,
+ op_args=op_args,
+ op_kwargs=op_kwargs,
+ prompt=SET_DURING_EXECUTION,
+ **kwargs,
+ )
+
+ def execute(self, context: Context) -> Any:
+ context_merge(context, self.op_kwargs)
+ kwargs = determine_kwargs(self.python_callable, self.op_args, context)
+
+ self.prompt = self.python_callable(*self.op_args, **kwargs)
+ if not isinstance(self.prompt, str) or not self.prompt.strip():
+ raise TypeError(
+ "The returned value from the @task.llm_file_analysis callable
must be a non-empty string."
+ )
+
+ self.render_template_fields(context)
+ return LLMFileAnalysisOperator.execute(self, context)
+
+
+def llm_file_analysis_task(
+ python_callable: Callable | None = None,
+ **kwargs: Any,
+) -> TaskDecorator:
+ """
+ Wrap a callable that returns a prompt into an LLM-backed file-analysis
task.
+
+ Any file-analysis keyword arguments accepted by
+
:class:`~airflow.providers.common.ai.operators.llm_file_analysis.LLMFileAnalysisOperator`,
+ including ``sample_rows``, can be passed through this decorator.
+ """
+ return task_decorator_factory(
+ python_callable=python_callable,
+ decorated_operator_class=_LLMFileAnalysisDecoratedOperator,
+ **kwargs,
+ )
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
new file mode 100644
index 00000000000..7fe2bb89a64
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAGs demonstrating LLMFileAnalysisOperator usage."""
+
+from __future__ import annotations
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm_file_analysis import
LLMFileAnalysisOperator
+from airflow.providers.common.compat.sdk import dag, task
+
+
+# [START howto_operator_llm_file_analysis_basic]
+@dag
+def example_llm_file_analysis_basic():
+ LLMFileAnalysisOperator(
+ task_id="analyze_error_logs",
+ prompt="Find error patterns and correlate them with deployment
timestamps.",
+ llm_conn_id="pydanticai_default",
+ file_path="s3://logs/app/2024-01-15/",
+ file_conn_id="aws_default",
+ )
+
+
+# [END howto_operator_llm_file_analysis_basic]
+
+example_llm_file_analysis_basic()
+
+
+# [START howto_operator_llm_file_analysis_prefix]
+@dag
+def example_llm_file_analysis_prefix():
+ LLMFileAnalysisOperator(
+ task_id="summarize_partitioned_logs",
+ prompt=(
+ "Summarize recurring errors across these partitioned log files and
call out "
+ "which partition keys appear in the highest-severity findings."
+ ),
+ llm_conn_id="pydanticai_default",
+ file_path="s3://logs/app/dt=2024-01-15/",
+ file_conn_id="aws_default",
+ max_files=10,
+ max_total_size_bytes=10 * 1024 * 1024,
+ max_text_chars=20_000,
+ )
+
+
+# [END howto_operator_llm_file_analysis_prefix]
+
+example_llm_file_analysis_prefix()
+
+
+# [START howto_operator_llm_file_analysis_multimodal]
+@dag
+def example_llm_file_analysis_multimodal():
+ LLMFileAnalysisOperator(
+ task_id="validate_dashboards",
+ prompt="Check charts for visual anomalies or stale data indicators.",
+ llm_conn_id="pydanticai_default",
+ file_path="s3://monitoring/dashboards/latest.png",
+ file_conn_id="aws_default",
+ multi_modal=True,
+ )
+
+
+# [END howto_operator_llm_file_analysis_multimodal]
+
+example_llm_file_analysis_multimodal()
+
+
+# [START howto_operator_llm_file_analysis_structured]
+@dag
+def example_llm_file_analysis_structured():
+
+ class FileAnalysisSummary(BaseModel):
+ """Structured output schema for the file-analysis examples."""
+
+ findings: list[str]
+ highest_severity: str
+ truncated_inputs: bool
+
+ LLMFileAnalysisOperator(
+ task_id="analyze_parquet_quality",
+ prompt=(
+ "Return the top data-quality findings from this Parquet dataset. "
+ "Include whether any inputs were truncated."
+ ),
+ llm_conn_id="pydanticai_default",
+ file_path="s3://analytics/warehouse/customers/",
+ file_conn_id="aws_default",
+ output_type=FileAnalysisSummary,
+ sample_rows=5,
+ max_files=5,
+ )
+
+
+# [END howto_operator_llm_file_analysis_structured]
+
+example_llm_file_analysis_structured()
+
+
+# [START howto_decorator_llm_file_analysis]
+@dag
+def example_llm_file_analysis_decorator():
+ @task.llm_file_analysis(
+ llm_conn_id="pydanticai_default",
+ file_path="s3://analytics/reports/quarterly.pdf",
+ file_conn_id="aws_default",
+ multi_modal=True,
+ )
+ def review_quarterly_report():
+ return "Extract the key revenue, risk, and compliance findings from
this report."
+
+ review_quarterly_report()
+
+
+# [END howto_decorator_llm_file_analysis]
+
+example_llm_file_analysis_decorator()
diff --git a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
b/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
index 7c83f30d68a..d0412fb2a48 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
@@ -21,3 +21,19 @@ from airflow.providers.common.compat.sdk import
AirflowException
class HITLMaxIterationsError(AirflowException):
"""Raised when the HITL review loop exhausts max iterations without
approval or rejection."""
+
+
+class LLMFileAnalysisError(ValueError):
+ """Base class for file-analysis validation errors."""
+
+
+class LLMFileAnalysisUnsupportedFormatError(LLMFileAnalysisError):
+ """Raised when a file format is not supported by LLM file analysis."""
+
+
+class LLMFileAnalysisLimitExceededError(LLMFileAnalysisError):
+ """Raised when file-analysis safety limits are exceeded."""
+
+
+class
LLMFileAnalysisMultimodalRequiredError(LLMFileAnalysisUnsupportedFormatError):
+ """Raised when image/PDF inputs are used without ``multi_modal=True``."""
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
index 90ae393d64d..9a216856fb0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py
@@ -33,6 +33,7 @@ def get_provider_info():
"how-to-guide": [
"/docs/apache-airflow-providers-common-ai/operators/agent.rst",
"/docs/apache-airflow-providers-common-ai/operators/llm.rst",
+
"/docs/apache-airflow-providers-common-ai/operators/llm_file_analysis.rst",
"/docs/apache-airflow-providers-common-ai/operators/llm_branch.rst",
"/docs/apache-airflow-providers-common-ai/operators/llm_sql.rst",
"/docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst",
@@ -241,6 +242,7 @@ def get_provider_info():
"python-modules": [
"airflow.providers.common.ai.operators.agent",
"airflow.providers.common.ai.operators.llm",
+ "airflow.providers.common.ai.operators.llm_file_analysis",
"airflow.providers.common.ai.operators.llm_branch",
"airflow.providers.common.ai.operators.llm_sql",
"airflow.providers.common.ai.operators.llm_schema_compare",
@@ -250,6 +252,10 @@ def get_provider_info():
"task-decorators": [
{"class-name":
"airflow.providers.common.ai.decorators.agent.agent_task", "name": "agent"},
{"class-name":
"airflow.providers.common.ai.decorators.llm.llm_task", "name": "llm"},
+ {
+ "class-name":
"airflow.providers.common.ai.decorators.llm_file_analysis.llm_file_analysis_task",
+ "name": "llm_file_analysis",
+ },
{
"class-name":
"airflow.providers.common.ai.decorators.llm_branch.llm_branch_task",
"name": "llm_branch",
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py
b/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py
new file mode 100644
index 00000000000..bb10b66680d
--- /dev/null
+++
b/providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for analyzing files with LLMs."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm import LLMOperator
+from airflow.providers.common.ai.utils.file_analysis import
build_file_analysis_request
+from airflow.providers.common.ai.utils.logging import log_run_summary
+
+if TYPE_CHECKING:
+ from pydantic_ai import Agent
+
+ from airflow.sdk import Context
+
+
+class LLMFileAnalysisOperator(LLMOperator):
+ """
+ Analyze files from object storage or local storage using a single LLM call.
+
+ The operator resolves ``file_path`` via
+ :class:`~airflow.providers.common.compat.sdk.ObjectStoragePath`, normalizes
+ supported formats into text context, and optionally attaches images/PDFs as
+ multimodal inputs when ``multi_modal=True``.
+
+ :param prompt: The analysis prompt for the LLM.
+ :param llm_conn_id: Connection ID for the LLM provider.
+ :param file_path: File or prefix to analyze.
+ :param file_conn_id: Optional connection ID for the storage backend.
+ Overrides a connection embedded in ``file_path``.
+ :param multi_modal: Allow PNG/JPG/PDF inputs as binary attachments.
+ Default ``False``.
+ :param max_files: Maximum number of files to include from a prefix.
+ Excess files are omitted and noted in the prompt. Default ``20``.
+ :param max_file_size_bytes: Maximum size of any single input file.
+ Default ``5 MiB``.
+ :param max_total_size_bytes: Maximum cumulative size across all resolved
+ files. Default ``20 MiB``.
+ :param max_text_chars: Maximum normalized text context passed to the LLM
+ after sampling/truncation. Default ``100000``.
+ :param sample_rows: Maximum number of sampled rows or records included for
+ CSV, Parquet, and Avro inputs. This limits structural preview depth,
+ while ``max_file_size_bytes`` and ``max_total_size_bytes`` limit bytes
+ read from storage and ``max_text_chars`` limits the final prompt text
+ budget. Default ``10``.
+ """
+
+ template_fields: Sequence[str] = (
+ *LLMOperator.template_fields,
+ "file_path",
+ "file_conn_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ file_path: str,
+ file_conn_id: str | None = None,
+ multi_modal: bool = False,
+ max_files: int = 20,
+ max_file_size_bytes: int = 5 * 1024 * 1024,
+ max_total_size_bytes: int = 20 * 1024 * 1024,
+ max_text_chars: int = 100_000,
+ sample_rows: int = 10,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ if max_files <= 0:
+ raise ValueError("max_files must be greater than zero.")
+ if max_file_size_bytes <= 0:
+ raise ValueError("max_file_size_bytes must be greater than zero.")
+ if max_total_size_bytes <= 0:
+ raise ValueError("max_total_size_bytes must be greater than zero.")
+ if max_text_chars <= 0:
+ raise ValueError("max_text_chars must be greater than zero.")
+ if sample_rows <= 0:
+ raise ValueError("sample_rows must be greater than zero.")
+
+ self.file_path = file_path
+ self.file_conn_id = file_conn_id
+ self.multi_modal = multi_modal
+ self.max_files = max_files
+ self.max_file_size_bytes = max_file_size_bytes
+ self.max_total_size_bytes = max_total_size_bytes
+ self.max_text_chars = max_text_chars
+ self.sample_rows = sample_rows
+
+ def execute(self, context: Context) -> Any:
+ request = build_file_analysis_request(
+ file_path=self.file_path,
+ file_conn_id=self.file_conn_id,
+ prompt=self.prompt,
+ multi_modal=self.multi_modal,
+ max_files=self.max_files,
+ max_file_size_bytes=self.max_file_size_bytes,
+ max_total_size_bytes=self.max_total_size_bytes,
+ max_text_chars=self.max_text_chars,
+ sample_rows=self.sample_rows,
+ )
+ self.log.info(
+ "Calling model for file analysis: files=%s, attachments=%s,
text_files=%s, total_size_bytes=%s, "
+ "omitted_files=%s, text_truncated=%s, multi_modal=%s,
sample_rows=%s",
+ len(request.resolved_paths),
+ request.attachment_count,
+ request.text_file_count,
+ request.total_size_bytes,
+ request.omitted_files,
+ request.text_truncated,
+ self.multi_modal,
+ self.sample_rows,
+ )
+ self.log.debug("Resolved file analysis paths: %s",
request.resolved_paths)
+ agent: Agent[None, Any] = self.llm_hook.create_agent(
+ output_type=self.output_type,
+ instructions=self._build_system_prompt(),
+ **self.agent_params,
+ )
+ result = agent.run_sync(request.user_content)
+ log_run_summary(self.log, result)
+ output = result.output
+
+ if self.require_approval:
+ self.defer_for_approval(context, output) # type: ignore[misc]
+
+ if isinstance(output, BaseModel):
+ output = output.model_dump()
+
+ return output
+
+ def execute_complete(self, context: Context, generated_output: str, event:
dict[str, Any]) -> Any:
+ """Resume after human review, restoring structured outputs for XCom
consumers."""
+ output = super().execute_complete(context, generated_output, event)
+ if isinstance(self.output_type, type) and issubclass(self.output_type,
BaseModel):
+ return self.output_type.model_validate_json(output).model_dump()
+ return output
+
+ def _build_system_prompt(self) -> str:
+ prompt = (
+ "You are a read-only file analysis assistant.\n"
+ "Use only the provided metadata, normalized file content, and
multimodal attachments.\n"
+ "Do not claim to have modified files or executed any external
actions.\n"
+ "If file content is truncated or sampled, say so in your answer."
+ )
+ if self.system_prompt:
+ prompt += f"\n\nAdditional instructions:\n{self.system_prompt}"
+ return prompt
diff --git
a/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py
b/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py
new file mode 100644
index 00000000000..f962119616e
--- /dev/null
+++ b/providers/common/ai/src/airflow/providers/common/ai/utils/file_analysis.py
@@ -0,0 +1,670 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Helpers for building file-analysis prompts for LLM operators."""
+
+from __future__ import annotations
+
+import csv
+import gzip
+import io
+import json
+import logging
+from bisect import insort
+from dataclasses import dataclass
+from pathlib import PurePosixPath
+from typing import TYPE_CHECKING, Any
+
+from pydantic_ai.messages import BinaryContent
+
+from airflow.providers.common.ai.exceptions import (
+ LLMFileAnalysisLimitExceededError,
+ LLMFileAnalysisMultimodalRequiredError,
+ LLMFileAnalysisUnsupportedFormatError,
+)
+from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException, ObjectStoragePath
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+ from pydantic_ai.messages import UserContent
+
# File formats the analysis operator accepts; anything else is rejected up front.
SUPPORTED_FILE_FORMATS: tuple[str, ...] = (
    "avro",
    "csv",
    "jpeg",
    "jpg",
    "json",
    "log",
    "parquet",
    "pdf",
    "png",
)

# Formats that are normalized into textual prompt content.
_TEXT_LIKE_FORMATS = frozenset({"csv", "json", "log", "avro", "parquet"})
# Formats passed to the model as binary attachments (require multi_modal=True).
_MULTI_MODAL_FORMATS = frozenset({"jpeg", "jpg", "pdf", "png"})
# Recognized compression suffixes mapped to codec names; only gzip is actually
# decompressed — the others are detected so rejection messages can name them.
_COMPRESSION_SUFFIXES = {
    "bz2": "bzip2",
    "gz": "gzip",
    "snappy": "snappy",
    "xz": "xz",
    "zst": "zstd",
}
# Text formats that may arrive gzip-compressed.
_GZIP_SUPPORTED_FORMATS = frozenset({"csv", "json", "log"})
# Head/tail character budget used when sampling oversized text content.
_TEXT_SAMPLE_HEAD_CHARS = 8_000
_TEXT_SAMPLE_TAIL_CHARS = 2_000
# MIME types used for multimodal attachments.
_MEDIA_TYPES = {
    "jpeg": "image/jpeg",
    "jpg": "image/jpeg",
    "pdf": "application/pdf",
    "png": "image/png",
}
log = logging.getLogger(__name__)
+
+
@dataclass
class FileAnalysisRequest:
    """Prepared prompt content and discovery metadata for the file-analysis operator."""

    # Prompt payload: plain text, or text followed by multimodal attachments.
    user_content: str | Sequence[UserContent]
    # Fully resolved file URIs included in the analysis, in sorted order.
    resolved_paths: list[str]
    # Cumulative on-disk size of all resolved files (before decompression).
    total_size_bytes: int
    # Files skipped because the max_files limit was reached.
    omitted_files: int = 0
    # True when normalized text was truncated to fit max_text_chars.
    text_truncated: bool = False
    # Number of multimodal (image/PDF) attachments included.
    attachment_count: int = 0
    # Number of files whose content was normalized to text.
    text_file_count: int = 0
+
+
@dataclass
class _PreparedFile:
    """Per-file working state: metadata plus normalized text or a binary attachment."""

    path: ObjectStoragePath
    file_format: str
    size_bytes: int
    compression: str | None
    # Hive-style key=value path segments, e.g. ("dt=2024-01-01",).
    partitions: tuple[str, ...]
    # Row count when the renderer could determine it; None otherwise.
    estimated_rows: int | None = None
    # Normalized textual content; None for binary attachments or omitted files.
    text_content: str | None = None
    # Binary payload for multimodal formats (images/PDF).
    attachment: BinaryContent | None = None
    # Bytes actually read (post-decompression), counted against the total budget.
    content_size_bytes: int = 0
    # True when text was shortened to fit the prompt character budget.
    content_truncated: bool = False
    # True when text was dropped entirely because the budget was exhausted.
    content_omitted: bool = False
+
+
@dataclass
class _DiscoveredFile:
    """Lightweight discovery record captured before any file content is read."""

    path: ObjectStoragePath
    file_format: str
    size_bytes: int
    compression: str | None
+
+
@dataclass
class _RenderResult:
    """Output of a format-specific text renderer."""

    # Normalized, possibly truncated, textual representation.
    text: str
    # Row count when determinable; None otherwise.
    estimated_rows: int | None
    # Bytes consumed producing the text (post-decompression).
    content_size_bytes: int
+
+
def build_file_analysis_request(
    *,
    file_path: str,
    file_conn_id: str | None,
    prompt: str,
    multi_modal: bool,
    max_files: int,
    max_file_size_bytes: int,
    max_total_size_bytes: int,
    max_text_chars: int,
    sample_rows: int,
) -> FileAnalysisRequest:
    """Resolve files, normalize supported formats, and build prompt content for an LLM run.

    :param file_path: File or prefix to analyze (any ObjectStoragePath-compatible URI).
    :param file_conn_id: Optional connection used to access the storage backend.
    :param prompt: The user's analysis request, embedded in the prompt preamble.
    :param multi_modal: Whether image/PDF formats may be attached as binary content.
    :param max_files: Cap on the number of files resolved from a prefix.
    :param max_file_size_bytes: Per-file on-disk size limit.
    :param max_total_size_bytes: Cumulative limit, enforced both on declared sizes
        and again on bytes actually read (which may exceed declared sizes after
        gzip decompression).
    :param max_text_chars: Character budget for all normalized text combined.
    :param sample_rows: Row-sample size for tabular formats; must be positive.
    :raises ValueError: if ``sample_rows`` is not positive.
    :raises LLMFileAnalysisLimitExceededError: when any size limit is exceeded.
    """
    if sample_rows <= 0:
        raise ValueError("sample_rows must be greater than zero.")
    log.info(
        "Preparing file analysis request for path=%s, file_conn_id=%s, multi_modal=%s, "
        "max_files=%s, max_file_size_bytes=%s, max_total_size_bytes=%s, max_text_chars=%s, sample_rows=%s",
        file_path,
        file_conn_id,
        multi_modal,
        max_files,
        max_file_size_bytes,
        max_total_size_bytes,
        max_text_chars,
        sample_rows,
    )
    # Phase 1: expand the path/prefix into a bounded, sorted file list.
    root = ObjectStoragePath(file_path, conn_id=file_conn_id)
    resolved_paths, omitted_files = _resolve_paths(root=root, max_files=max_files)
    log.info(
        "Resolved %s file(s) from %s%s",
        len(resolved_paths),
        file_path,
        f"; omitted {omitted_files} additional file(s) due to max_files limit" if omitted_files else "",
    )
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Resolved file paths: %s", [str(path) for path in resolved_paths])

    # Phase 2: validate formats and declared sizes BEFORE reading any content,
    # so oversized inputs are rejected cheaply.
    discovered_files: list[_DiscoveredFile] = []
    total_size_bytes = 0
    for path in resolved_paths:
        discovered = _discover_file(
            path=path,
            max_file_size_bytes=max_file_size_bytes,
        )
        total_size_bytes += discovered.size_bytes
        if total_size_bytes > max_total_size_bytes:
            log.info(
                "Rejecting file set before content reads because cumulative size reached %s bytes (limit=%s bytes).",
                total_size_bytes,
                max_total_size_bytes,
            )
            raise LLMFileAnalysisLimitExceededError(
                "Total input size exceeds the configured limit: "
                f"{total_size_bytes} bytes > {max_total_size_bytes} bytes."
            )
        discovered_files.append(discovered)

    log.info(
        "Validated byte limits for %s file(s) before reading file contents; total_size_bytes=%s.",
        len(discovered_files),
        total_size_bytes,
    )

    # Phase 3: read and normalize content, re-enforcing the total budget on
    # actual (post-decompression) bytes read.
    prepared_files: list[_PreparedFile] = []
    processed_size_bytes = 0
    for discovered in discovered_files:
        remaining_content_bytes = max_total_size_bytes - processed_size_bytes
        if remaining_content_bytes <= 0:
            raise LLMFileAnalysisLimitExceededError(
                "Total processed input size exceeds the configured limit after decompression."
            )
        prepared = _prepare_file(
            discovered_file=discovered,
            multi_modal=multi_modal,
            sample_rows=sample_rows,
            max_content_bytes=min(max_file_size_bytes, remaining_content_bytes),
        )
        processed_size_bytes += prepared.content_size_bytes
        prepared_files.append(prepared)

    # Phase 4: fit the combined text into the prompt character budget and
    # assemble the final user content (text preamble + optional attachments).
    text_truncated = _apply_text_budget(prepared_files=prepared_files, max_text_chars=max_text_chars)
    if text_truncated:
        log.info("Normalized text content exceeded max_text_chars=%s and was truncated.", max_text_chars)
    text_preamble = _build_text_preamble(
        prompt=prompt,
        prepared_files=prepared_files,
        omitted_files=omitted_files,
        text_truncated=text_truncated,
    )
    attachments = [prepared.attachment for prepared in prepared_files if prepared.attachment is not None]
    text_file_count = sum(1 for prepared in prepared_files if prepared.text_content is not None)
    user_content: str | list[UserContent]
    if attachments:
        user_content = [text_preamble, *attachments]
    else:
        user_content = text_preamble
    log.info(
        "Prepared file analysis request with %s text file(s), %s attachment(s), total_size_bytes=%s.",
        text_file_count,
        len(attachments),
        total_size_bytes,
    )
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Prepared text preamble length=%s", len(text_preamble))
    return FileAnalysisRequest(
        user_content=user_content,
        resolved_paths=[str(path) for path in resolved_paths],
        total_size_bytes=total_size_bytes,
        omitted_files=omitted_files,
        text_truncated=text_truncated,
        attachment_count=len(attachments),
        text_file_count=text_file_count,
    )
+
+
+def _resolve_paths(*, root: ObjectStoragePath, max_files: int) ->
tuple[list[ObjectStoragePath], int]:
+ try:
+ if root.is_file():
+ return [root], 0
+ except FileNotFoundError:
+ pass
+
+ try:
+ selected: list[tuple[str, ObjectStoragePath]] = []
+ omitted_files = 0
+ for path in root.rglob("*"):
+ if not path.is_file():
+ continue
+ path_key = str(path)
+ if len(selected) < max_files:
+ insort(selected, (path_key, path))
+ continue
+ if path_key < selected[-1][0]:
+ insort(selected, (path_key, path))
+ selected.pop()
+ omitted_files += 1
+ except (FileNotFoundError, NotADirectoryError):
+ selected = []
+ omitted_files = 0
+
+ if not selected:
+ raise FileNotFoundError(f"No files found for {root}.")
+
+ return [path for _, path in selected], omitted_files
+
+
def _discover_file(*, path: ObjectStoragePath, max_file_size_bytes: int) -> _DiscoveredFile:
    """Probe *path* for format, compression, and size, enforcing the per-file cap."""
    detected_format, codec = detect_file_format(path)
    file_size = path.stat().st_size
    log.debug(
        "Discovered file %s (format=%s, size_bytes=%s%s).",
        path,
        detected_format,
        file_size,
        f", compression={codec}" if codec else "",
    )
    if file_size > max_file_size_bytes:
        log.info(
            "Rejecting file %s because size_bytes=%s exceeds the per-file limit=%s.",
            path,
            file_size,
            max_file_size_bytes,
        )
        raise LLMFileAnalysisLimitExceededError(
            f"File {path} exceeds the configured per-file limit: {file_size} bytes > {max_file_size_bytes} bytes."
        )
    return _DiscoveredFile(
        path=path,
        file_format=detected_format,
        size_bytes=file_size,
        compression=codec,
    )
+
+
def _prepare_file(
    *,
    discovered_file: _DiscoveredFile,
    multi_modal: bool,
    sample_rows: int,
    max_content_bytes: int,
) -> _PreparedFile:
    """Load one discovered file as either a multimodal attachment or normalized text.

    :param discovered_file: Metadata from the discovery pass (format, size, codec).
    :param multi_modal: Whether binary formats (images/PDF) may be attached.
    :param sample_rows: Row-sample size for tabular formats.
    :param max_content_bytes: Cap on bytes read for this file (post-decompression).
    :raises LLMFileAnalysisMultimodalRequiredError: for image/PDF input when
        ``multi_modal`` is False.
    """
    path = discovered_file.path
    file_format = discovered_file.file_format
    size_bytes = discovered_file.size_bytes
    compression = discovered_file.compression
    log.debug(
        "Preparing file content for %s (format=%s, size_bytes=%s%s).",
        path,
        file_format,
        size_bytes,
        f", compression={compression}" if compression else "",
    )
    prepared = _PreparedFile(
        path=path,
        file_format=file_format,
        size_bytes=size_bytes,
        compression=compression,
        partitions=_infer_partitions(path),
    )

    # Images and PDFs bypass text normalization and ride along as binary attachments.
    if file_format in _MULTI_MODAL_FORMATS:
        if not multi_modal:
            log.info("Rejecting file %s because format=%s requires multi_modal=True.", path, file_format)
            raise LLMFileAnalysisMultimodalRequiredError(
                f"File {path} has format {file_format!r}; set multi_modal=True to analyze images or PDFs."
            )
        prepared.attachment = BinaryContent(
            data=_read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes),
            media_type=_MEDIA_TYPES[file_format],
            identifier=str(path),
        )
        prepared.content_size_bytes = len(prepared.attachment.data)
        log.debug(
            "Attached %s as multimodal binary content with media_type=%s.", path, _MEDIA_TYPES[file_format]
        )
        return prepared

    # Everything else is text-like: normalize into a prompt-friendly string.
    render_result = _render_text_content(
        path=path,
        file_format=file_format,
        compression=compression,
        sample_rows=sample_rows,
        max_content_bytes=max_content_bytes,
    )
    prepared.text_content = render_result.text
    prepared.estimated_rows = render_result.estimated_rows
    prepared.content_size_bytes = render_result.content_size_bytes
    log.debug(
        "Normalized %s into text content of %s characters%s.",
        path,
        len(render_result.text),
        f"; estimated_rows={render_result.estimated_rows}"
        if render_result.estimated_rows is not None
        else "",
    )
    return prepared
+
+
def detect_file_format(path: ObjectStoragePath) -> tuple[str, str | None]:
    """Detect the logical file format and compression codec from a path suffix."""
    parts = [suffix.removeprefix(".").lower() for suffix in path.suffixes]
    codec: str | None = None
    # A trailing compression suffix (e.g. ".gz") wraps the real format suffix.
    if parts and parts[-1] in _COMPRESSION_SUFFIXES:
        codec = _COMPRESSION_SUFFIXES[parts[-1]]
        parts.pop()
    # With no recognizable suffix left, fall back to treating it as a log file.
    fmt = parts[-1] if parts else "log"
    if fmt not in SUPPORTED_FILE_FORMATS:
        raise LLMFileAnalysisUnsupportedFormatError(
            f"Unsupported file format {fmt!r} for {path}. Supported formats: {', '.join(SUPPORTED_FILE_FORMATS)}."
        )
    if codec and codec != "gzip":
        log.info("Rejecting file %s because compression=%s is not supported.", path, codec)
        raise LLMFileAnalysisUnsupportedFormatError(
            f"Compression {codec!r} is not supported for file analysis."
        )
    if codec == "gzip" and fmt not in _GZIP_SUPPORTED_FORMATS:
        raise LLMFileAnalysisUnsupportedFormatError(
            f"Compression {codec!r} is not supported for {fmt!r} file analysis."
        )
    return fmt, codec
+
+
def _render_text_content(
    *,
    path: ObjectStoragePath,
    file_format: str,
    compression: str | None,
    sample_rows: int,
    max_content_bytes: int,
) -> _RenderResult:
    """Dispatch normalization of a text-like file to the format-specific renderer."""
    if file_format == "parquet":
        return _render_parquet(path, sample_rows=sample_rows, max_content_bytes=max_content_bytes)
    if file_format == "avro":
        return _render_avro(path, sample_rows=sample_rows, max_content_bytes=max_content_bytes)
    if file_format == "csv":
        return _render_csv(
            path, compression=compression, sample_rows=sample_rows, max_content_bytes=max_content_bytes
        )
    if file_format == "json":
        return _render_json(path, compression=compression, max_content_bytes=max_content_bytes)
    # "log" (and any other text-like format) falls back to a raw head/tail sample.
    return _render_text_like(path, compression=compression, max_content_bytes=max_content_bytes)
+
+
def _render_text_like(
    path: ObjectStoragePath, *, compression: str | None, max_content_bytes: int
) -> _RenderResult:
    """Normalize a plain-text file by decoding its bytes and head/tail sampling."""
    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
    sampled = _truncate_text(_decode_text(payload))
    return _RenderResult(text=sampled, estimated_rows=None, content_size_bytes=len(payload))
+
+
def _render_json(
    path: ObjectStoragePath, *, compression: str | None, max_content_bytes: int
) -> _RenderResult:
    """Pretty-print a JSON document; a top-level array reports its length as rows."""
    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
    document = json.loads(_decode_text(payload))
    row_count = len(document) if isinstance(document, list) else None
    rendered = json.dumps(document, indent=2, sort_keys=True, default=str)
    return _RenderResult(
        text=_truncate_text(rendered),
        estimated_rows=row_count,
        content_size_bytes=len(payload),
    )
+
+
def _render_csv(
    path: ObjectStoragePath, *, compression: str | None, sample_rows: int, max_content_bytes: int
) -> _RenderResult:
    """Summarize a CSV file as its header plus the first ``sample_rows`` data rows."""
    payload = _read_raw_bytes(path, compression=compression, max_bytes=max_content_bytes)
    records = list(csv.reader(io.StringIO(_decode_text(payload))))
    if not records:
        return _RenderResult(text="", estimated_rows=0, content_size_bytes=len(payload))
    header = records[0]
    data_rows = records[1:]
    lines = ["Header: " + ", ".join(header)]
    sampled = data_rows[:sample_rows]
    if sampled:
        lines.append("Sample rows:")
        lines.extend(", ".join(str(cell) for cell in row) for row in sampled)
    return _RenderResult(
        text=_truncate_text("\n".join(lines)),
        estimated_rows=len(data_rows),
        content_size_bytes=len(payload),
    )
+
+
def _render_parquet(path: ObjectStoragePath, *, sample_rows: int, max_content_bytes: int) -> _RenderResult:
    """Summarize a Parquet file as its Arrow schema plus up to ``sample_rows`` rows.

    :raises AirflowOptionalProviderFeatureException: when ``pyarrow`` is not installed.
    :raises LLMFileAnalysisLimitExceededError: when the file exceeds ``max_content_bytes``.
    """
    try:
        import pyarrow.parquet as pq
    except ImportError as exc:
        raise AirflowOptionalProviderFeatureException(
            "Parquet analysis requires the `parquet` extra for apache-airflow-providers-common-ai."
        ) from exc

    with path.open("rb") as handle:
        parquet_file = pq.ParquetFile(handle)
        # Exact row count comes from footer metadata, so no data pages are read.
        metadata = parquet_file.metadata
        num_rows = metadata.num_rows if metadata is not None else 0

        # Size check via seek avoids an extra stat() round-trip to object storage.
        handle.seek(0, io.SEEK_END)
        content_size_bytes = handle.tell()
        handle.seek(0)
        if content_size_bytes > max_content_bytes:
            raise LLMFileAnalysisLimitExceededError(
                f"File {path} exceeds the configured processed-content limit: {content_size_bytes} bytes > {max_content_bytes} bytes."
            )

        schema = ", ".join(f"{field.name}: {field.type}" for field in parquet_file.schema_arrow)
        sampled_rows: list[dict[str, Any]] = []
        if sample_rows > 0 and num_rows > 0 and parquet_file.num_row_groups > 0:
            # Read row groups one at a time, slicing so at most sample_rows
            # rows ever get materialized.
            remaining_rows = sample_rows
            for row_group_index in range(parquet_file.num_row_groups):
                if remaining_rows <= 0:
                    break
                row_group = parquet_file.read_row_group(row_group_index)
                if row_group.num_rows == 0:
                    continue
                group_rows = row_group.slice(0, remaining_rows).to_pylist()
                sampled_rows.extend(group_rows)
                remaining_rows -= len(group_rows)
    payload = [f"Schema: {schema}", "Sample rows:", json.dumps(sampled_rows, indent=2, default=str)]
    return _RenderResult(
        text=_truncate_text("\n".join(payload)),
        estimated_rows=num_rows,
        content_size_bytes=content_size_bytes,
    )
+
+
def _render_avro(path: ObjectStoragePath, *, sample_rows: int, max_content_bytes: int) -> _RenderResult:
    """Summarize an Avro file as its writer schema plus up to ``sample_rows`` records.

    :raises AirflowOptionalProviderFeatureException: when ``fastavro`` is not installed.
    :raises LLMFileAnalysisLimitExceededError: when the file exceeds ``max_content_bytes``.
    """
    try:
        import fastavro
    except ImportError as exc:
        raise AirflowOptionalProviderFeatureException(
            "Avro analysis requires the `avro` extra for apache-airflow-providers-common-ai."
        ) from exc

    sampled_rows: list[dict[str, Any]] = []
    total_rows = 0
    with path.open("rb") as handle:
        # Size check via seek avoids an extra stat() round-trip to object storage.
        handle.seek(0, io.SEEK_END)
        content_size_bytes = handle.tell()
        handle.seek(0)
        if content_size_bytes > max_content_bytes:
            raise LLMFileAnalysisLimitExceededError(
                f"File {path} exceeds the configured processed-content limit: {content_size_bytes} bytes > {max_content_bytes} bytes."
            )
        reader = fastavro.reader(handle)
        writer_schema = getattr(reader, "writer_schema", None)
        fully_read = False
        if sample_rows > 0:
            # for/else: fully_read flips only when the reader is exhausted before
            # hitting the sample cap — then total_rows is the exact row count.
            for record in reader:
                total_rows += 1
                if isinstance(record, dict):
                    sampled_rows.append({str(key): value for key, value in record.items()})
                if total_rows >= sample_rows:
                    break
            else:
                fully_read = True
    payload = [
        f"Schema: {json.dumps(writer_schema, indent=2, default=str)}",
        "Sample rows:",
        json.dumps(sampled_rows, indent=2, default=str),
    ]
    return _RenderResult(
        text=_truncate_text("\n".join(payload)),
        estimated_rows=total_rows if fully_read else None,
        content_size_bytes=content_size_bytes,
    )
+
+
def _read_raw_bytes(path: ObjectStoragePath, *, compression: str | None, max_bytes: int) -> bytes:
    """Read at most ``max_bytes`` from *path*, transparently inflating gzip streams."""
    with path.open("rb") as raw:
        if compression != "gzip":
            return _read_limited_bytes(raw, path=path, max_bytes=max_bytes)
        # The byte cap applies to the *decompressed* stream, so wrap first.
        with gzip.GzipFile(fileobj=raw) as inflated:
            return _read_limited_bytes(inflated, path=path, max_bytes=max_bytes)
+
+
+def _read_limited_bytes(handle: io.BufferedIOBase, *, path: ObjectStoragePath,
max_bytes: int) -> bytes:
+ chunks: list[bytes] = []
+ total_bytes = 0
+ while True:
+ chunk = handle.read(min(64 * 1024, max_bytes - total_bytes + 1))
+ if not chunk:
+ break
+ total_bytes += len(chunk)
+ if total_bytes > max_bytes:
+ raise LLMFileAnalysisLimitExceededError(
+ f"File {path} exceeds the configured processed-content limit:
> {max_bytes} bytes."
+ )
+ chunks.append(chunk)
+ return b"".join(chunks)
+
+
+def _decode_text(data: bytes) -> str:
+ return data.decode("utf-8", errors="replace")
+
+
+def _apply_text_budget(*, prepared_files: list[_PreparedFile], max_text_chars:
int) -> bool:
+ remaining = max_text_chars
+ truncated_any = False
+ for prepared in prepared_files:
+ if prepared.text_content is None:
+ continue
+ if remaining <= 0:
+ prepared.text_content = None
+ prepared.content_omitted = True
+ truncated_any = True
+ log.debug(
+ "Omitted normalized text for %s because the prompt text budget
was exhausted.", prepared.path
+ )
+ continue
+ original = prepared.text_content
+ if len(original) > remaining:
+ prepared.text_content = _truncate_text(original,
max_chars=remaining)
+ prepared.content_truncated = True
+ truncated_any = True
+ log.debug(
+ "Truncated normalized text for %s from %s to %s characters to
fit the remaining budget.",
+ prepared.path,
+ len(original),
+ len(prepared.text_content),
+ )
+ remaining -= len(prepared.text_content)
+ return truncated_any
+
+
def _build_text_preamble(
    *,
    prompt: str,
    prepared_files: list[_PreparedFile],
    omitted_files: int,
    text_truncated: bool,
) -> str:
    """Assemble the textual prompt: user request, file inventory, normalized content."""
    sections: list[str] = ["User request:", prompt, "", "Resolved files:"]
    normalized_blocks: list[str] = []
    any_attachment = False
    for entry in prepared_files:
        sections.append(f"- {_format_file_metadata(entry)}")
        if entry.text_content is not None:
            normalized_blocks.append(f"### File: {entry.path}\n{entry.text_content}")
        if entry.attachment is not None:
            any_attachment = True
    if omitted_files:
        sections.append(f"- omitted_files={omitted_files} (max_files limit reached)")
    if text_truncated:
        sections.append("- text_context_truncated=True")

    if normalized_blocks:
        sections.extend(["", "Normalized content:", *normalized_blocks])
    if any_attachment:
        # Tell the model how to correlate binary attachments with the inventory above.
        sections.extend(
            [
                "",
                "Attached multimodal files follow this text block.",
                "Use the matching file metadata above when referring to those attachments.",
            ]
        )
    return "\n".join(sections)
+
+
def _truncate_text(text: str, *, max_chars: int = _TEXT_SAMPLE_HEAD_CHARS + _TEXT_SAMPLE_TAIL_CHARS) -> str:
    """Shorten *text* to at most ``max_chars`` characters, keeping a head and tail sample."""
    if len(text) <= max_chars:
        return text
    # Tiny budgets cannot fit the "\n...\n" marker; fall back to a plain prefix.
    if max_chars <= 32:
        return text[:max_chars]
    if max_chars >= _TEXT_SAMPLE_HEAD_CHARS + _TEXT_SAMPLE_TAIL_CHARS:
        head_len = _TEXT_SAMPLE_HEAD_CHARS
    else:
        head_len = max_chars // 2
    marker = "\n...\n"
    tail_len = max_chars - head_len - len(marker)
    if tail_len <= 0:
        return text[:max_chars]
    return text[:head_len] + marker + text[-tail_len:]
+
+
+def _format_file_metadata(prepared: _PreparedFile) -> str:
+ metadata = [
+ f"path={prepared.path}",
+ f"format={prepared.file_format}",
+ f"size_bytes={prepared.size_bytes}",
+ ]
+ if prepared.compression:
+ metadata.append(f"compression={prepared.compression}")
+ if prepared.estimated_rows is not None:
+ metadata.append(f"estimated_rows={prepared.estimated_rows}")
+ if prepared.partitions:
+ metadata.append(f"partitions={list(prepared.partitions)}")
+ if prepared.content_truncated:
+ metadata.append("content_truncated=True")
+ if prepared.content_omitted:
+ metadata.append("content_omitted=True")
+ if prepared.attachment is not None:
+ metadata.append("attached_as_binary=True")
+ return ", ".join(metadata)
+
+
+def _infer_partitions(path: ObjectStoragePath) -> tuple[str, ...]:
+ pure_path = PurePosixPath(path.path)
+ return tuple(part for part in pure_path.parts if "=" in part)
diff --git a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
b/providers/common/ai/tests/unit/common/ai/assets/__init__.py
similarity index 75%
copy from providers/common/ai/src/airflow/providers/common/ai/exceptions.py
copy to providers/common/ai/tests/unit/common/ai/assets/__init__.py
index 7c83f30d68a..13a83393a91 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/exceptions.py
+++ b/providers/common/ai/tests/unit/common/ai/assets/__init__.py
@@ -14,10 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-from airflow.providers.common.compat.sdk import AirflowException
-
-
-class HITLMaxIterationsError(AirflowException):
- """Raised when the HITL review loop exhausts max iterations without
approval or rejection."""
diff --git
a/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png
b/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png
new file mode 100644
index 00000000000..e7179f8cac5
Binary files /dev/null and
b/providers/common/ai/tests/unit/common/ai/assets/airflow-3-task-sdk.png differ
diff --git
a/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py
new file mode 100644
index 00000000000..5b32f061862
--- /dev/null
+++
b/providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.common.ai.decorators.llm_file_analysis import
_LLMFileAnalysisDecoratedOperator
+from airflow.providers.common.ai.utils.file_analysis import FileAnalysisRequest
+
+
+def _make_mock_run_result(output):
+ mock_result = MagicMock(spec=["output", "usage", "response",
"all_messages"])
+ mock_result.output = output
+ mock_result.usage.return_value = MagicMock(
+ spec=["requests", "tool_calls", "input_tokens", "output_tokens",
"total_tokens"],
+ requests=1,
+ tool_calls=0,
+ input_tokens=0,
+ output_tokens=0,
+ total_tokens=0,
+ )
+ mock_result.response = MagicMock(spec=["model_name"],
model_name="test-model")
+ mock_result.all_messages.return_value = []
+ return mock_result
+
+
class TestLLMFileAnalysisDecoratedOperator:
    """Unit tests for the ``@task.llm_file_analysis`` decorated-operator wrapper."""

    def test_custom_operator_name(self):
        # The decorator must register under the expected task-flow name.
        assert _LLMFileAnalysisDecoratedOperator.custom_operator_name == "@task.llm_file_analysis"

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
    )
    def test_execute_calls_callable_and_returns_output(self, mock_build_request, mock_hook_cls):
        """execute() renders the prompt from the callable and returns the agent output."""
        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result("This is a summary.")
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        def my_prompt():
            return "Summarize this text"

        op = _LLMFileAnalysisDecoratedOperator(
            task_id="test",
            python_callable=my_prompt,
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
        )
        result = op.execute(context={})

        assert result == "This is a summary."
        assert op.prompt == "Summarize this text"
        # The agent receives the *prepared* content, not the raw prompt.
        mock_agent.run_sync.assert_called_once_with("prepared prompt")

    @pytest.mark.parametrize(
        "return_value",
        [42, "", " ", None],
        ids=["non-string", "empty", "whitespace", "none"],
    )
    def test_execute_raises_on_invalid_prompt(self, return_value):
        """Non-string or blank callable results are rejected before any LLM call."""
        op = _LLMFileAnalysisDecoratedOperator(
            task_id="test",
            python_callable=lambda: return_value,
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
        )
        with pytest.raises(TypeError, match="non-empty string"):
            op.execute(context={})

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request", autospec=True
    )
    def test_execute_merges_op_kwargs_into_callable(self, mock_build_request, mock_hook_cls):
        """op_kwargs are forwarded to the prompt callable before rendering."""
        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result("done")
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        def my_prompt(topic):
            return f"Summarize {topic}"

        op = _LLMFileAnalysisDecoratedOperator(
            task_id="test",
            python_callable=my_prompt,
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            op_kwargs={"topic": "system logs"},
        )
        op.execute(context={"task_instance": MagicMock(spec=["task_id"])})

        assert op.prompt == "Summarize system logs"
        mock_agent.run_sync.assert_called_once_with("prepared prompt")
diff --git
a/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py
b/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py
new file mode 100644
index 00000000000..abd98d92326
--- /dev/null
+++
b/providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py
@@ -0,0 +1,303 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+from unittest.mock import MagicMock, patch
+from uuid import uuid4
+
+import pytest
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.operators.llm_file_analysis import
LLMFileAnalysisOperator
+from airflow.providers.common.ai.utils.file_analysis import FileAnalysisRequest
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+
+
+def _make_mock_run_result(output):
+ mock_result = MagicMock(spec=["output", "usage", "response",
"all_messages"])
+ mock_result.output = output
+ mock_result.usage.return_value = MagicMock(
+ spec=["requests", "tool_calls", "input_tokens", "output_tokens",
"total_tokens"],
+ requests=1,
+ tool_calls=0,
+ input_tokens=0,
+ output_tokens=0,
+ total_tokens=0,
+ )
+ mock_result.response = MagicMock(spec=["model_name"],
model_name="test-model")
+ mock_result.all_messages.return_value = []
+ return mock_result
+
+
+def _make_context(ti_id=None):
+ ti_id = ti_id or uuid4()
+ ti = MagicMock(spec=["id"])
+ ti.id = ti_id
+ context = MagicMock(spec=["__getitem__"])
+ context.__getitem__.side_effect = lambda key: {"task_instance": ti}[key]
+ return context
+
+
class TestLLMFileAnalysisOperator:
    """Unit tests for LLMFileAnalysisOperator with the hook and request builder mocked."""

    def test_template_fields(self):
        """Every user-facing parameter is declared as a template field."""
        expected = {
            "prompt",
            "llm_conn_id",
            "model_id",
            "system_prompt",
            "agent_params",
            "file_path",
            "file_conn_id",
        }
        assert set(LLMFileAnalysisOperator.template_fields) == expected

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_execute_returns_string_output(self, mock_build_request, mock_hook_cls):
        """execute() sends the prepared prompt to the agent and returns its string output."""
        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result("Analysis complete")
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        op = LLMFileAnalysisOperator(
            task_id="test",
            prompt="Summarize the file",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
        )
        result = op.execute(context={})

        assert result == "Analysis complete"
        # The operator's defaults (file limits, sample rows, etc.) must be
        # passed through to the request builder unchanged.
        mock_build_request.assert_called_once_with(
            file_path="/tmp/app.log",
            file_conn_id=None,
            prompt="Summarize the file",
            multi_modal=False,
            max_files=20,
            max_file_size_bytes=5 * 1024 * 1024,
            max_total_size_bytes=20 * 1024 * 1024,
            max_text_chars=100_000,
            sample_rows=10,
        )
        mock_agent.run_sync.assert_called_once_with("prepared prompt")

    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_execute_structured_output_serializes_model(self, mock_build_request, mock_hook_cls):
        """A pydantic output_type result is serialized to a plain dict for XCom."""
        class Summary(BaseModel):
            findings: list[str]

        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result(Summary(findings=["error spike"]))
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        op = LLMFileAnalysisOperator(
            task_id="test",
            prompt="Summarize the file",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            output_type=Summary,
        )
        result = op.execute(context={})

        assert result == {"findings": ["error spike"]}

    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_parameter_validation(self, mock_build_request):
        """Invalid limit parameters fail at construction, before any file access."""
        with pytest.raises(ValueError, match="max_files"):
            LLMFileAnalysisOperator(
                task_id="test",
                prompt="p",
                llm_conn_id="my_llm",
                file_path="/tmp/app.log",
                max_files=0,
            )
        with pytest.raises(ValueError, match="sample_rows"):
            LLMFileAnalysisOperator(
                task_id="test",
                prompt="p",
                llm_conn_id="my_llm",
                file_path="/tmp/app.log",
                sample_rows=0,
            )
        mock_build_request.assert_not_called()
+
+
@pytest.mark.skipif(
    not AIRFLOW_V_3_1_PLUS, reason="Human in the loop is only compatible with Airflow >= 3.1.0"
)
class TestLLMFileAnalysisOperatorApproval:
    """Human-in-the-loop approval flow: execute() defers, execute_complete() restores output."""

    class Summary(BaseModel):
        findings: list[str]

    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_execute_with_approval_defers(
        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
    ):
        """With require_approval=True the task defers, carrying the generated output."""
        from airflow.providers.common.compat.sdk import TaskDeferred

        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result("LLM response")
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        op = LLMFileAnalysisOperator(
            task_id="approval_test",
            prompt="Summarize this",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            require_approval=True,
        )
        ctx = _make_context()

        with pytest.raises(TaskDeferred) as exc_info:
            op.execute(context=ctx)

        assert exc_info.value.method_name == "execute_complete"
        assert exc_info.value.kwargs["generated_output"] == "LLM response"
        mock_upsert.assert_called_once()

    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_execute_with_approval_defers_structured_output_as_json(
        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
    ):
        """Structured (pydantic) output is serialized to compact JSON for the deferral payload."""
        from airflow.providers.common.compat.sdk import TaskDeferred

        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result(self.Summary(findings=["error spike"]))
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        op = LLMFileAnalysisOperator(
            task_id="approval_structured_test",
            prompt="Summarize this",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            output_type=self.Summary,
            require_approval=True,
        )

        with pytest.raises(TaskDeferred) as exc_info:
            op.execute(context=_make_context())

        assert exc_info.value.kwargs["generated_output"] == '{"findings":["error spike"]}'
        mock_upsert.assert_called_once()

    def test_execute_complete_with_approval_restores_structured_output(self):
        """After approval, the JSON payload is deserialized back through output_type."""
        op = LLMFileAnalysisOperator(
            task_id="approval_complete_test",
            prompt="Summarize this",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            output_type=self.Summary,
            require_approval=True,
        )
        event = {"chosen_options": [op.APPROVE], "params_input": {}, "responded_by_user": "reviewer"}

        result = op.execute_complete({}, generated_output='{"findings":["error spike"]}', event=event)

        assert result == {"findings": ["error spike"]}

    def test_execute_complete_with_approval_restores_modified_structured_output(self):
        """With allow_modifications=True the reviewer's edited payload wins over the original."""
        op = LLMFileAnalysisOperator(
            task_id="approval_complete_modified_test",
            prompt="Summarize this",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            output_type=self.Summary,
            require_approval=True,
            allow_modifications=True,
        )
        event = {
            "chosen_options": [op.APPROVE],
            "params_input": {"output": '{"findings":["reviewed output"]}'},
            "responded_by_user": "reviewer",
        }

        result = op.execute_complete({}, generated_output='{"findings":["error spike"]}', event=event)

        assert result == {"findings": ["reviewed output"]}

    @patch("airflow.providers.standard.triggers.hitl.HITLTrigger", autospec=True)
    @patch("airflow.sdk.execution_time.hitl.upsert_hitl_detail")
    @patch("airflow.providers.common.ai.operators.llm.PydanticAIHook", autospec=True)
    @patch(
        "airflow.providers.common.ai.operators.llm_file_analysis.build_file_analysis_request",
        autospec=True,
    )
    def test_execute_with_approval_timeout(
        self, mock_build_request, mock_hook_cls, mock_upsert, mock_trigger_cls
    ):
        """approval_timeout is propagated onto the deferral."""
        from airflow.providers.common.compat.sdk import TaskDeferred

        mock_build_request.return_value = FileAnalysisRequest(
            user_content="prepared prompt",
            resolved_paths=["/tmp/app.log"],
            total_size_bytes=10,
        )
        mock_agent = MagicMock(spec=["run_sync"])
        mock_agent.run_sync.return_value = _make_mock_run_result("output")
        mock_hook_cls.get_hook.return_value.create_agent.return_value = mock_agent

        timeout = timedelta(hours=1)
        op = LLMFileAnalysisOperator(
            task_id="timeout_test",
            prompt="p",
            llm_conn_id="my_llm",
            file_path="/tmp/app.log",
            require_approval=True,
            approval_timeout=timeout,
        )

        with pytest.raises(TaskDeferred) as exc_info:
            op.execute(context=_make_context())

        assert exc_info.value.timeout == timeout
diff --git
a/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py
b/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py
new file mode 100644
index 00000000000..32091ffa59b
--- /dev/null
+++ b/providers/common/ai/tests/unit/common/ai/utils/test_file_analysis.py
@@ -0,0 +1,525 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import builtins
+import gzip
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+from pydantic_ai.messages import BinaryContent
+
+from airflow.providers.common.ai.exceptions import (
+ LLMFileAnalysisLimitExceededError,
+ LLMFileAnalysisMultimodalRequiredError,
+ LLMFileAnalysisUnsupportedFormatError,
+)
+from airflow.providers.common.ai.utils.file_analysis import (
+ FileAnalysisRequest,
+ _infer_partitions,
+ _read_raw_bytes,
+ _render_avro,
+ _render_parquet,
+ _resolve_paths,
+ _truncate_text,
+ build_file_analysis_request,
+ detect_file_format,
+)
+from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException, ObjectStoragePath
+
+AIRFLOW_PNG_PATH = Path(__file__).resolve().parents[1] / "assets" /
"airflow-3-task-sdk.png"
+
+
class TestBuildFileAnalysisRequest:
    """End-to-end tests for build_file_analysis_request against real temp files."""

    def test_text_file_analysis(self, tmp_path):
        """A plain log file is inlined into user_content with its detected format."""
        path = tmp_path / "app.log"
        path.write_text("first line\nsecond line\n", encoding="utf-8")

        request = build_file_analysis_request(
            file_path=str(path),
            file_conn_id=None,
            prompt="Summarize the log",
            multi_modal=False,
            max_files=20,
            max_file_size_bytes=1024,
            max_total_size_bytes=2048,
            max_text_chars=500,
            sample_rows=10,
        )

        assert isinstance(request, FileAnalysisRequest)
        assert request.resolved_paths == [str(path)]
        assert "Summarize the log" in request.user_content
        assert "first line" in request.user_content
        assert "format=log" in request.user_content

    def test_file_conn_id_overrides_embedded_connection(self, tmp_path):
        """An explicit file_conn_id wins over a conn id embedded in the URL."""
        path = tmp_path / "data.json"
        path.write_text('{"status": "ok"}', encoding="utf-8")

        with patch(
            "airflow.providers.common.ai.utils.file_analysis.ObjectStoragePath", autospec=True
        ) as mock_path:
            mock_instance = MagicMock(spec=ObjectStoragePath)
            mock_handle = MagicMock(spec=["read"])
            # Two reads: file content, then EOF sentinel.
            mock_handle.read.side_effect = [b'{"status": "ok"}', b""]
            mock_instance.is_file.return_value = True
            mock_instance.stat.return_value.st_size = 16
            mock_instance.suffixes = [".json"]
            mock_instance.path = str(path)
            mock_instance.open.return_value.__enter__.return_value = mock_handle
            mock_path.return_value = mock_instance

            build_file_analysis_request(
                file_path="s3://embedded_conn@bucket/data.json",
                file_conn_id="override_conn",
                prompt="Inspect",
                multi_modal=False,
                max_files=1,
                max_file_size_bytes=1024,
                max_total_size_bytes=1024,
                max_text_chars=500,
                sample_rows=10,
            )

            mock_path.assert_called_once_with("s3://embedded_conn@bucket/data.json", conn_id="override_conn")

    def test_sample_rows_must_be_positive(self, tmp_path):
        """sample_rows=0 is rejected before any file is processed."""
        path = tmp_path / "metrics.csv"
        path.write_text("name,value\nalpha,1\n", encoding="utf-8")

        with pytest.raises(ValueError, match="sample_rows must be greater than zero"):
            build_file_analysis_request(
                file_path=str(path),
                file_conn_id=None,
                prompt="Analyze",
                multi_modal=False,
                max_files=1,
                max_file_size_bytes=1024,
                max_total_size_bytes=1024,
                max_text_chars=500,
                sample_rows=0,
            )

    def test_prefix_aggregation_is_sorted_and_omits_extra_files(self, tmp_path):
        """Directory (prefix) input is resolved in sorted order, capped at max_files."""
        (tmp_path / "b.log").write_text("b", encoding="utf-8")
        (tmp_path / "a.log").write_text("a", encoding="utf-8")
        (tmp_path / "c.log").write_text("c", encoding="utf-8")

        request = build_file_analysis_request(
            file_path=str(tmp_path),
            file_conn_id=None,
            prompt="Summarize",
            multi_modal=False,
            max_files=2,
            max_file_size_bytes=1024,
            max_total_size_bytes=2048,
            max_text_chars=500,
            sample_rows=10,
        )

        assert request.resolved_paths == [str(tmp_path / "a.log"), str(tmp_path / "b.log")]
        assert request.omitted_files == 1
        assert "omitted_files=1" in request.user_content

    def test_multi_modal_image_creates_binary_attachment(self):
        """With multi_modal=True an image becomes a BinaryContent attachment."""
        request = build_file_analysis_request(
            file_path=str(AIRFLOW_PNG_PATH),
            file_conn_id=None,
            prompt="Check for anomalies",
            multi_modal=True,
            max_files=1,
            max_file_size_bytes=256 * 1024,
            max_total_size_bytes=256 * 1024,
            max_text_chars=500,
            sample_rows=10,
        )

        assert isinstance(request.user_content, list)
        assert isinstance(request.user_content[1], BinaryContent)
        assert request.user_content[1].media_type == "image/png"
        assert request.resolved_paths == [str(AIRFLOW_PNG_PATH)]
        assert "attached_as_binary=True" in request.user_content[0]

    def test_multimodal_required_for_image_and_pdf(self, tmp_path):
        """Binary media formats require multi_modal=True."""
        path = tmp_path / "report.pdf"
        path.write_bytes(b"%PDF-1.7")

        with pytest.raises(LLMFileAnalysisMultimodalRequiredError, match="multi_modal=True"):
            build_file_analysis_request(
                file_path=str(path),
                file_conn_id=None,
                prompt="Analyze",
                multi_modal=False,
                max_files=1,
                max_file_size_bytes=1024,
                max_total_size_bytes=1024,
                max_text_chars=500,
                sample_rows=10,
            )

    def test_unsupported_suffix_raises(self, tmp_path):
        """Unknown file suffixes are rejected with a dedicated error."""
        path = tmp_path / "payload.bin"
        path.write_bytes(b"abc")

        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="Unsupported file format"):
            build_file_analysis_request(
                file_path=str(path),
                file_conn_id=None,
                prompt="Analyze",
                multi_modal=False,
                max_files=1,
                max_file_size_bytes=1024,
                max_total_size_bytes=1024,
                max_text_chars=500,
                sample_rows=10,
            )

    def test_single_file_limit_failure(self, tmp_path):
        """The per-file size limit is enforced before any file preparation work."""
        path = tmp_path / "big.log"
        path.write_text("abcdef", encoding="utf-8")

        with patch(
            "airflow.providers.common.ai.utils.file_analysis._prepare_file", autospec=True
        ) as mock_prepare:
            with pytest.raises(LLMFileAnalysisLimitExceededError, match="per-file limit"):
                build_file_analysis_request(
                    file_path=str(path),
                    file_conn_id=None,
                    prompt="Analyze",
                    multi_modal=False,
                    max_files=1,
                    max_file_size_bytes=5,
                    max_total_size_bytes=1024,
                    max_text_chars=500,
                    sample_rows=10,
                )

            mock_prepare.assert_not_called()

    def test_total_size_limit_failure(self, tmp_path):
        """The aggregate size limit is enforced before any file preparation work."""
        (tmp_path / "a.log").write_text("abc", encoding="utf-8")
        (tmp_path / "b.log").write_text("def", encoding="utf-8")

        with patch(
            "airflow.providers.common.ai.utils.file_analysis._prepare_file", autospec=True
        ) as mock_prepare:
            with pytest.raises(LLMFileAnalysisLimitExceededError, match="Total input size exceeds"):
                build_file_analysis_request(
                    file_path=str(tmp_path),
                    file_conn_id=None,
                    prompt="Analyze",
                    multi_modal=False,
                    max_files=10,
                    max_file_size_bytes=1024,
                    max_total_size_bytes=5,
                    max_text_chars=500,
                    sample_rows=10,
                )

            mock_prepare.assert_not_called()

    def test_gzip_expansion_respects_processed_content_limit(self, tmp_path):
        """Decompressed gzip content is also bounded by the per-file limit."""
        path = tmp_path / "big.log.gz"
        path.write_bytes(gzip.compress(b"A" * 20_000))

        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
            build_file_analysis_request(
                file_path=str(path),
                file_conn_id=None,
                prompt="Analyze",
                multi_modal=False,
                max_files=1,
                max_file_size_bytes=256,
                max_total_size_bytes=256,
                max_text_chars=500,
                sample_rows=10,
            )

    def test_gzip_expansion_respects_total_processed_content_limit(self, tmp_path):
        """Decompressed gzip content across files is bounded by the total limit."""
        (tmp_path / "a.log.gz").write_bytes(gzip.compress(b"A" * 1_000))
        (tmp_path / "b.log.gz").write_bytes(gzip.compress(b"B" * 1_000))

        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
            build_file_analysis_request(
                file_path=str(tmp_path),
                file_conn_id=None,
                prompt="Analyze",
                multi_modal=False,
                max_files=10,
                max_file_size_bytes=2_048,
                max_total_size_bytes=1_500,
                max_text_chars=500,
                sample_rows=10,
            )

    def test_json_is_pretty_printed(self, tmp_path):
        """JSON input is re-rendered (indented) into the prompt context."""
        path = tmp_path / "data.json"
        path.write_text('{"b": 2, "a": 1}', encoding="utf-8")

        request = build_file_analysis_request(
            file_path=str(path),
            file_conn_id=None,
            prompt="Analyze",
            multi_modal=False,
            max_files=1,
            max_file_size_bytes=1024,
            max_total_size_bytes=1024,
            max_text_chars=500,
            sample_rows=10,
        )

        assert '"a": 1' in request.user_content
        assert '"b": 2' in request.user_content

    def test_text_context_truncation_is_marked(self, tmp_path):
        """Truncation is flagged both on the request and inside user_content."""
        path = tmp_path / "huge.log"
        path.write_text("line\n" * 400, encoding="utf-8")

        request = build_file_analysis_request(
            file_path=str(path),
            file_conn_id=None,
            prompt="Analyze",
            multi_modal=False,
            max_files=1,
            max_file_size_bytes=16_384,
            max_total_size_bytes=16_384,
            max_text_chars=100,
            sample_rows=10,
        )

        assert request.text_truncated is True
        assert "text_context_truncated=True" in request.user_content

    def test_csv_sample_rows_is_configurable(self, tmp_path):
        """Only the first sample_rows data rows of a CSV are included."""
        path = tmp_path / "metrics.csv"
        path.write_text("name,value\nalpha,1\nbeta,2\ngamma,3\n", encoding="utf-8")

        request = build_file_analysis_request(
            file_path=str(path),
            file_conn_id=None,
            prompt="Analyze",
            multi_modal=False,
            max_files=1,
            max_file_size_bytes=1024,
            max_total_size_bytes=1024,
            max_text_chars=500,
            sample_rows=1,
        )

        assert "alpha, 1" in request.user_content
        assert "beta, 2" not in request.user_content
        assert "gamma, 3" not in request.user_content
+
+
class TestFileAnalysisHelpers:
    """Tests for the private helpers: format detection, raw reads, truncation, partitions."""

    @pytest.mark.parametrize(
        ("filename", "expected_format", "expected_compression"),
        [
            ("events.csv", "csv", None),
            ("events.csv.gz", "csv", "gzip"),
            ("dashboard.jpg", "jpg", None),
            ("report.pdf", "pdf", None),
            # No suffix falls back to treating the file as a log.
            ("app", "log", None),
        ],
    )
    def test_detect_file_format(self, tmp_path, filename, expected_format, expected_compression):
        path = tmp_path / filename
        path.write_bytes(b"content")

        file_format, compression = detect_file_format(ObjectStoragePath(str(path)))

        assert file_format == expected_format
        assert compression == expected_compression

    def test_detect_file_format_rejects_unsupported_compression(self, tmp_path):
        """Only gzip compression is recognized; others (e.g. zstd) raise."""
        path = tmp_path / "events.csv.zst"
        path.write_bytes(b"content")

        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="Compression"):
            detect_file_format(ObjectStoragePath(str(path)))

    @pytest.mark.parametrize("filename", ["sample.parquet.gz", "sample.avro.gz", "sample.png.gz"])
    def test_detect_file_format_rejects_unsupported_gzip_format_combinations(self, tmp_path, filename):
        """gzip is not allowed on binary container formats."""
        path = tmp_path / filename
        path.write_bytes(b"content")

        with pytest.raises(LLMFileAnalysisUnsupportedFormatError, match="not supported for"):
            detect_file_format(ObjectStoragePath(str(path)))

    def test_read_raw_bytes_decompresses_gzip(self, tmp_path):
        path = tmp_path / "events.log.gz"
        path.write_bytes(gzip.compress(b"line one\nline two\n"))

        content = _read_raw_bytes(ObjectStoragePath(str(path)), compression="gzip", max_bytes=1_024)

        assert content == b"line one\nline two\n"

    def test_truncate_text_preserves_head_and_tail(self):
        """Truncation keeps the beginning and end with an ellipsis marker between."""
        text = "A" * 9_000 + "B" * 3_000

        truncated = _truncate_text(text)

        assert truncated.startswith("A" * 8_000)
        assert truncated.endswith("B" * 1_995)
        assert "\n...\n" in truncated

    def test_truncate_text_with_small_max_chars_returns_prefix_only(self):
        """When max_chars is too small for head+tail, only a prefix is kept."""
        assert _truncate_text("abcdefghij", max_chars=8) == "abcdefgh"

    def test_infer_partitions(self, tmp_path):
        """Hive-style key=value path components are reported as partitions."""
        path = tmp_path / "dt=2024-01-15" / "region=us" / "events.csv"
        path.parent.mkdir(parents=True)
        path.write_text("id\n1\n", encoding="utf-8")

        partitions = _infer_partitions(ObjectStoragePath(str(path)))

        assert partitions == ("dt=2024-01-15", "region=us")

    def test_resolve_paths_raises_when_no_files_found(self, tmp_path):
        empty_dir = ObjectStoragePath(str(tmp_path / "missing"))

        with pytest.raises(FileNotFoundError, match="No files found"):
            _resolve_paths(root=empty_dir, max_files=10)
+
+
class TestFormatReaders:
    """Parquet/Avro renderers: lazy optional imports, sampling, and content limits."""

    def test_render_parquet_uses_lazy_import(self, tmp_path):
        pyarrow = pytest.importorskip("pyarrow")
        pq = pytest.importorskip("pyarrow.parquet")

        path = tmp_path / "sample.parquet"
        table = pyarrow.Table.from_pylist([{"id": 1}, {"id": 2}])
        pq.write_table(table, path)

        result = _render_parquet(ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=1_024)

        # Schema plus only the first sampled row should appear.
        assert result.estimated_rows == 2
        assert "Schema: id: int64" in result.text
        assert '"id": 1' in result.text
        assert '"id": 2' not in result.text

    def test_render_parquet_does_not_materialize_full_table(self, tmp_path):
        """Sampling must stream row groups rather than call ParquetFile.read()."""
        pyarrow = pytest.importorskip("pyarrow")
        pq = pytest.importorskip("pyarrow.parquet")

        path = tmp_path / "sample.parquet"
        table = pyarrow.Table.from_pylist([{"id": 1}, {"id": 2}, {"id": 3}])
        pq.write_table(table, path, row_group_size=1)

        # A full read would trip this AssertionError side effect.
        with patch("pyarrow.parquet.ParquetFile.read", autospec=True, side_effect=AssertionError):
            result = _render_parquet(ObjectStoragePath(str(path)), sample_rows=2, max_content_bytes=4_096)

        assert result.estimated_rows == 3
        assert '"id": 1' in result.text
        assert '"id": 2' in result.text
        assert '"id": 3' not in result.text

    def test_render_parquet_enforces_processed_content_limit_before_read(self, tmp_path):
        pyarrow = pytest.importorskip("pyarrow")
        pq = pytest.importorskip("pyarrow.parquet")

        path = tmp_path / "sample.parquet"
        table = pyarrow.Table.from_pylist([{"id": 1}, {"id": 2}])
        pq.write_table(table, path)

        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
            _render_parquet(
                ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=path.stat().st_size - 1
            )

    def test_render_parquet_missing_dependency_raises(self, tmp_path):
        """Without pyarrow installed the optional-feature exception is raised."""
        path = tmp_path / "sample.parquet"
        path.write_bytes(b"parquet")
        real_import = builtins.__import__

        def failing_import(name, *args, **kwargs):
            if name in {"pyarrow", "pyarrow.parquet"}:
                raise ImportError("missing pyarrow")
            return real_import(name, *args, **kwargs)

        with patch("builtins.__import__", side_effect=failing_import):
            with pytest.raises(AirflowOptionalProviderFeatureException, match="parquet"):
                _render_parquet(ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=1_024)

    def test_render_avro_uses_lazy_import(self, tmp_path):
        fastavro = pytest.importorskip("fastavro")

        path = tmp_path / "sample.avro"
        schema = {
            "type": "record",
            "name": "sample",
            "fields": [{"name": "id", "type": "long"}],
        }
        with path.open("wb") as handle:
            fastavro.writer(handle, schema, [{"id": 1}, {"id": 2}])

        result = _render_avro(ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=1_024)

        # Sampling stopped early, so the row count cannot be estimated.
        assert result.estimated_rows is None
        assert '"name": "sample"' in result.text
        assert '"id": 1' in result.text
        assert '"id": 2' not in result.text

    def test_render_avro_estimates_rows_when_fully_read(self, tmp_path):
        fastavro = pytest.importorskip("fastavro")

        path = tmp_path / "sample.avro"
        schema = {
            "type": "record",
            "name": "sample",
            "fields": [{"name": "id", "type": "long"}],
        }
        with path.open("wb") as handle:
            fastavro.writer(handle, schema, [{"id": 1}, {"id": 2}])

        result = _render_avro(ObjectStoragePath(str(path)), sample_rows=5, max_content_bytes=1_024)

        assert result.estimated_rows == 2
        assert '"id": 2' in result.text

    def test_render_avro_enforces_processed_content_limit_before_scan(self, tmp_path):
        fastavro = pytest.importorskip("fastavro")

        path = tmp_path / "sample.avro"
        schema = {
            "type": "record",
            "name": "sample",
            "fields": [{"name": "id", "type": "long"}],
        }
        with path.open("wb") as handle:
            fastavro.writer(handle, schema, [{"id": 1}, {"id": 2}])

        with pytest.raises(LLMFileAnalysisLimitExceededError, match="processed-content limit"):
            _render_avro(
                ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=path.stat().st_size - 1
            )

    def test_render_avro_missing_dependency_raises(self, tmp_path):
        """Without fastavro installed the optional-feature exception is raised."""
        path = tmp_path / "sample.avro"
        path.write_bytes(b"avro")
        real_import = builtins.__import__

        def failing_import(name, *args, **kwargs):
            if name == "fastavro":
                raise ImportError("missing fastavro")
            return real_import(name, *args, **kwargs)

        with patch("builtins.__import__", side_effect=failing_import):
            with pytest.raises(AirflowOptionalProviderFeatureException, match="avro"):
                _render_avro(ObjectStoragePath(str(path)), sample_rows=1, max_content_bytes=1_024)
diff --git a/uv.lock b/uv.lock
index a06e11966a3..b79d7562ac9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -3874,6 +3874,9 @@ dependencies = [
anthropic = [
{ name = "pydantic-ai-slim", extra = ["anthropic"] },
]
+avro = [
+ { name = "fastavro" },
+]
bedrock = [
{ name = "pydantic-ai-slim", extra = ["bedrock"] },
]
@@ -3889,6 +3892,9 @@ mcp = [
openai = [
{ name = "pydantic-ai-slim", extra = ["openai"] },
]
+parquet = [
+ { name = "pyarrow" },
+]
sql = [
{ name = "apache-airflow-providers-common-sql" },
{ name = "sqlglot" },
@@ -3916,6 +3922,10 @@ requires-dist = [
{ name = "apache-airflow-providers-common-sql", marker = "extra ==
'common-sql'", editable = "providers/common/sql" },
{ name = "apache-airflow-providers-common-sql", marker = "extra == 'sql'",
editable = "providers/common/sql" },
{ name = "apache-airflow-providers-standard", editable =
"providers/standard" },
+ { name = "fastavro", marker = "python_full_version >= '3.14' and extra ==
'avro'", specifier = ">=1.12.1" },
+ { name = "fastavro", marker = "python_full_version < '3.14' and extra ==
'avro'", specifier = ">=1.10.0" },
+ { name = "pyarrow", marker = "python_full_version >= '3.14' and extra ==
'parquet'", specifier = ">=22.0.0" },
+ { name = "pyarrow", marker = "python_full_version < '3.14' and extra ==
'parquet'", specifier = ">=18.0.0" },
{ name = "pydantic-ai-slim", specifier = ">=1.34.0" },
{ name = "pydantic-ai-slim", extras = ["anthropic"], marker = "extra ==
'anthropic'" },
{ name = "pydantic-ai-slim", extras = ["bedrock"], marker = "extra ==
'bedrock'" },
@@ -3924,7 +3934,7 @@ requires-dist = [
{ name = "pydantic-ai-slim", extras = ["openai"], marker = "extra ==
'openai'" },
{ name = "sqlglot", marker = "extra == 'sql'", specifier = ">=30.0.0" },
]
-provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "sql",
"common-sql"]
+provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "avro",
"parquet", "sql", "common-sql"]
[package.metadata.requires-dev]
dev = [