kaxil commented on code in PR #67120: URL: https://github.com/apache/airflow/pull/67120#discussion_r3260907219
########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. 
PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: Review Comment: Truthy checks here miss empty-string and empty-bytes inputs. For example, `DocumentLoaderOperator(source_path="/data/a.txt", source_bytes=b"")` slips past this "not both" guard (and the `file_type` check), and `execute()` then silently uses the path and ignores the bytes. Use `source_path is not None` / `source_bytes is not None` instead.
########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. 
PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. 
'.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: Review Comment: Temp-file leak if `tmp.write(raw)` fails. 
`NamedTemporaryFile(delete=False)` means the file is not removed automatically when it is closed, and the `try/finally` only wraps the consumer, so a write failure (disk full, non-bytes input from the templating issue above) leaves an orphan file behind. Move `tmp.write(raw)` inside the protective `try` (capturing `tmp.name` before the write so the `finally` can still unlink it), or use `tempfile.TemporaryDirectory()` as the outer context. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata).
Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. 
+ """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if 
path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] Review Comment: `glob.glob("/path/does/not/exist")` returns `[]`, so a typo in `source_path` produces zero documents and a green task, which is silent data loss for a downstream embedding pipeline. Worth raising `FileNotFoundError` (or at minimum logging a warning) when the resolved file list is empty. ########## providers/common/ai/docs/operators/index.rst: ########## @@ -46,6 +46,9 @@ to pick the one that fits your use case: * - Multi-turn reasoning with tools (DB queries, API calls, etc.) - :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` - ``@task.agent`` + * - Parse files (PDF, DOCX, CSV, etc.) into document dicts for embedding Review Comment: While you're adding this entry: line 24 of this file still says "The common-ai provider ships **five** operators" -- this PR makes it six. Bump the count, and consider adding a matching descriptive paragraph below the table for `DocumentLoaderOperator` like the other operators have. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. 
+ :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", Review Comment: `source_bytes` in `template_fields` is broken via the documented XCom workflow. Jinja stringifies `bytes` to its `repr` (`b"x..."` becomes the literal string `"b'x...'"`), so after templating `self.source_bytes` is a `str`, not `bytes`. `raw.decode("utf-8")` then raises `AttributeError`, and the binary parsers receive the literal `b'...'` text. The flagship `source_bytes="{{ ti.xcom_pull(...) }}"` example in the docs crashes on every run. Either drop `source_bytes` from `template_fields` (and document that callers must pass bytes directly), or template a separate `source_bytes_xcom_key` and pull bytes inside `execute()`. ########## providers/common/ai/docs/operators/document_loader.rst: ########## @@ -0,0 +1,194 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:document_loader: + +``DocumentLoaderOperator`` +========================== + +Use :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` +to parse files into ``list[dict(text, metadata)]`` for downstream embedding +pipelines. 
The operator bridges Airflow's connectivity layer (hooks that +produce bytes or local files) and the AI embedding layer (operators that +need structured text with metadata). + +The operator is **framework-agnostic** — it has no dependency on LlamaIndex, +LangChain, or any other AI framework. + +Built-in formats +---------------- + +``.txt``, ``.md``, ``.csv``, and ``.json`` are handled with zero extra +dependencies: + +.. code-block:: python + + from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="/opt/airflow/data/articles/", + ) + +CSV files produce one document per row. JSON files with a top-level array +produce one document per element; a single JSON object produces one document. + +PDF parsing +----------- + +Install the ``pdf`` extra to parse PDF files via `pypdf <https://pypdf.readthedocs.io/>`__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[pdf] + +.. code-block:: python + + load_pdfs = DocumentLoaderOperator( + task_id="load_pdfs", + source_path="/opt/airflow/data/reports/*.pdf", + ) + +Each page with extractable text becomes a separate document. Empty pages are +skipped. The ``page_number`` is included in the document metadata. + +DOCX parsing +------------ + +Install the ``docx`` extra to parse Word documents via +`python-docx <https://python-docx.readthedocs.io/>`__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[docx] + +.. code-block:: python + + load_word = DocumentLoaderOperator( + task_id="load_word", + source_path="/opt/airflow/data/specs/*.docx", + ) + +All non-empty paragraphs are concatenated into a single document per file. + +Glob patterns and filtering +---------------------------- + +Pass a glob pattern to ``source_path`` to match multiple files. Use +``file_extensions`` to limit which files are processed: + +.. 
code-block:: python + + load_filtered = DocumentLoaderOperator( + task_id="load_filtered", + source_path="/opt/airflow/data/mixed/*", + file_extensions=[".pdf", ".txt"], + ) + +Composing with downstream operators +------------------------------------ + +The output format (``list[dict(text, metadata)]``) is designed to feed +directly into embedding operators. For example, with the LlamaIndex +``EmbeddingOperator``: + +.. code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_path="/data/docs/*.pdf", + ) + + embed = EmbeddingOperator( + task_id="embed", + documents="{{ ti.xcom_pull(task_ids='load') }}", + llm_conn_id="openai_default", + ) + + load >> embed + +Composing with Airflow providers +--------------------------------- + +Use any Airflow provider to download files, then parse them with +``DocumentLoaderOperator``: + +.. code-block:: python + + from airflow.providers.amazon.aws.transfers.s3_to_local import S3ToLocalFilesystemOperator + + download = S3ToLocalFilesystemOperator( + task_id="download", + bucket_name="my-bucket", + key="documents/report.pdf", + local_path="/tmp/report.pdf", + ) + + load = DocumentLoaderOperator( + task_id="load", + source_path="/tmp/report.pdf", + ) + + download >> load + +For **structured API data** (Salesforce SOQL results, database query exports), +a ``@task`` that maps fields to text and metadata is more appropriate than +``DocumentLoaderOperator``, which is designed for binary file parsing: + +.. code-block:: python + + @task + def transform_cases(records: list[dict]) -> list[dict]: + return [ + { + "text": f"{r['Subject']}\n\n{r['Description']}", + "metadata": {"case_id": r["Id"], "source": "salesforce"}, + } + for r in records + ] + +Loading from bytes +------------------ + +When upstream tasks pass file content via XCom, use ``source_bytes`` with +an explicit ``file_type``: + +.. 
code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_bytes="{{ ti.xcom_pull(task_ids='fetch_file') }}", Review Comment: This example is the headline XCom workflow but it won't work today because `source_bytes` is templated as a string (see the comment on `document_loader.py:72`). Either fix the template handling or update this example to call the operator from a `@task` that pre-fetches bytes. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. 
+ + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. 
+ """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if 
path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(raw) + tmp_path = Path(tmp.name) + try: + return self._parse_file(tmp_path, ext) + finally: + os.unlink(tmp_path) + + text = raw.decode("utf-8") + if backend == "csv": + return self._parse_csv_text(text) + if backend == "json": + return self._parse_json_text(text) + return [{"text": text, "metadata": {}}] + + def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: + backend = self._resolve_backend(ext) + + if backend == "text": + return self._parse_text(file_path) + if backend == "csv": + return self._parse_csv(file_path) + if backend == "json": + return self._parse_json(file_path) + if backend == "pypdf": + return self._parse_pdf(file_path) + if backend == "python-docx": + return self._parse_docx(file_path) + + raise ValueError(f"No parser found for backend '{backend}'.") + + def _resolve_backend(self, ext: str) -> str: + if self.parser != "auto": + return self.parser + + ext = ext.lower() + if ext not in self.EXTENSION_BACKEND_MAP: + supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys())) + raise ValueError( + f"No parser registered for extension '{ext}'. " + f"Supported extensions: {supported}. " + f"Set 'parser' explicitly to override auto-detection." 
+ ) + return self.EXTENSION_BACKEND_MAP[ext] + + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return [{"text": text, "metadata": {}}] + + def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_csv_text(text) + + def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: + reader = csv.DictReader(io.StringIO(text)) + documents = [] + for row_idx, row in enumerate(reader): + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) + documents.append( + { + "text": row_text, + "metadata": {"row_index": row_idx}, + } + ) + return documents + + def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_json_text(text) + + def _parse_json_text(self, text: str) -> list[dict[str, Any]]: + data = json.loads(text) + if isinstance(data, list): + return [ + {"text": json.dumps(item, ensure_ascii=False), "metadata": {"item_index": idx}} + for idx, item in enumerate(data) + ] + return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] + + def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: + try: + from pypdf import PdfReader + except ImportError: + raise ImportError( Review Comment: This should be `AirflowOptionalProviderFeatureException`, not plain `ImportError`. It's the convention used elsewhere in this provider (`agent.py:189`, `llm_sql.py:34`, `llm_schema_compare.py:249`) and lets the scheduler classify the failure as "missing optional dep" rather than a generic crash. Same applies to the docx branch at line 262. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. 
+ + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. 
'.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(raw) + tmp_path = Path(tmp.name) + try: + return self._parse_file(tmp_path, ext) + finally: + os.unlink(tmp_path) + + text = raw.decode("utf-8") + if backend == "csv": + return self._parse_csv_text(text) + if backend == 
"json": + return self._parse_json_text(text) + return [{"text": text, "metadata": {}}] + + def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: + backend = self._resolve_backend(ext) + + if backend == "text": + return self._parse_text(file_path) + if backend == "csv": + return self._parse_csv(file_path) + if backend == "json": + return self._parse_json(file_path) + if backend == "pypdf": + return self._parse_pdf(file_path) + if backend == "python-docx": + return self._parse_docx(file_path) + + raise ValueError(f"No parser found for backend '{backend}'.") + + def _resolve_backend(self, ext: str) -> str: + if self.parser != "auto": + return self.parser + + ext = ext.lower() + if ext not in self.EXTENSION_BACKEND_MAP: + supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys())) + raise ValueError( + f"No parser registered for extension '{ext}'. " + f"Supported extensions: {supported}. " + f"Set 'parser' explicitly to override auto-detection." + ) + return self.EXTENSION_BACKEND_MAP[ext] + + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return [{"text": text, "metadata": {}}] + + def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_csv_text(text) + + def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: + reader = csv.DictReader(io.StringIO(text)) + documents = [] + for row_idx, row in enumerate(reader): + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) + documents.append( + { + "text": row_text, + "metadata": {"row_index": row_idx}, + } + ) + return documents + + def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_json_text(text) + + def _parse_json_text(self, text: str) -> list[dict[str, Any]]: + data = json.loads(text) + if isinstance(data, list): + return [ + {"text": json.dumps(item, 
ensure_ascii=False), "metadata": {"item_index": idx}} Review Comment: For JSON arrays of strings, `json.dumps("alpha")` returns `'"alpha"'` (with embedded quotes), so the text handed to the embedder becomes `"alpha"` instead of `alpha`. Special-case string items rather than dumping them: `item if isinstance(item, str) else json.dumps(item, ensure_ascii=False)`. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata).
Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. 
+ """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if 
path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(raw) + tmp_path = Path(tmp.name) + try: + return self._parse_file(tmp_path, ext) + finally: + os.unlink(tmp_path) + + text = raw.decode("utf-8") + if backend == "csv": + return self._parse_csv_text(text) + if backend == "json": + return self._parse_json_text(text) + return [{"text": text, "metadata": {}}] + + def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: + backend = self._resolve_backend(ext) + + if backend == "text": + return self._parse_text(file_path) + if backend == "csv": + return self._parse_csv(file_path) + if backend == "json": + return self._parse_json(file_path) + if backend == "pypdf": + return self._parse_pdf(file_path) + if backend == "python-docx": + return self._parse_docx(file_path) + + raise ValueError(f"No parser found for backend '{backend}'.") + + def _resolve_backend(self, ext: str) -> str: + if self.parser != "auto": + return self.parser + + ext = ext.lower() + if ext not in self.EXTENSION_BACKEND_MAP: + supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys())) + raise ValueError( + f"No parser registered for extension '{ext}'. " + f"Supported extensions: {supported}. " + f"Set 'parser' explicitly to override auto-detection." 
+ ) + return self.EXTENSION_BACKEND_MAP[ext] + + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return [{"text": text, "metadata": {}}] + + def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_csv_text(text) + + def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: + reader = csv.DictReader(io.StringIO(text)) + documents = [] + for row_idx, row in enumerate(reader): + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) + documents.append( + { + "text": row_text, + "metadata": {"row_index": row_idx}, + } + ) + return documents + + def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_json_text(text) + + def _parse_json_text(self, text: str) -> list[dict[str, Any]]: + data = json.loads(text) + if isinstance(data, list): + return [ + {"text": json.dumps(item, ensure_ascii=False), "metadata": {"item_index": idx}} + for idx, item in enumerate(data) + ] + return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] + + def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: + try: + from pypdf import PdfReader + except ImportError: + raise ImportError( + "pypdf is required for PDF parsing. " + "Install it with: pip install apache-airflow-providers-common-ai[pdf]" + ) + + reader = PdfReader(str(file_path)) + documents = [] + for page_num, page in enumerate(reader.pages): + text = page.extract_text() or "" + if text.strip(): + documents.append( + { + "text": text, + "metadata": {"page_number": page_num + 1}, + } + ) + return documents + + def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]: + try: + from docx import Document + except ImportError: + raise ImportError( + "python-docx is required for DOCX parsing. 
" + "Install it with: pip install apache-airflow-providers-common-ai[docx]" + ) + + doc = Document(str(file_path)) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] Review Comment: DOCX extraction loses tables, headers, footers, and footnotes. For RAG over real business documents (specs, reports) that often is most of the content. Worth either documenting the limitation or iterating `doc.element.body` to pick up `w:tbl` blocks too. ########## providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. 
+ + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. 
+ """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if 
path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} Review Comment: Case mismatch makes this filter silently drop files whose extension was listed in uppercase. `file_extensions=[".TXT"]` produces `allowed = {".TXT"}`, while line 148 compares `p.suffix.lower()` (always lowercase) against it, so no `.txt` file ever matches. Lowercase the extensions when building `allowed`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
