This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip99-doc-loader in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5e1c8abea8ed7b75f0adcc5cc56bff90820e570e Author: Vikram Koka <[email protected]> AuthorDate: Mon May 18 15:48:14 2026 +0100 Add DocumentLoaderOperator to common.ai provider - Adds DocumentLoaderOperator, a framework-agnostic file parser that bridges Airflow's connectivity layer (hooks returning bytes/files) and the AI embedding layer (operators needing list[dict(text, metadata)]). No LlamaIndex, LangChain, or other AI framework dependency. - Built-in parsers for .txt, .md, .csv, .json with zero extra deps. PDF (via pypdf, BSD) and DOCX (via python-docx, MIT) available as optional extras: pip install apache-airflow-providers-common-ai[pdf] / [docx]. - Supports two input modes: source_path (local file, directory, or glob pattern) and source_bytes (raw bytes from XCom). Output is list[dict(text, metadata)], the same shape consumed by downstream embedding operators. Motivation File parsing is the highest-volume gap in Airflow's AI story. Every RAG pipeline on Airflow currently requires custom parsing code. This operator makes it a single line in a Dag. 
What's included ┌────────────────────────────────────┬───────────────────────────────────────────┐ │ File │ Purpose │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ operators/document_loader.py │ Operator (~270 lines) │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ tests/.../test_document_loader.py │ 26 unit tests │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/document_loader.rst │ Usage docs │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ provider.yaml │ Operator registration + how-to-guide link │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ pyproject.toml │ [pdf] and [docx] optional dependencies │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/index.rst │ Chooser table row │ └────────────────────────────────────┴───────────────────────────────────────────┘ Test plan - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py -xvs (26 tests) - Built-in parsers: txt, md, csv (one doc per row), json (single object and array) - PDF/DOCX parsers: mocked via sys.modules injection (packages not installed in test env) - ImportError guidance when optional packages are missing - Init validation: mutual exclusion of source_path/source_bytes, file_type required with source_bytes - File discovery: glob patterns, extension filtering, empty directories - Output shape: every item has text and metadata, file_name/file_path in metadata, custom metadata_fields merged --- .../common/ai/docs/operators/document_loader.rst | 194 +++++++++++++ providers/common/ai/docs/operators/index.rst | 3 + providers/common/ai/provider.yaml | 2 + providers/common/ai/pyproject.toml | 2 + .../common/ai/operators/document_loader.py | 270 +++++++++++++++++ 
.../common/ai/operators/test_document_loader.py | 319 +++++++++++++++++++++ 6 files changed, 790 insertions(+) diff --git a/providers/common/ai/docs/operators/document_loader.rst b/providers/common/ai/docs/operators/document_loader.rst new file mode 100644 index 00000000000..39f4d9d501c --- /dev/null +++ b/providers/common/ai/docs/operators/document_loader.rst @@ -0,0 +1,194 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:document_loader: + +``DocumentLoaderOperator`` +========================== + +Use :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` +to parse files into ``list[dict(text, metadata)]`` for downstream embedding +pipelines. The operator bridges Airflow's connectivity layer (hooks that +produce bytes or local files) and the AI embedding layer (operators that +need structured text with metadata). + +The operator is **framework-agnostic** — it has no dependency on LlamaIndex, +LangChain, or any other AI framework. + +Built-in formats +---------------- + +``.txt``, ``.md``, ``.csv``, and ``.json`` are handled with zero extra +dependencies: + +.. 
code-block:: python + + from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="/opt/airflow/data/articles/", + ) + +CSV files produce one document per row. JSON files with a top-level array +produce one document per element; a single JSON object produces one document. + +PDF parsing +----------- + +Install the ``pdf`` extra to parse PDF files via `pypdf <https://pypdf.readthedocs.io/>`__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[pdf] + +.. code-block:: python + + load_pdfs = DocumentLoaderOperator( + task_id="load_pdfs", + source_path="/opt/airflow/data/reports/*.pdf", + ) + +Each page with extractable text becomes a separate document. Empty pages are +skipped. The ``page_number`` is included in the document metadata. + +DOCX parsing +------------ + +Install the ``docx`` extra to parse Word documents via +`python-docx <https://python-docx.readthedocs.io/>`__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[docx] + +.. code-block:: python + + load_word = DocumentLoaderOperator( + task_id="load_word", + source_path="/opt/airflow/data/specs/*.docx", + ) + +All non-empty paragraphs are concatenated into a single document per file. + +Glob patterns and filtering +---------------------------- + +Pass a glob pattern to ``source_path`` to match multiple files. Use +``file_extensions`` to limit which files are processed: + +.. code-block:: python + + load_filtered = DocumentLoaderOperator( + task_id="load_filtered", + source_path="/opt/airflow/data/mixed/*", + file_extensions=[".pdf", ".txt"], + ) + +Composing with downstream operators +------------------------------------ + +The output format (``list[dict(text, metadata)]``) is designed to feed +directly into embedding operators. For example, with the LlamaIndex +``EmbeddingOperator``: + +.. 
code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_path="/data/docs/*.pdf", + ) + + embed = EmbeddingOperator( + task_id="embed", + documents="{{ ti.xcom_pull(task_ids='load') }}", + llm_conn_id="openai_default", + ) + + load >> embed + +Composing with Airflow providers +--------------------------------- + +Use any Airflow provider to download files, then parse them with +``DocumentLoaderOperator``: + +.. code-block:: python + + from airflow.providers.amazon.aws.transfers.s3_to_local import S3ToLocalFilesystemOperator + + download = S3ToLocalFilesystemOperator( + task_id="download", + bucket_name="my-bucket", + key="documents/report.pdf", + local_path="/tmp/report.pdf", + ) + + load = DocumentLoaderOperator( + task_id="load", + source_path="/tmp/report.pdf", + ) + + download >> load + +For **structured API data** (Salesforce SOQL results, database query exports), +a ``@task`` that maps fields to text and metadata is more appropriate than +``DocumentLoaderOperator``, which is designed for binary file parsing: + +.. code-block:: python + + @task + def transform_cases(records: list[dict]) -> list[dict]: + return [ + { + "text": f"{r['Subject']}\n\n{r['Description']}", + "metadata": {"case_id": r["Id"], "source": "salesforce"}, + } + for r in records + ] + +Loading from bytes +------------------ + +When upstream tasks pass file content via XCom, use ``source_bytes`` with +an explicit ``file_type``: + +.. code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_bytes="{{ ti.xcom_pull(task_ids='fetch_file') }}", + file_type=".pdf", + ) + +Parameters +---------- + +- ``source_path``: Local file path, directory, or glob pattern. + Mutually exclusive with ``source_bytes``. +- ``source_bytes``: Raw file bytes from XCom. Requires ``file_type``. + Mutually exclusive with ``source_path``. +- ``file_type``: File extension hint (e.g. ``".pdf"``). Required with + ``source_bytes``. 
Optional with ``source_path`` to override + auto-detection. +- ``parser``: Parsing backend. ``"auto"`` (default) selects from the file + extension. Set explicitly to force a specific backend (e.g. ``"text"`` + to treat an unknown extension as plain text). +- ``file_extensions``: Filter which files to process when ``source_path`` + matches multiple files. +- ``metadata_fields``: Extra key-value pairs merged into every document's + metadata dict. diff --git a/providers/common/ai/docs/operators/index.rst b/providers/common/ai/docs/operators/index.rst index 89ba5d15e6c..967a43de281 100644 --- a/providers/common/ai/docs/operators/index.rst +++ b/providers/common/ai/docs/operators/index.rst @@ -46,6 +46,9 @@ to pick the one that fits your use case: * - Multi-turn reasoning with tools (DB queries, API calls, etc.) - :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` - ``@task.agent`` + * - Parse files (PDF, DOCX, CSV, etc.) into document dicts for embedding + - :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` + - *(no decorator)* **LLMOperator / @task.llm** — stateless, single-turn calls. Use this for classification, summarization, extraction, or any prompt that produces one response. 
Supports structured output diff --git a/providers/common/ai/provider.yaml b/providers/common/ai/provider.yaml index 2a13392ea99..d7ffaee57b9 100644 --- a/providers/common/ai/provider.yaml +++ b/providers/common/ai/provider.yaml @@ -41,6 +41,7 @@ integrations: - /docs/apache-airflow-providers-common-ai/operators/llm_branch.rst - /docs/apache-airflow-providers-common-ai/operators/llm_sql.rst - /docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst + - /docs/apache-airflow-providers-common-ai/operators/document_loader.rst tags: [ai] - integration-name: Pydantic AI external-doc-url: https://ai.pydantic.dev/ @@ -323,6 +324,7 @@ operators: - airflow.providers.common.ai.operators.llm_branch - airflow.providers.common.ai.operators.llm_sql - airflow.providers.common.ai.operators.llm_schema_compare + - airflow.providers.common.ai.operators.document_loader task-decorators: - class-name: airflow.providers.common.ai.decorators.agent.agent_task diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml index 36dfcf450da..7958d95d2f1 100644 --- a/providers/common/ai/pyproject.toml +++ b/providers/common/ai/pyproject.toml @@ -95,6 +95,8 @@ dependencies = [ "common.sql" = [ "apache-airflow-providers-common-sql" ] +"pdf" = ["pypdf>=4.0.0"] +"docx" = ["python-docx>=1.0.0"] [dependency-groups] dev = [ diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py new file mode 100644 index 00000000000..e131d61617c --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. 
+ :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. 
'.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(raw) + tmp_path = Path(tmp.name) + try: + return self._parse_file(tmp_path, ext) + finally: + os.unlink(tmp_path) + + text = raw.decode("utf-8") + if backend == "csv": + return self._parse_csv_text(text) + if backend == 
"json": + return self._parse_json_text(text) + return [{"text": text, "metadata": {}}] + + def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: + backend = self._resolve_backend(ext) + + if backend == "text": + return self._parse_text(file_path) + if backend == "csv": + return self._parse_csv(file_path) + if backend == "json": + return self._parse_json(file_path) + if backend == "pypdf": + return self._parse_pdf(file_path) + if backend == "python-docx": + return self._parse_docx(file_path) + + raise ValueError(f"No parser found for backend '{backend}'.") + + def _resolve_backend(self, ext: str) -> str: + if self.parser != "auto": + return self.parser + + ext = ext.lower() + if ext not in self.EXTENSION_BACKEND_MAP: + supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys())) + raise ValueError( + f"No parser registered for extension '{ext}'. " + f"Supported extensions: {supported}. " + f"Set 'parser' explicitly to override auto-detection." + ) + return self.EXTENSION_BACKEND_MAP[ext] + + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return [{"text": text, "metadata": {}}] + + def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_csv_text(text) + + def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: + reader = csv.DictReader(io.StringIO(text)) + documents = [] + for row_idx, row in enumerate(reader): + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) + documents.append( + { + "text": row_text, + "metadata": {"row_index": row_idx}, + } + ) + return documents + + def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_json_text(text) + + def _parse_json_text(self, text: str) -> list[dict[str, Any]]: + data = json.loads(text) + if isinstance(data, list): + return [ + {"text": json.dumps(item, 
ensure_ascii=False), "metadata": {"item_index": idx}} + for idx, item in enumerate(data) + ] + return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] + + def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: + try: + from pypdf import PdfReader + except ImportError: + raise ImportError( + "pypdf is required for PDF parsing. " + "Install it with: pip install apache-airflow-providers-common-ai[pdf]" + ) + + reader = PdfReader(str(file_path)) + documents = [] + for page_num, page in enumerate(reader.pages): + text = page.extract_text() or "" + if text.strip(): + documents.append( + { + "text": text, + "metadata": {"page_number": page_num + 1}, + } + ) + return documents + + def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]: + try: + from docx import Document + except ImportError: + raise ImportError( + "python-docx is required for DOCX parsing. " + "Install it with: pip install apache-airflow-providers-common-ai[docx]" + ) + + doc = Document(str(file_path)) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] + text = "\n\n".join(paragraphs) + return [{"text": text, "metadata": {}}] diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py new file mode 100644 index 00000000000..14cf92ba14d --- /dev/null +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + +class TestDocumentLoaderInit: + def test_template_fields(self): + expected = {"source_path", "source_bytes", "metadata_fields"} + assert set(DocumentLoaderOperator.template_fields) == expected + + def test_both_sources_raises(self): + with pytest.raises(ValueError, match="not both"): + DocumentLoaderOperator(task_id="test", source_path="/tmp/file.txt", source_bytes=b"hello") + + def test_neither_source_raises(self): + with pytest.raises(ValueError, match="Provide exactly one"): + DocumentLoaderOperator(task_id="test") + + def test_source_bytes_without_file_type_raises(self): + with pytest.raises(ValueError, match="file_type"): + DocumentLoaderOperator(task_id="test", source_bytes=b"hello") + + +class TestTextParser: + def test_txt_file(self, tmp_path): + f = tmp_path / "doc.txt" + f.write_text("Hello world", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "Hello world" + assert result[0]["metadata"]["file_name"] == "doc.txt" + + def test_md_file(self, tmp_path): + f = tmp_path / "readme.md" + f.write_text("# Title\n\nSome content", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "# Title" in 
result[0]["text"] + + +class TestCsvParser: + def test_csv_one_doc_per_row(self, tmp_path): + f = tmp_path / "data.csv" + f.write_text("name,age\nAlice,30\nBob,25\n", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert "Alice" in result[0]["text"] + assert "Bob" in result[1]["text"] + assert result[0]["metadata"]["row_index"] == 0 + assert result[1]["metadata"]["row_index"] == 1 + + def test_csv_from_bytes(self): + raw = b"col1,col2\nval1,val2\n" + op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".csv") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "val1" in result[0]["text"] + + +class TestJsonParser: + def test_json_array(self, tmp_path): + f = tmp_path / "items.json" + data = [{"title": "First"}, {"title": "Second"}] + f.write_text(json.dumps(data), encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert result[0]["metadata"]["item_index"] == 0 + + def test_json_single_object(self, tmp_path): + f = tmp_path / "config.json" + f.write_text('{"key": "value"}', encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "key" in result[0]["text"] + + def test_json_from_bytes(self): + raw = b'[{"a": 1}, {"b": 2}]' + op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".json") + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + + +def _make_mock_pypdf_module(mock_reader): + """Create a fake pypdf module with a PdfReader that returns mock_reader.""" + mock_module = MagicMock() + mock_module.PdfReader = MagicMock(return_value=mock_reader) + return mock_module + + +def _make_mock_docx_module(mock_doc): + """Create a fake docx module with a Document 
that returns mock_doc.""" + mock_module = MagicMock() + mock_module.Document = MagicMock(return_value=mock_doc) + return mock_module + + +class TestPdfParser: + def test_pdf_parsing(self, tmp_path): + mock_page_1 = MagicMock() + mock_page_1.extract_text.return_value = "Page one content" + mock_page_2 = MagicMock() + mock_page_2.extract_text.return_value = "Page two content" + + mock_reader = MagicMock() + mock_reader.pages = [mock_page_1, mock_page_2] + + f = tmp_path / "report.pdf" + f.write_bytes(b"fake pdf bytes") + + mock_pypdf = _make_mock_pypdf_module(mock_reader) + with patch.dict("sys.modules", {"pypdf": mock_pypdf}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert result[0]["text"] == "Page one content" + assert result[0]["metadata"]["page_number"] == 1 + assert result[1]["metadata"]["page_number"] == 2 + + def test_pdf_skips_empty_pages(self, tmp_path): + mock_page = MagicMock() + mock_page.extract_text.return_value = " " + mock_reader = MagicMock() + mock_reader.pages = [mock_page] + + f = tmp_path / "empty.pdf" + f.write_bytes(b"fake pdf") + + mock_pypdf = _make_mock_pypdf_module(mock_reader) + with patch.dict("sys.modules", {"pypdf": mock_pypdf}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 0 + + def test_pdf_missing_raises_importerror(self, tmp_path): + f = tmp_path / "doc.pdf" + f.write_bytes(b"fake pdf") + + with patch.dict("sys.modules", {"pypdf": None}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[pdf\\]"): + op.execute(context=MagicMock()) + + +class TestDocxParser: + def test_docx_parsing(self, tmp_path): + mock_para_1 = MagicMock() + mock_para_1.text = "First paragraph" + mock_para_2 = MagicMock() + mock_para_2.text = "Second paragraph" + mock_para_empty = 
MagicMock() + mock_para_empty.text = " " + + mock_doc_obj = MagicMock() + mock_doc_obj.paragraphs = [mock_para_1, mock_para_empty, mock_para_2] + + f = tmp_path / "doc.docx" + f.write_bytes(b"fake docx") + + mock_docx = _make_mock_docx_module(mock_doc_obj) + with patch.dict("sys.modules", {"docx": mock_docx}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "First paragraph" in result[0]["text"] + assert "Second paragraph" in result[0]["text"] + + def test_docx_missing_raises_importerror(self, tmp_path): + f = tmp_path / "doc.docx" + f.write_bytes(b"fake docx") + + with patch.dict("sys.modules", {"docx": None}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[docx\\]"): + op.execute(context=MagicMock()) + + +class TestFileDiscovery: + def test_glob_multiple_files(self, tmp_path): + (tmp_path / "a.txt").write_text("file a", encoding="utf-8") + (tmp_path / "b.txt").write_text("file b", encoding="utf-8") + (tmp_path / "c.md").write_text("file c", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "*.txt")) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + texts = {doc["text"] for doc in result} + assert texts == {"file a", "file b"} + + def test_directory_source(self, tmp_path): + (tmp_path / "x.txt").write_text("hello", encoding="utf-8") + (tmp_path / "y.md").write_text("world", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + + def test_file_extensions_filter(self, tmp_path): + (tmp_path / "keep.txt").write_text("keep me", encoding="utf-8") + (tmp_path / "skip.md").write_text("skip me", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path), 
file_extensions=[".txt"]) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "keep me" + + def test_empty_directory_returns_empty(self, tmp_path): + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + result = op.execute(context=MagicMock()) + + assert result == [] + + def test_unknown_extension_raises(self, tmp_path): + f = tmp_path / "data.xyz" + f.write_text("some data", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ValueError, match="No parser registered"): + op.execute(context=MagicMock()) + + +class TestOutputShape: + def test_every_item_has_text_and_metadata(self, tmp_path): + (tmp_path / "a.txt").write_text("doc a", encoding="utf-8") + (tmp_path / "b.txt").write_text("doc b", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "*.txt")) + result = op.execute(context=MagicMock()) + + for doc in result: + assert "text" in doc + assert "metadata" in doc + assert isinstance(doc["text"], str) + assert isinstance(doc["metadata"], dict) + + def test_metadata_fields_appended(self, tmp_path): + f = tmp_path / "doc.txt" + f.write_text("content", encoding="utf-8") + + op = DocumentLoaderOperator( + task_id="test", + source_path=str(f), + metadata_fields={"source": "test_suite", "version": 2}, + ) + result = op.execute(context=MagicMock()) + + assert result[0]["metadata"]["source"] == "test_suite" + assert result[0]["metadata"]["version"] == 2 + + def test_file_metadata_included(self, tmp_path): + f = tmp_path / "report.txt" + f.write_text("content", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert result[0]["metadata"]["file_name"] == "report.txt" + assert "file_path" in result[0]["metadata"] + + def test_source_bytes_no_file_metadata(self): + op = DocumentLoaderOperator(task_id="test", source_bytes=b"hello", 
file_type=".txt") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "hello" + assert "file_name" not in result[0]["metadata"] + + def test_explicit_parser_override(self, tmp_path): + f = tmp_path / "data.log" + f.write_text("log line", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f), parser="text") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "log line"
