This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d8961101b03 Add MongoToGCSOperator to copy MongoDB collections to GCS 
(#66013)
d8961101b03 is described below

commit d8961101b03be1b992d17c9c4b27be323afe4f39
Author: 이승오(Data Platform) <[email protected]>
AuthorDate: Tue Jun 9 19:54:46 2026 +0900

    Add MongoToGCSOperator to copy MongoDB collections to GCS (#66013)
    
    * Add MongoToGCSOperator to copy MongoDB collections to GCS
    
    Introduces a new transfer operator under the Google provider that exports
    MongoDB documents to Google Cloud Storage in JSON, CSV, or Parquet format.
    The operator extends BaseSQLToGCSOperator and accepts either a find filter
    or an aggregation pipeline through mongo_query, with optional projection
    and allowDiskUse controls.
    
    A small cursor adapter wraps the pymongo cursor as a DB-API style cursor
    so the parent operator chunking, schema inference, and upload flow can
    be reused. BSON-specific values (ObjectId, Decimal128, bytes) are
    converted to BigQuery-friendly types.
    
    Includes provider.yaml registration, mongo provider as a transitive extra,
    unit tests, system test example, and how-to documentation.
    
    * Fix CI failures for MongoToGCSOperator PR
    
    - mongo_to_gcs.py: declare cursor as Any so the find/aggregate
      branches don't trip mypy's stricter type narrowing.
    - test_selective_checks: add mongo to google-related expected
      provider lists, now that mongo is a google cross-provider dep.
    - get_provider_info.py: register the mongo_to_gcs transfer so the
      provider build-files check stays in sync with provider.yaml.
    - uv.lock: refresh to add the mongo extra and pick up an
      unrelated dev/registry tomli dep that was already on main.
    
    * Regenerate google docs/index.rst for new mongo cross-provider dep
    
    The update-providers-build-files prek hook also regenerates the
    google provider docs/index.rst from provider.yaml. The previous
    fixup commit added mongo to provider.yaml/pyproject.toml but did
    not include the docs/index.rst line, so the CI hook kept failing.
    
    * Address review feedback on MongoToGCSOperator
    
    - Document why MongoToGCSOperator reuses the SQL-to-GCS base class despite
      MongoDB being a NoSQL store, and note a possible future 
BaseNoSQLToGCSOperator.
    - Drop the inherited but unused `sql` template field (and the `.sql` 
template
      extension / sql renderer) so the empty `sql` value is no longer exposed in
      rendered templates; this operator is driven by `mongo_query`.
    - Use `autospec=True` when patching MongoHook/GCSHook in the unit tests.
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 dev/breeze/tests/test_selective_checks.py          |   8 +-
 providers/google/docs/index.rst                    |   1 +
 .../docs/operators/transfer/mongo_to_gcs.rst       |  68 +++++
 providers/google/provider.yaml                     |   4 +
 providers/google/pyproject.toml                    |   4 +
 .../google/cloud/transfers/mongo_to_gcs.py         | 225 +++++++++++++++++
 .../airflow/providers/google/get_provider_info.py  |   6 +
 .../google/cloud/transfers/example_mongo_to_gcs.py | 111 +++++++++
 .../google/cloud/transfers/test_mongo_to_gcs.py    | 273 +++++++++++++++++++++
 uv.lock                                            |   8 +-
 10 files changed, 703 insertions(+), 5 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 622041285ac..f678f056db2 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2376,7 +2376,7 @@ def test_expected_output_push(
             {
                 "selected-providers-list-as-string": "amazon apache.cassandra 
apache.kafka "
                 "cncf.kubernetes common.compat common.messaging common.sql 
databricks "
-                "facebook google hashicorp http microsoft.azure 
microsoft.mssql mysql "
+                "facebook google hashicorp http microsoft.azure 
microsoft.mssql mongo mysql "
                 "openlineage oracle postgres presto salesforce samba sftp ssh 
standard trino",
                 "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                 "all-python-versions-list-as-string": 
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2388,7 +2388,7 @@ def test_expected_output_push(
                 "docs-build": "true",
                 "docs-list-as-string": "apache-airflow helm-chart amazon 
apache.cassandra "
                 "apache.kafka cncf.kubernetes common.compat common.messaging 
common.sql databricks facebook google hashicorp http microsoft.azure "
-                "microsoft.mssql mysql openlineage oracle postgres "
+                "microsoft.mssql mongo mysql openlineage oracle postgres "
                 "presto salesforce samba sftp ssh standard trino",
                 "skip-prek-hooks": (
                     
"identity,ktlint,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests,"
@@ -2411,7 +2411,7 @@ def test_expected_output_push(
                             "description": "amazon...standard",
                             "test_types": "Providers[amazon] 
Providers[apache.cassandra,"
                             
"apache.kafka,cncf.kubernetes,common.compat,common.messaging,common.sql,databricks,facebook,"
-                            
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
+                            
"hashicorp,http,microsoft.azure,microsoft.mssql,mongo,mysql,"
                             
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
                             "Providers[google] "
                             "Providers[standard]",
@@ -2682,7 +2682,7 @@ def test_upgrade_to_newer_dependencies(
             {
                 "docs-list-as-string": "amazon apache.cassandra apache.kafka "
                 "cncf.kubernetes common.compat common.messaging common.sql 
databricks facebook google hashicorp http "
-                "microsoft.azure microsoft.mssql mysql openlineage oracle "
+                "microsoft.azure microsoft.mssql mongo mysql openlineage 
oracle "
                 "postgres presto salesforce samba sftp ssh standard trino",
             },
             id="Google provider docs changed",
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 5f5cf23705e..b1f44df8006 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -217,6 +217,7 @@ Dependent package
 `apache-airflow-providers-http 
<https://airflow.apache.org/docs/apache-airflow-providers-http>`_               
           ``http``
 `apache-airflow-providers-microsoft-azure 
<https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure>`_    
``microsoft.azure``
 `apache-airflow-providers-microsoft-mssql 
<https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql>`_    
``microsoft.mssql``
+`apache-airflow-providers-mongo 
<https://airflow.apache.org/docs/apache-airflow-providers-mongo>`_              
          ``mongo``
 `apache-airflow-providers-mysql 
<https://airflow.apache.org/docs/apache-airflow-providers-mysql>`_              
          ``mysql``
 `apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_        
    ``openlineage``
 `apache-airflow-providers-oracle 
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_             
         ``oracle``
diff --git a/providers/google/docs/operators/transfer/mongo_to_gcs.rst 
b/providers/google/docs/operators/transfer/mongo_to_gcs.rst
new file mode 100644
index 00000000000..eec4741cf17
--- /dev/null
+++ b/providers/google/docs/operators/transfer/mongo_to_gcs.rst
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+MongoDB To Google Cloud Storage Operator
+========================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
+used to store large amounts of data from various applications. This page shows how to copy
+data from MongoDB to GCS.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:MongoToGCSOperator:
+
+MongoToGCSOperator
+~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoToGCSOperator` allows you to upload
+data from a MongoDB collection to GCS in JSON, CSV, or Parquet format.
+
+The operator accepts either a ``find()`` filter (a ``dict``) or an aggregation pipeline (a ``list``)
+through the ``mongo_query`` parameter. When ``mongo_query`` is a dict, ``mongo_projection`` may be
+used to limit the fields returned. When ``mongo_query`` is a list, the value is passed as an
+aggregation pipeline and ``mongo_projection`` is ignored. For pipelines, ``allow_disk_use``
+(default ``True``) controls the ``allowDiskUse`` option passed to ``aggregate()``.
+
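+The schema is derived from the first document in the result set, so all documents are expected to
+share a consistent shape: fields missing from a later document are exported as ``null``, and fields
+that do not appear in the first document are not exported at all.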
+
+Below is an example of using this operator to export the result of a ``find()`` query to GCS.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_mongo_to_gcs]
+    :end-before: [END howto_operator_mongo_to_gcs]
+
+The operator also supports running an aggregation pipeline.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_mongo_to_gcs_aggregation]
+    :end-before: [END howto_operator_mongo_to_gcs_aggregation]
+
+
+Reference
+---------
+
+For further information, look at:
+
+* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
+* `MongoDB Documentation <https://www.mongodb.com/docs/>`__
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index c89f9424c23..dff2d84481f 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -1010,6 +1010,10 @@ transfers:
   - source-integration-name: Apache Cassandra
     target-integration-name: Google Cloud Storage (GCS)
     python-module: airflow.providers.google.cloud.transfers.cassandra_to_gcs
+  - source-integration-name: MongoDB
+    target-integration-name: Google Cloud Storage (GCS)
+    how-to-guide: 
/docs/apache-airflow-providers-google/operators/transfer/mongo_to_gcs.rst
+    python-module: airflow.providers.google.cloud.transfers.mongo_to_gcs
   - source-integration-name: Google Calendar
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: 
/docs/apache-airflow-providers-google/operators/transfer/calendar_to_gcs.rst
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 679785c778d..756d96deaff 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -178,6 +178,9 @@ dependencies = [
 "microsoft.mssql" = [
     "apache-airflow-providers-microsoft-mssql"
 ]
+"mongo" = [
+    "apache-airflow-providers-mongo"
+]
 "mysql" = [
     "apache-airflow-providers-mysql"
 ]
@@ -227,6 +230,7 @@ dev = [
     "apache-airflow-providers-http",
     "apache-airflow-providers-microsoft-azure",
     "apache-airflow-providers-microsoft-mssql",
+    "apache-airflow-providers-mongo",
     "apache-airflow-providers-mysql",
     "apache-airflow-providers-openlineage",
     "apache-airflow-providers-oracle",
diff --git 
a/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py 
b/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py
new file mode 100644
index 00000000000..0a3c9c437a3
--- /dev/null
+++ 
b/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""MongoDB to GCS operator."""
+
+from __future__ import annotations
+
+import base64
+import json
+from collections.abc import Iterator, Sequence
+from datetime import date, datetime, time
+from decimal import Decimal
+from functools import cached_property
+from typing import Any
+
+from bson import ObjectId
+from bson.decimal128 import Decimal128
+
+from airflow.providers.google.cloud.transfers.sql_to_gcs import 
BaseSQLToGCSOperator
+from airflow.providers.mongo.hooks.mongo import MongoHook
+
+
+class _MongoCursorAdapter:
+    """
+    Expose a pymongo cursor through the small DB-API 2.0 cursor surface the base class uses.
+
+    ``BaseSQLToGCSOperator`` reads ``cursor.description`` to build the BigQuery
+    schema and then iterates the cursor, expecting one tuple per row. MongoDB
+    documents are schemaless dicts, so this adapter:
+
+    * Reads the first document eagerly and builds ``description`` from its keys
+      (in the document's own order) and the Python types of their values.
+    * On iteration, yields that first document again and then the rest of the cursor.
+    * Emits each document as a tuple ordered like ``description``. Keys absent from a
+      document become ``None``; keys the first document did not have are not emitted.
+    """
+
+    def __init__(self, cursor: Any) -> None:
+        self._cursor = iter(cursor)
+        self._first: dict | None = None
+        self._description: list[tuple] = []
+        try:
+            head = next(self._cursor)
+        except StopIteration:
+            # Empty result set: no columns and nothing to yield.
+            return
+        self._first = head
+        # DB-API 7-tuple: (name, type_code, display_size, internal_size, precision,
+        # scale, null_ok). Only name and type_code carry information here.
+        columns = []
+        for key, sample in head.items():
+            columns.append((key, type(sample), None, None, None, None, True))
+        self._description = columns
+
+    @property
+    def description(self) -> list[tuple]:
+        return self._description
+
+    def __iter__(self) -> Iterator[tuple]:
+        if self._first is None:
+            return
+        columns = [entry[0] for entry in self._description]
+
+        def as_row(document: dict) -> tuple:
+            # Project a document onto the fixed column order, None for absent keys.
+            return tuple(document.get(column) for column in columns)
+
+        yield as_row(self._first)
+        for document in self._cursor:
+            yield as_row(document)
+
+
+class MongoToGCSOperator(BaseSQLToGCSOperator):
+    """
+    Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.
+
+    .. note::
+        MongoDB is a NoSQL store, so subclassing ``BaseSQLToGCSOperator`` is a
+        deliberate reuse choice rather than a natural fit. The base class already
+        implements the chunking, schema inference and GCS upload flow we want; this
+        operator reuses it by adapting the pymongo cursor to a DB-API style cursor
+        (see :class:`_MongoCursorAdapter`) and overriding ``query`` /
+        ``field_to_bigquery`` / ``convert_type``. A dedicated
+        ``BaseNoSQLToGCSOperator`` could be a cleaner home for this in the future.
+
+    .. note::
+        Column names and types are taken from the first document of the result set.
+        Fields missing from a later document are exported as ``null``; fields that do
+        not appear in the first document are not exported.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:MongoToGCSOperator`
+
+    :param mongo_conn_id: Reference to a specific
+        :ref:`Mongo connection <howto/connection:mongo>`.
+    :param mongo_db: The MongoDB database name.
+    :param mongo_collection: The MongoDB collection name.
+    :param mongo_query: A MongoDB find filter (``dict``) or aggregation pipeline
+        (``list``). Defaults to ``{}`` (match all).
+    :param mongo_projection: Optional projection passed to ``find()``. Ignored
+        when ``mongo_query`` is an aggregation pipeline. Accepts a dict
+        (``{"field": 1}``) or list of field names.
+    :param allow_disk_use: Whether to pass ``allowDiskUse=True`` to
+        ``aggregate()``. Defaults to True.
+    """
+
+    ui_color = "#a0e08c"
+
+    # ``sql``, ``template_ext`` (``.sql``) and the ``sql`` renderer are inherited
+    # from ``BaseSQLToGCSOperator`` but are meaningless here — this operator is
+    # driven by ``mongo_query``, not SQL. Override them explicitly so the unused
+    # ``sql`` field is not exposed in rendered templates.
+    template_fields: Sequence[str] = (
+        "bucket",
+        "filename",
+        "schema_filename",
+        "schema",
+        "parameters",
+        "impersonation_chain",
+        "partition_columns",
+        "mongo_collection",
+        "mongo_db",
+        "mongo_query",
+    )
+    template_ext: Sequence[str] = ()
+    template_fields_renderers = {}
+
+    # Python type -> BigQuery type. ``field_to_bigquery`` resolves a column's type by
+    # walking its MRO, so subclasses of these types (for example BSON's ``Int64`` and
+    # ``Binary`` wrappers around ``int`` / ``bytes``) get their base type's mapping.
+    # ``bool`` resolves to BOOL rather than INTEGER because it precedes ``int`` in its MRO.
+    type_map: dict[type, str] = {
+        bool: "BOOL",
+        int: "INTEGER",
+        float: "FLOAT",
+        Decimal: "FLOAT",
+        Decimal128: "FLOAT",
+        str: "STRING",
+        bytes: "BYTES",
+        ObjectId: "STRING",
+        datetime: "TIMESTAMP",
+        date: "DATE",
+        time: "TIME",
+        list: "STRING",
+        dict: "STRING",
+    }
+
+    def __init__(
+        self,
+        *,
+        mongo_conn_id: str = "mongo_default",
+        mongo_db: str,
+        mongo_collection: str,
+        mongo_query: dict | list | None = None,
+        mongo_projection: dict | list | None = None,
+        allow_disk_use: bool = True,
+        **kwargs: Any,
+    ) -> None:
+        # `sql` is required by BaseSQLToGCSOperator but is unused here.
+        kwargs.setdefault("sql", "")
+        super().__init__(**kwargs)
+        self.mongo_conn_id = mongo_conn_id
+        self.mongo_db = mongo_db
+        self.mongo_collection = mongo_collection
+        self.mongo_query = mongo_query if mongo_query is not None else {}
+        self.mongo_projection = mongo_projection
+        self.allow_disk_use = allow_disk_use
+
+    @cached_property
+    def db_hook(self) -> MongoHook:
+        """Mongo hook for ``mongo_conn_id``, created once per operator instance."""
+        return MongoHook(mongo_conn_id=self.mongo_conn_id)
+
+    def query(self) -> _MongoCursorAdapter:
+        """
+        Execute the configured find/aggregate and return a DB-API style cursor.
+
+        A ``list`` ``mongo_query`` is run with ``aggregate()`` (``mongo_projection``
+        is not used); anything else is passed to ``find()`` with ``mongo_projection``.
+        """
+        coll = self.db_hook.get_conn()[self.mongo_db][self.mongo_collection]
+        cursor: Any
+        if isinstance(self.mongo_query, list):
+            self.log.info("Executing aggregate: %s", self.mongo_query)
+            cursor = coll.aggregate(self.mongo_query, allowDiskUse=self.allow_disk_use)
+        else:
+            self.log.info(
+                "Executing find: filter=%s projection=%s",
+                self.mongo_query,
+                self.mongo_projection,
+            )
+            cursor = coll.find(self.mongo_query, projection=self.mongo_projection)
+        return _MongoCursorAdapter(cursor)
+
+    def field_to_bigquery(self, field) -> dict[str, str]:
+        """
+        Map one ``description`` entry to a NULLABLE BigQuery schema field.
+
+        The Python type in ``field[1]`` is resolved against ``type_map`` by walking
+        its MRO, so a subclass of a mapped type (e.g. an ``int`` subclass such as
+        BSON's ``Int64``) gets its base type's BigQuery type instead of falling back
+        to ``STRING``. Without this, such a column would be declared ``STRING`` while
+        ``convert_type`` still emits the raw ``int`` value. Types with no mapped
+        ancestor fall back to ``STRING``.
+
+        :param field: A DB-API 7-tuple ``(name, type_code, ...)``.
+        :return: A BigQuery schema field dict with ``name``, ``type`` and ``mode``.
+        """
+        bq_type = "STRING"
+        # ``getattr`` keeps non-type values (e.g. ``None``) on the STRING fallback.
+        for klass in getattr(field[1], "__mro__", ()):
+            mapped = self.type_map.get(klass)
+            if mapped is not None:
+                bq_type = mapped
+                break
+        return {
+            "name": field[0],
+            "type": bq_type,
+            "mode": "NULLABLE",
+        }
+
+    def convert_type(self, value: Any, schema_type: str | None, **kwargs: Any) -> Any:
+        """
+        Convert pymongo values to BigQuery-friendly types.
+
+        * ``ObjectId`` -> ``str``.
+        * ``Decimal128`` / ``Decimal`` -> ``float``.
+        * ``bytes`` (including subclasses) -> base64-encoded ``str`` (or ``int``
+          when ``schema_type == 'INTEGER'``).
+        * ``datetime`` -> ``str(value)``.
+        * ``date`` -> ISO date string when ``schema_type == 'DATE'``, otherwise
+          combined ``datetime`` string.
+        * ``list`` / ``dict`` / ``tuple`` -> JSON string; nested values JSON cannot
+          encode are stringified with ``str``.
+        * Anything else, including ``None``, is returned unchanged.
+
+        Extra keyword arguments passed by the base class are accepted and ignored.
+        """
+        if value is None:
+            return value
+        if isinstance(value, ObjectId):
+            return str(value)
+        if isinstance(value, Decimal128):
+            return float(value.to_decimal())
+        if isinstance(value, Decimal):
+            return float(value)
+        if isinstance(value, bytes):
+            if schema_type == "INTEGER":
+                return int.from_bytes(value, "big")
+            return base64.standard_b64encode(value).decode("ascii")  # type:ignore
+        # ``datetime`` must be checked before ``date`` because it subclasses ``date``.
+        if isinstance(value, datetime):
+            return str(value)
+        if isinstance(value, date):
+            if schema_type == "DATE":
+                return value.isoformat()
+            return str(datetime.combine(value, time.min))
+        if isinstance(value, (list, dict, tuple)):
+            return json.dumps(value, default=str)
+        return value
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py 
b/providers/google/src/airflow/providers/google/get_provider_info.py
index ec6f630a162..61571bc6831 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1196,6 +1196,12 @@ def get_provider_info():
                 "target-integration-name": "Google Cloud Storage (GCS)",
                 "python-module": 
"airflow.providers.google.cloud.transfers.cassandra_to_gcs",
             },
+            {
+                "source-integration-name": "MongoDB",
+                "target-integration-name": "Google Cloud Storage (GCS)",
+                "how-to-guide": 
"/docs/apache-airflow-providers-google/operators/transfer/mongo_to_gcs.rst",
+                "python-module": 
"airflow.providers.google.cloud.transfers.mongo_to_gcs",
+            },
             {
                 "source-integration-name": "Google Calendar",
                 "target-integration-name": "Google Cloud Storage (GCS)",
diff --git 
a/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py 
b/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
new file mode 100644
index 00000000000..8d33c5af44c
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use MongoToGCSOperator.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.transfers.mongo_to_gcs import 
MongoToGCSOperator
+
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    # Compatibility for Airflow < 3.1
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "mongo_to_gcs"
+
+# Source side: connection, database and collection can be overridden from the
+# environment so the test can point at an existing MongoDB collection.
+MONGO_CONN_ID = os.environ.get("SYSTEM_TESTS_MONGO_CONN_ID", "mongo_default")
+MONGO_DATABASE = os.environ.get("SYSTEM_TESTS_MONGO_DATABASE", "test_db")
+MONGO_COLLECTION = os.environ.get("SYSTEM_TESTS_MONGO_COLLECTION", "test_collection")
+
+# Destination side: a bucket created and removed by this DAG, plus the object names.
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "mongo_export_{}.ndjson"
+AGGREGATE_FILE_NAME = "mongo_aggregate_{}.ndjson"
+SCHEMA_FILE_NAME = "mongo_export_schema.json"
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "mongo", "gcs"],
+) as dag:
+    create_gcs_bucket = GCSCreateBucketOperator(task_id="create_gcs_bucket", bucket_name=BUCKET_NAME)
+
+    # [START howto_operator_mongo_to_gcs]
+    mongo_to_gcs_find = MongoToGCSOperator(
+        task_id="mongo_to_gcs_find",
+        mongo_conn_id=MONGO_CONN_ID,
+        mongo_db=MONGO_DATABASE,
+        mongo_collection=MONGO_COLLECTION,
+        # A dict is used as a find() filter; the projection limits the returned fields.
+        mongo_query={"status": "active"},
+        mongo_projection={"_id": 1, "name": 1, "status": 1},
+        bucket=BUCKET_NAME,
+        filename=FILE_NAME,
+        schema_filename=SCHEMA_FILE_NAME,
+        export_format="json",
+    )
+    # [END howto_operator_mongo_to_gcs]
+
+    # [START howto_operator_mongo_to_gcs_aggregation]
+    mongo_to_gcs_aggregate = MongoToGCSOperator(
+        task_id="mongo_to_gcs_aggregate",
+        mongo_conn_id=MONGO_CONN_ID,
+        mongo_db=MONGO_DATABASE,
+        mongo_collection=MONGO_COLLECTION,
+        # A list is run as an aggregation pipeline.
+        mongo_query=[
+            {"$match": {"status": "active"}},
+            {"$group": {"_id": "$category", "total": {"$sum": 1}}},
+        ],
+        bucket=BUCKET_NAME,
+        filename=AGGREGATE_FILE_NAME,
+        export_format="json",
+    )
+    # [END howto_operator_mongo_to_gcs_aggregation]
+
+    # Teardown: ALL_DONE lets the bucket be removed even if an export task failed.
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id="delete_gcs_bucket",
+        bucket_name=BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # Both exports run after bucket creation and before bucket deletion.
+    export_tasks = [mongo_to_gcs_find, mongo_to_gcs_aggregate]
+    create_gcs_bucket >> export_tasks
+    export_tasks >> delete_gcs_bucket
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
+test_run = get_test_run(dag)
diff --git 
a/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py 
b/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py
new file mode 100644
index 00000000000..020266d6809
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py
@@ -0,0 +1,273 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from decimal import Decimal
+from unittest import mock
+
+import pytest
+from bson import ObjectId
+from bson.decimal128 import Decimal128
+
+from airflow.providers.google.cloud.transfers.mongo_to_gcs import (
+    MongoToGCSOperator,
+    _MongoCursorAdapter,
+)
+
+TASK_ID = "test-mongo-to-gcs"
+MONGO_CONN_ID = "mongo_test"
+MONGO_DB = "test_db"
+MONGO_COLLECTION = "test_collection"
+BUCKET = "gs://test"
+JSON_FILENAME = "test_{}.ndjson"
+
+
+class TestMongoCursorAdapter:
+    """Unit tests for the pymongo-cursor-to-DB-API adapter."""
+
+    def test_empty_cursor(self):
+        adapter = _MongoCursorAdapter(iter([]))
+        assert adapter.description == []
+        assert list(adapter) == []
+
+    def test_description_derived_from_first_doc(self):
+        docs = [{"name": "alice", "age": 30}, {"name": "bob", "age": 25}]
+        adapter = _MongoCursorAdapter(iter(docs))
+
+        names = [d[0] for d in adapter.description]
+        types = [d[1] for d in adapter.description]
+        assert names == ["name", "age"]
+        assert types == [str, int]
+
+    def test_description_entries_are_dbapi_seven_tuples(self):
+        # BaseSQLToGCSOperator consumes description like a DB-API cursor's.
+        adapter = _MongoCursorAdapter(iter([{"name": "alice"}]))
+        assert adapter.description == [("name", str, None, None, None, None, True)]
+
+    def test_iteration_returns_tuples_in_description_order(self):
+        docs = [{"name": "alice", "age": 30}, {"name": "bob", "age": 25}]
+        adapter = _MongoCursorAdapter(iter(docs))
+
+        rows = list(adapter)
+        assert rows == [("alice", 30), ("bob", 25)]
+
+    def test_missing_field_filled_with_none(self):
+        docs = [{"name": "alice", "age": 30}, {"name": "bob"}]
+        adapter = _MongoCursorAdapter(iter(docs))
+
+        rows = list(adapter)
+        assert rows == [("alice", 30), ("bob", None)]
+
+    def test_fields_absent_from_first_doc_are_not_exported(self):
+        # The schema is fixed by the first document; later extra keys are dropped.
+        docs = [{"name": "alice"}, {"name": "bob", "age": 25}]
+        adapter = _MongoCursorAdapter(iter(docs))
+
+        assert [d[0] for d in adapter.description] == ["name"]
+        assert list(adapter) == [("alice",), ("bob",)]
+
+    def test_accepts_plain_iterable(self):
+        adapter = _MongoCursorAdapter([{"name": "alice"}, {"name": "bob"}])
+        assert list(adapter) == [("alice",), ("bob",)]
+
+
+class TestMongoToGCSOperator:
+    def test_init(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_conn_id=MONGO_CONN_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.task_id == TASK_ID
+        assert op.mongo_conn_id == MONGO_CONN_ID
+        assert op.mongo_db == MONGO_DB
+        assert op.mongo_collection == MONGO_COLLECTION
+        assert op.mongo_query == {}
+        assert op.mongo_projection is None
+        assert op.allow_disk_use is True
+        assert op.bucket == BUCKET
+        assert op.filename == JSON_FILENAME
+
+    def test_field_to_bigquery_known_type(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.field_to_bigquery(("col", int, None, None, None, None, 
True)) == {
+            "name": "col",
+            "type": "INTEGER",
+            "mode": "NULLABLE",
+        }
+
+    def test_field_to_bigquery_unknown_type_defaults_to_string(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+
+        class _Custom:
+            pass
+
+        assert op.field_to_bigquery(("col", _Custom, None, None, None, None, 
True))["type"] == "STRING"
+
+    @pytest.mark.parametrize(
+        ("value", "schema_type", "expected"),
+        [
+            (None, None, None),
+            ("text", None, "text"),
+            (42, None, 42),
+            (3.14, None, 3.14),
+            (True, None, True),
+            (Decimal("1.5"), None, 1.5),
+            (datetime.datetime(2023, 1, 2, 3, 4, 5), None, "2023-01-02 
03:04:05"),
+            (datetime.date(2023, 1, 2), "DATE", "2023-01-02"),
+            (datetime.date(2023, 1, 2), None, "2023-01-02 00:00:00"),
+            ([1, 2, 3], None, "[1, 2, 3]"),
+            ({"a": 1}, None, '{"a": 1}'),
+            ((1, 2), None, "[1, 2]"),
+        ],
+    )
+    def test_convert_type_basic(self, value, schema_type, expected):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.convert_type(value, schema_type) == expected
+
+    def test_convert_type_object_id(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        oid = ObjectId("507f1f77bcf86cd799439011")
+        assert op.convert_type(oid, None) == "507f1f77bcf86cd799439011"
+
+    def test_convert_type_decimal128(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.convert_type(Decimal128("3.14"), None) == 3.14
+
+    def test_convert_type_bytes_default_base64(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.convert_type(b"hello", None) == "aGVsbG8="
+
+    def test_convert_type_bytes_as_integer(self):
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        assert op.convert_type(b"\x00\x01", "INTEGER") == 1
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", 
autospec=True)
+    def test_query_with_find_filter(self, mock_hook_class):
+        mock_collection = mock.MagicMock()
+        
mock_hook_class.return_value.get_conn.return_value.__getitem__.return_value.__getitem__.return_value
 = mock_collection
+        mock_collection.find.return_value = iter([{"_id": ObjectId(), "name": 
"alice"}])
+
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_conn_id=MONGO_CONN_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            mongo_query={"name": "alice"},
+            mongo_projection={"name": 1},
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+
+        cursor = op.query()
+        mock_collection.find.assert_called_once_with({"name": "alice"}, 
projection={"name": 1})
+        mock_collection.aggregate.assert_not_called()
+        assert isinstance(cursor, _MongoCursorAdapter)
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", 
autospec=True)
+    def test_query_with_aggregation_pipeline(self, mock_hook_class):
+        mock_collection = mock.MagicMock()
+        
mock_hook_class.return_value.get_conn.return_value.__getitem__.return_value.__getitem__.return_value
 = mock_collection
+        mock_collection.aggregate.return_value = iter([{"_id": "alice", 
"count": 1}])
+
+        pipeline = [{"$match": {"active": True}}, {"$group": {"_id": "$name", 
"count": {"$sum": 1}}}]
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_conn_id=MONGO_CONN_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            mongo_query=pipeline,
+            allow_disk_use=False,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+
+        cursor = op.query()
+        mock_collection.aggregate.assert_called_once_with(pipeline, 
allowDiskUse=False)
+        mock_collection.find.assert_not_called()
+        assert isinstance(cursor, _MongoCursorAdapter)
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", 
autospec=True)
+    @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook", 
autospec=True)
+    def test_execute_uploads_to_gcs(self, gcs_hook_mock_class, 
mongo_hook_mock_class):
+        mock_collection = mock.MagicMock()
+        
mongo_hook_mock_class.return_value.get_conn.return_value.__getitem__.return_value.__getitem__.return_value
 = mock_collection
+        mock_collection.find.return_value = iter(
+            [
+                {"name": "alice", "age": 30},
+                {"name": "bob", "age": 25},
+            ]
+        )
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+        upload_called = False
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, 
gzip=False, metadata=None):
+            nonlocal upload_called
+            upload_called = True
+            assert bucket == BUCKET
+            assert obj == JSON_FILENAME.format(0)
+            assert mime_type == "application/json"
+            with open(tmp_filename, "rb") as fh:
+                lines = fh.read().splitlines()
+            assert lines == [
+                b'{"age": 30, "name": "alice"}',
+                b'{"age": 25, "name": "bob"}',
+            ]
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = MongoToGCSOperator(
+            task_id=TASK_ID,
+            mongo_conn_id=MONGO_CONN_ID,
+            mongo_db=MONGO_DB,
+            mongo_collection=MONGO_COLLECTION,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+        )
+        op.execute(None)
+
+        assert upload_called, "Expected GCS upload to be called"
+        
mongo_hook_mock_class.assert_called_once_with(mongo_conn_id=MONGO_CONN_ID)
diff --git a/uv.lock b/uv.lock
index 8a0780cf8a5..35ea5e56476 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5489,6 +5489,9 @@ microsoft-azure = [
 microsoft-mssql = [
     { name = "apache-airflow-providers-microsoft-mssql" },
 ]
+mongo = [
+    { name = "apache-airflow-providers-mongo" },
+]
 mysql = [
     { name = "apache-airflow-providers-mysql" },
 ]
@@ -5535,6 +5538,7 @@ dev = [
     { name = "apache-airflow-providers-http" },
     { name = "apache-airflow-providers-microsoft-azure" },
     { name = "apache-airflow-providers-microsoft-mssql" },
+    { name = "apache-airflow-providers-mongo" },
     { name = "apache-airflow-providers-mysql" },
     { name = "apache-airflow-providers-openlineage" },
     { name = "apache-airflow-providers-oracle" },
@@ -5565,6 +5569,7 @@ requires-dist = [
     { name = "apache-airflow-providers-http", marker = "extra == 'http'", 
editable = "providers/http" },
     { name = "apache-airflow-providers-microsoft-azure", marker = "extra == 
'microsoft-azure'", editable = "providers/microsoft/azure" },
     { name = "apache-airflow-providers-microsoft-mssql", marker = "extra == 
'microsoft-mssql'", editable = "providers/microsoft/mssql" },
+    { name = "apache-airflow-providers-mongo", marker = "extra == 'mongo'", 
editable = "providers/mongo" },
     { name = "apache-airflow-providers-mysql", marker = "extra == 'mysql'", 
editable = "providers/mysql" },
     { name = "apache-airflow-providers-openlineage", marker = "extra == 
'openlineage'", editable = "providers/openlineage" },
     { name = "apache-airflow-providers-oracle", marker = "extra == 'oracle'", 
editable = "providers/oracle" },
@@ -5649,7 +5654,7 @@ requires-dist = [
     { name = "tenacity", specifier = ">=8.3.0" },
     { name = "types-protobuf", specifier = ">=5.27.0,!=5.29.1.20250402" },
 ]
-provides-extras = ["cncf-kubernetes", "fab", "leveldb", "oracle", "facebook", 
"amazon", "apache-cassandra", "microsoft-azure", "microsoft-mssql", "mysql", 
"openlineage", "postgres", "presto", "salesforce", "sftp", "ssh", "trino", 
"http", "standard", "common-messaging"]
+provides-extras = ["cncf-kubernetes", "fab", "leveldb", "oracle", "facebook", 
"amazon", "apache-cassandra", "microsoft-azure", "microsoft-mssql", "mongo", 
"mysql", "openlineage", "postgres", "presto", "salesforce", "sftp", "ssh", 
"trino", "http", "standard", "common-messaging"]
 
 [package.metadata.requires-dev]
 dev = [
@@ -5667,6 +5672,7 @@ dev = [
     { name = "apache-airflow-providers-http", editable = "providers/http" },
     { name = "apache-airflow-providers-microsoft-azure", editable = 
"providers/microsoft/azure" },
     { name = "apache-airflow-providers-microsoft-mssql", editable = 
"providers/microsoft/mssql" },
+    { name = "apache-airflow-providers-mongo", editable = "providers/mongo" },
     { name = "apache-airflow-providers-mysql", editable = "providers/mysql" },
     { name = "apache-airflow-providers-openlineage", editable = 
"providers/openlineage" },
     { name = "apache-airflow-providers-oracle", editable = "providers/oracle" 
},


Reply via email to