This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d8961101b03 Add MongoToGCSOperator to copy MongoDB collections to GCS
(#66013)
d8961101b03 is described below
commit d8961101b03be1b992d17c9c4b27be323afe4f39
Author: 이승오(Data Platform) <[email protected]>
AuthorDate: Tue Jun 9 19:54:46 2026 +0900
Add MongoToGCSOperator to copy MongoDB collections to GCS (#66013)
* Add MongoToGCSOperator to copy MongoDB collections to GCS
Introduces a new transfer operator under the Google provider that exports
MongoDB documents to Google Cloud Storage in JSON, CSV, or Parquet format.
The operator extends BaseSQLToGCSOperator and accepts either a find filter
or an aggregation pipeline through mongo_query, with optional projection
and allowDiskUse controls.
A small cursor adapter wraps the pymongo cursor as a DB-API style cursor
so the parent operator chunking, schema inference, and upload flow can
be reused. BSON-specific values (ObjectId, Decimal128, bytes) are
converted to BigQuery-friendly types.
Includes provider.yaml registration, mongo provider as a transitive extra,
unit tests, system test example, and how-to documentation.
* Fix CI failures for MongoToGCSOperator PR
- mongo_to_gcs.py: declare cursor as Any so the find/aggregate
branches don't trip mypy's stricter type narrowing.
- test_selective_checks: add mongo to google-related expected
provider lists, now that mongo is a google cross-provider dep.
- get_provider_info.py: register the mongo_to_gcs transfer so the
provider build-files check stays in sync with provider.yaml.
- uv.lock: refresh to add the mongo extra and pick up an
unrelated dev/registry tomli dep that was already on main.
* Regenerate google docs/index.rst for new mongo cross-provider dep
The update-providers-build-files prek hook also regenerates the
google provider docs/index.rst from provider.yaml. The previous
fixup commit added mongo to provider.yaml/pyproject.toml but did
not include the docs/index.rst line, so the CI hook kept failing.
* Address review feedback on MongoToGCSOperator
- Document why MongoToGCSOperator reuses the SQL-to-GCS base class despite
MongoDB being a NoSQL store, and note a possible future
BaseNoSQLToGCSOperator.
- Drop the inherited but unused `sql` template field (and the `.sql`
template extension / `sql` renderer) so the empty `sql` value is no longer
exposed in rendered templates; this operator is driven by `mongo_query`.
- Use `autospec=True` when patching MongoHook/GCSHook in the unit tests.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
dev/breeze/tests/test_selective_checks.py | 8 +-
providers/google/docs/index.rst | 1 +
.../docs/operators/transfer/mongo_to_gcs.rst | 68 +++++
providers/google/provider.yaml | 4 +
providers/google/pyproject.toml | 4 +
.../google/cloud/transfers/mongo_to_gcs.py | 225 +++++++++++++++++
.../airflow/providers/google/get_provider_info.py | 6 +
.../google/cloud/transfers/example_mongo_to_gcs.py | 111 +++++++++
.../google/cloud/transfers/test_mongo_to_gcs.py | 273 +++++++++++++++++++++
uv.lock | 8 +-
10 files changed, 703 insertions(+), 5 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 622041285ac..f678f056db2 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2376,7 +2376,7 @@ def test_expected_output_push(
{
"selected-providers-list-as-string": "amazon apache.cassandra
apache.kafka "
"cncf.kubernetes common.compat common.messaging common.sql
databricks "
- "facebook google hashicorp http microsoft.azure
microsoft.mssql mysql "
+ "facebook google hashicorp http microsoft.azure
microsoft.mssql mongo mysql "
"openlineage oracle postgres presto salesforce samba sftp ssh
standard trino",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2388,7 +2388,7 @@ def test_expected_output_push(
"docs-build": "true",
"docs-list-as-string": "apache-airflow helm-chart amazon
apache.cassandra "
"apache.kafka cncf.kubernetes common.compat common.messaging
common.sql databricks facebook google hashicorp http microsoft.azure "
- "microsoft.mssql mysql openlineage oracle postgres "
+ "microsoft.mssql mongo mysql openlineage oracle postgres "
"presto salesforce samba sftp ssh standard trino",
"skip-prek-hooks": (
"identity,ktlint,mypy-airflow-ctl,mypy-airflow-ctl-tests,mypy-airflow-e2e-tests,"
@@ -2411,7 +2411,7 @@ def test_expected_output_push(
"description": "amazon...standard",
"test_types": "Providers[amazon]
Providers[apache.cassandra,"
"apache.kafka,cncf.kubernetes,common.compat,common.messaging,common.sql,databricks,facebook,"
-
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
+
"hashicorp,http,microsoft.azure,microsoft.mssql,mongo,mysql,"
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
"Providers[google] "
"Providers[standard]",
@@ -2682,7 +2682,7 @@ def test_upgrade_to_newer_dependencies(
{
"docs-list-as-string": "amazon apache.cassandra apache.kafka "
"cncf.kubernetes common.compat common.messaging common.sql
databricks facebook google hashicorp http "
- "microsoft.azure microsoft.mssql mysql openlineage oracle "
+ "microsoft.azure microsoft.mssql mongo mysql openlineage
oracle "
"postgres presto salesforce samba sftp ssh standard trino",
},
id="Google provider docs changed",
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 5f5cf23705e..b1f44df8006 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -217,6 +217,7 @@ Dependent package
`apache-airflow-providers-http
<https://airflow.apache.org/docs/apache-airflow-providers-http>`_
``http``
`apache-airflow-providers-microsoft-azure
<https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure>`_
``microsoft.azure``
`apache-airflow-providers-microsoft-mssql
<https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql>`_
``microsoft.mssql``
+`apache-airflow-providers-mongo
<https://airflow.apache.org/docs/apache-airflow-providers-mongo>`_
``mongo``
`apache-airflow-providers-mysql
<https://airflow.apache.org/docs/apache-airflow-providers-mysql>`_
``mysql``
`apache-airflow-providers-openlineage
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_
``openlineage``
`apache-airflow-providers-oracle
<https://airflow.apache.org/docs/apache-airflow-providers-oracle>`_
``oracle``
diff --git a/providers/google/docs/operators/transfer/mongo_to_gcs.rst
b/providers/google/docs/operators/transfer/mongo_to_gcs.rst
new file mode 100644
index 00000000000..eec4741cf17
--- /dev/null
+++ b/providers/google/docs/operators/transfer/mongo_to_gcs.rst
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+MongoDB To Google Cloud Storage Operator
+========================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
+used to store large amounts of data from various applications. This page shows how to copy
+data from MongoDB to GCS.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:MongoToGCSOperator:
+
+MongoToGCSOperator
+~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoToGCSOperator`
allows you to upload
+data from a MongoDB collection to GCS in JSON, CSV, or Parquet format.
+
+The operator accepts either a ``find()`` filter (a ``dict``) or an aggregation pipeline (a ``list``)
+through the ``mongo_query`` parameter. When ``mongo_query`` is a dict, ``mongo_projection`` may be
+used to limit the fields returned. When ``mongo_query`` is a list, the value is passed as an
+aggregation pipeline and ``mongo_projection`` is ignored; ``allow_disk_use`` (default ``True``)
+sets the ``allowDiskUse`` option passed to ``aggregate()``.
+
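+The schema is derived from the first document in the result set, so all documents are expected to
+share a consistent shape: fields missing from a later document are exported as ``null``, and fields
+that do not appear in the first document are not exported at all.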
+
+Below is an example of using this operator to export the result of a
``find()`` query to GCS.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_mongo_to_gcs]
+ :end-before: [END howto_operator_mongo_to_gcs]
+
+The operator also supports running an aggregation pipeline.
+
+.. exampleinclude::
/../../google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_mongo_to_gcs_aggregation]
+ :end-before: [END howto_operator_mongo_to_gcs_aggregation]
+
+
+Reference
+---------
+
+For further information, look at:
+
+* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
+* `MongoDB Documentation <https://www.mongodb.com/docs/>`__
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index c89f9424c23..dff2d84481f 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -1010,6 +1010,10 @@ transfers:
- source-integration-name: Apache Cassandra
target-integration-name: Google Cloud Storage (GCS)
python-module: airflow.providers.google.cloud.transfers.cassandra_to_gcs
+ - source-integration-name: MongoDB
+ target-integration-name: Google Cloud Storage (GCS)
+ how-to-guide:
/docs/apache-airflow-providers-google/operators/transfer/mongo_to_gcs.rst
+ python-module: airflow.providers.google.cloud.transfers.mongo_to_gcs
- source-integration-name: Google Calendar
target-integration-name: Google Cloud Storage (GCS)
how-to-guide:
/docs/apache-airflow-providers-google/operators/transfer/calendar_to_gcs.rst
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index 679785c778d..756d96deaff 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -178,6 +178,9 @@ dependencies = [
"microsoft.mssql" = [
"apache-airflow-providers-microsoft-mssql"
]
+"mongo" = [
+ "apache-airflow-providers-mongo"
+]
"mysql" = [
"apache-airflow-providers-mysql"
]
@@ -227,6 +230,7 @@ dev = [
"apache-airflow-providers-http",
"apache-airflow-providers-microsoft-azure",
"apache-airflow-providers-microsoft-mssql",
+ "apache-airflow-providers-mongo",
"apache-airflow-providers-mysql",
"apache-airflow-providers-openlineage",
"apache-airflow-providers-oracle",
diff --git
a/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py
b/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py
new file mode 100644
index 00000000000..0a3c9c437a3
--- /dev/null
+++
b/providers/google/src/airflow/providers/google/cloud/transfers/mongo_to_gcs.py
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""MongoDB to GCS operator."""
+
+from __future__ import annotations
+
+import base64
+import json
+from collections.abc import Iterator, Sequence
+from datetime import date, datetime, time
+from decimal import Decimal
+from functools import cached_property
+from typing import Any
+
+from bson import ObjectId
+from bson.decimal128 import Decimal128
+
+from airflow.providers.google.cloud.transfers.sql_to_gcs import
BaseSQLToGCSOperator
+from airflow.providers.mongo.hooks.mongo import MongoHook
+
+
+class _MongoCursorAdapter:
+ """
+ Wrap a pymongo cursor as a DB-API 2.0 style cursor.
+
+ ``BaseSQLToGCSOperator`` consumes ``cursor.description`` to derive the
+ BigQuery schema and iterates the cursor expecting tuples. MongoDB documents
+ are dict-shaped and have no fixed schema; this adapter:
+
+ * Peeks the first document to derive ``description`` (column names and
+ Python types).
+ * Re-yields the first document, then iterates the rest.
+ * Converts each document to a tuple in the derived column order, filling
+ missing fields with ``None``.
+ """
+
+ def __init__(self, cursor: Any) -> None:
+ self._cursor = iter(cursor)
+ self._first: dict | None = None
+ self._description: list[tuple] = []
+ try:
+ self._first = next(self._cursor)
+ except StopIteration:
+ return
+ self._description = [
+ (name, type(value), None, None, None, None, True) for name, value
in self._first.items()
+ ]
+
+ @property
+ def description(self) -> list[tuple]:
+ return self._description
+
+ def __iter__(self) -> Iterator[tuple]:
+ if self._first is None:
+ return
+ names = [d[0] for d in self._description]
+ yield tuple(self._first.get(n) for n in names)
+ for doc in self._cursor:
+ yield tuple(doc.get(n) for n in names)
+
+
class MongoToGCSOperator(BaseSQLToGCSOperator):
    """
    Copy data from MongoDB to Google Cloud Storage in JSON, CSV or Parquet format.

    .. note::
        MongoDB is a NoSQL store, so subclassing ``BaseSQLToGCSOperator`` is a
        deliberate reuse choice rather than a natural fit. The base class already
        implements the chunking, schema inference and GCS upload flow we want; this
        operator reuses it by adapting the pymongo cursor to a DB-API style cursor
        (see :class:`_MongoCursorAdapter`) and overriding ``query`` /
        ``field_to_bigquery`` / ``convert_type``. A dedicated
        ``BaseNoSQLToGCSOperator`` could be a cleaner home for this in the future.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:MongoToGCSOperator`

    :param mongo_conn_id: Reference to a specific
        :ref:`Mongo connection <howto/connection:mongo>`.
    :param mongo_db: The MongoDB database name.
    :param mongo_collection: The MongoDB collection name.
    :param mongo_query: A MongoDB find filter (``dict``) or aggregation pipeline
        (``list``). Defaults to ``{}`` (match all).
    :param mongo_projection: Optional projection passed to ``find()``. Ignored
        when ``mongo_query`` is an aggregation pipeline. Accepts a dict
        (``{"field": 1}``) or list of field names.
    :param allow_disk_use: Whether to pass ``allowDiskUse=True`` to
        ``aggregate()``. Defaults to True.
    """

    ui_color = "#a0e08c"

    # ``sql``, ``template_ext`` (``.sql``) and the ``sql`` renderer are inherited
    # from ``BaseSQLToGCSOperator`` but are meaningless here — this operator is
    # driven by ``mongo_query``, not SQL. Override them explicitly so the unused
    # ``sql`` field is not exposed in rendered templates.
    template_fields: Sequence[str] = (
        "bucket",
        "filename",
        "schema_filename",
        "schema",
        "parameters",
        "impersonation_chain",
        "partition_columns",
        "mongo_collection",
        "mongo_db",
        "mongo_query",
    )
    template_ext: Sequence[str] = ()
    template_fields_renderers = {}

    # Python type -> BigQuery type. Looked up through the type's MRO in
    # ``field_to_bigquery`` so subclasses (e.g. BSON ``Int64``) resolve too.
    type_map: dict[type, str] = {
        bool: "BOOL",
        int: "INTEGER",
        float: "FLOAT",
        Decimal: "FLOAT",
        Decimal128: "FLOAT",
        str: "STRING",
        bytes: "BYTES",
        ObjectId: "STRING",
        datetime: "TIMESTAMP",
        date: "DATE",
        time: "TIME",
        list: "STRING",
        dict: "STRING",
    }

    def __init__(
        self,
        *,
        mongo_conn_id: str = "mongo_default",
        mongo_db: str,
        mongo_collection: str,
        mongo_query: dict | list | None = None,
        mongo_projection: dict | list | None = None,
        allow_disk_use: bool = True,
        **kwargs: Any,
    ) -> None:
        # `sql` is required by BaseSQLToGCSOperator but is unused here.
        kwargs.setdefault("sql", "")
        super().__init__(**kwargs)
        self.mongo_conn_id = mongo_conn_id
        self.mongo_db = mongo_db
        self.mongo_collection = mongo_collection
        self.mongo_query = mongo_query if mongo_query is not None else {}
        self.mongo_projection = mongo_projection
        self.allow_disk_use = allow_disk_use

    @cached_property
    def db_hook(self) -> MongoHook:
        """Return the MongoHook for ``mongo_conn_id`` (built once per operator instance)."""
        return MongoHook(mongo_conn_id=self.mongo_conn_id)

    def query(self) -> _MongoCursorAdapter:
        """Execute the configured find/aggregate and return a DB-API style cursor."""
        coll = self.db_hook.get_conn()[self.mongo_db][self.mongo_collection]
        cursor: Any
        if isinstance(self.mongo_query, list):
            self.log.info("Executing aggregate: %s", self.mongo_query)
            cursor = coll.aggregate(self.mongo_query, allowDiskUse=self.allow_disk_use)
        else:
            self.log.info(
                "Executing find: filter=%s projection=%s",
                self.mongo_query,
                self.mongo_projection,
            )
            cursor = coll.find(self.mongo_query, projection=self.mongo_projection)
        return _MongoCursorAdapter(cursor)

    def field_to_bigquery(self, field) -> dict[str, str]:
        """
        Map a cursor ``description`` entry to a BigQuery schema field.

        ``field[1]`` (a Python type) is resolved against ``type_map`` by walking
        its MRO, so subclasses of mapped types inherit the parent's BigQuery type
        instead of silently falling back to ``STRING``. pymongo decodes BSON
        int64 values as ``bson.int64.Int64`` (an ``int`` subclass) and
        non-default binary subtypes as ``bson.binary.Binary`` (a ``bytes``
        subclass); both now map to ``INTEGER`` / ``BYTES``. Types with no
        mapping anywhere in their MRO default to ``STRING``.
        """
        py_type = field[1]
        lineage = getattr(py_type, "__mro__", (py_type,))
        bq_type = next((self.type_map[klass] for klass in lineage if klass in self.type_map), "STRING")
        return {
            "name": field[0],
            "type": bq_type,
            "mode": "NULLABLE",
        }

    def convert_type(self, value: Any, schema_type: str | None, **kwargs: Any) -> Any:
        """
        Convert pymongo values to BigQuery-friendly types.

        * ``ObjectId`` -> ``str``.
        * ``Decimal128`` / ``Decimal`` -> ``float``.
        * ``bytes`` -> base64-encoded ``str`` (or ``int`` when
          ``schema_type == 'INTEGER'``).
        * ``datetime`` -> ``str(value)``.
        * ``date`` -> ISO date string when ``schema_type == 'DATE'``, otherwise
          combined ``datetime`` string.
        * ``list`` / ``dict`` / ``tuple`` -> JSON string.
        """
        if value is None:
            return value
        if isinstance(value, ObjectId):
            return str(value)
        if isinstance(value, Decimal128):
            return float(value.to_decimal())
        if isinstance(value, Decimal):
            return float(value)
        if isinstance(value, bytes):
            if schema_type == "INTEGER":
                return int.from_bytes(value, "big")
            return base64.standard_b64encode(value).decode("ascii")  # type:ignore
        # ``datetime`` is a ``date`` subclass, so it must be checked first.
        if isinstance(value, datetime):
            return str(value)
        if isinstance(value, date):
            if schema_type == "DATE":
                return value.isoformat()
            return str(datetime.combine(value, time.min))
        if isinstance(value, (list, dict, tuple)):
            return json.dumps(value, default=str)
        return value
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py
b/providers/google/src/airflow/providers/google/get_provider_info.py
index ec6f630a162..61571bc6831 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -1196,6 +1196,12 @@ def get_provider_info():
"target-integration-name": "Google Cloud Storage (GCS)",
"python-module":
"airflow.providers.google.cloud.transfers.cassandra_to_gcs",
},
+ {
+ "source-integration-name": "MongoDB",
+ "target-integration-name": "Google Cloud Storage (GCS)",
+ "how-to-guide":
"/docs/apache-airflow-providers-google/operators/transfer/mongo_to_gcs.rst",
+ "python-module":
"airflow.providers.google.cloud.transfers.mongo_to_gcs",
+ },
{
"source-integration-name": "Google Calendar",
"target-integration-name": "Google Cloud Storage (GCS)",
diff --git
a/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
b/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
new file mode 100644
index 00000000000..8d33c5af44c
--- /dev/null
+++
b/providers/google/tests/system/google/cloud/transfers/example_mongo_to_gcs.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
"""
Example Airflow DAG that shows how to use MongoToGCSOperator.

It creates a GCS bucket, exports MongoDB data into it once with a ``find()``
filter and once with an aggregation pipeline, then deletes the bucket.
"""

from __future__ import annotations

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
from airflow.providers.google.cloud.transfers.mongo_to_gcs import MongoToGCSOperator

try:
    from airflow.sdk import TriggerRule
except ImportError:
    # Compatibility for Airflow < 3.1
    from airflow.utils.trigger_rule import TriggerRule  # type: ignore[no-redef,attr-defined]

# Overridable via environment variables; the defaults below are used when unset.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
DAG_ID = "mongo_to_gcs"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
# The ``{}`` placeholder is replaced with a file counter (``0`` for the first file).
FILE_NAME = "mongo_export_{}.ndjson"
SCHEMA_FILE_NAME = "mongo_export_schema.json"

MONGO_CONN_ID = os.environ.get("SYSTEM_TESTS_MONGO_CONN_ID", "mongo_default")
MONGO_DATABASE = os.environ.get("SYSTEM_TESTS_MONGO_DATABASE", "test_db")
MONGO_COLLECTION = os.environ.get("SYSTEM_TESTS_MONGO_COLLECTION", "test_collection")


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "mongo", "gcs"],
) as dag:
    create_gcs_bucket = GCSCreateBucketOperator(
        task_id="create_gcs_bucket",
        bucket_name=BUCKET_NAME,
    )

    # NOTE: the [START]/[END] regions below are rendered verbatim (dedented by 4)
    # in the MongoToGCSOperator how-to guide; keep them free of test-only details.
    # [START howto_operator_mongo_to_gcs]
    mongo_to_gcs_find = MongoToGCSOperator(
        task_id="mongo_to_gcs_find",
        mongo_conn_id=MONGO_CONN_ID,
        mongo_db=MONGO_DATABASE,
        mongo_collection=MONGO_COLLECTION,
        mongo_query={"status": "active"},
        mongo_projection={"_id": 1, "name": 1, "status": 1},
        bucket=BUCKET_NAME,
        filename=FILE_NAME,
        schema_filename=SCHEMA_FILE_NAME,
        export_format="json",
    )
    # [END howto_operator_mongo_to_gcs]

    # A list ``mongo_query`` is run as an aggregation pipeline instead of find().
    # [START howto_operator_mongo_to_gcs_aggregation]
    mongo_to_gcs_aggregate = MongoToGCSOperator(
        task_id="mongo_to_gcs_aggregate",
        mongo_conn_id=MONGO_CONN_ID,
        mongo_db=MONGO_DATABASE,
        mongo_collection=MONGO_COLLECTION,
        mongo_query=[
            {"$match": {"status": "active"}},
            {"$group": {"_id": "$category", "total": {"$sum": 1}}},
        ],
        bucket=BUCKET_NAME,
        filename="mongo_aggregate_{}.ndjson",
        export_format="json",
    )
    # [END howto_operator_mongo_to_gcs_aggregation]

    # ALL_DONE so the bucket is removed even if an export task fails.
    delete_gcs_bucket = GCSDeleteBucketOperator(
        task_id="delete_gcs_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Both exports run after bucket creation and before bucket teardown.
    create_gcs_bucket >> [mongo_to_gcs_find, mongo_to_gcs_aggregate] >> delete_gcs_bucket

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
test_run = get_test_run(dag)
diff --git
a/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py
b/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py
new file mode 100644
index 00000000000..020266d6809
--- /dev/null
+++ b/providers/google/tests/unit/google/cloud/transfers/test_mongo_to_gcs.py
@@ -0,0 +1,273 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from decimal import Decimal
+from unittest import mock
+
+import pytest
+from bson import ObjectId
+from bson.decimal128 import Decimal128
+
+from airflow.providers.google.cloud.transfers.mongo_to_gcs import (
+ MongoToGCSOperator,
+ _MongoCursorAdapter,
+)
+
# Shared operator/connection settings used across the tests in this module.
TASK_ID = "test-mongo-to-gcs"
MONGO_CONN_ID = "mongo_test"
MONGO_DB = "test_db"
MONGO_COLLECTION = "test_collection"
BUCKET = "gs://test"
JSON_FILENAME = "test_{}.ndjson"


class TestMongoCursorAdapter:
    """Tests for the DB-API style adapter wrapped around pymongo cursors."""

    @staticmethod
    def _adapt(documents):
        """Wrap ``documents`` in an iterator, mimicking a pymongo cursor."""
        return _MongoCursorAdapter(iter(documents))

    def test_empty_cursor(self):
        wrapped = self._adapt([])
        assert wrapped.description == []
        assert list(wrapped) == []

    def test_description_derived_from_first_doc(self):
        wrapped = self._adapt([{"name": "alice", "age": 30}, {"name": "bob", "age": 25}])

        column_names, column_types = zip(*((col[0], col[1]) for col in wrapped.description))
        assert list(column_names) == ["name", "age"]
        assert list(column_types) == [str, int]

    def test_iteration_returns_tuples_in_description_order(self):
        wrapped = self._adapt([{"name": "alice", "age": 30}, {"name": "bob", "age": 25}])

        assert list(wrapped) == [("alice", 30), ("bob", 25)]

    def test_missing_field_filled_with_none(self):
        wrapped = self._adapt([{"name": "alice", "age": 30}, {"name": "bob"}])

        assert list(wrapped) == [("alice", 30), ("bob", None)]
+
+
class TestMongoToGCSOperator:
    """Tests for MongoToGCSOperator construction, type handling, querying and export."""

    @staticmethod
    def _make_operator(**overrides):
        """Build an operator with the shared test defaults, with ``overrides`` applied on top."""
        kwargs = {
            "task_id": TASK_ID,
            "mongo_db": MONGO_DB,
            "mongo_collection": MONGO_COLLECTION,
            "bucket": BUCKET,
            "filename": JSON_FILENAME,
        }
        kwargs.update(overrides)
        return MongoToGCSOperator(**kwargs)

    @staticmethod
    def _wire_collection(hook_class):
        """Make ``hook.get_conn()[db][collection]`` resolve to a fresh MagicMock and return it."""
        collection = mock.MagicMock()
        client = hook_class.return_value.get_conn.return_value
        client.__getitem__.return_value.__getitem__.return_value = collection
        return collection

    def test_init(self):
        op = self._make_operator(mongo_conn_id=MONGO_CONN_ID)

        assert op.task_id == TASK_ID
        assert op.mongo_conn_id == MONGO_CONN_ID
        assert op.mongo_db == MONGO_DB
        assert op.mongo_collection == MONGO_COLLECTION
        assert op.mongo_query == {}
        assert op.mongo_projection is None
        assert op.allow_disk_use is True
        assert op.bucket == BUCKET
        assert op.filename == JSON_FILENAME

    def test_field_to_bigquery_known_type(self):
        descriptor = ("col", int, None, None, None, None, True)

        expected = {"name": "col", "type": "INTEGER", "mode": "NULLABLE"}
        assert self._make_operator().field_to_bigquery(descriptor) == expected

    def test_field_to_bigquery_unknown_type_defaults_to_string(self):
        op = self._make_operator()

        class _Custom:
            pass

        descriptor = ("col", _Custom, None, None, None, None, True)
        assert op.field_to_bigquery(descriptor)["type"] == "STRING"

    @pytest.mark.parametrize(
        ("value", "schema_type", "expected"),
        [
            (None, None, None),
            ("text", None, "text"),
            (42, None, 42),
            (3.14, None, 3.14),
            (True, None, True),
            (Decimal("1.5"), None, 1.5),
            (datetime.datetime(2023, 1, 2, 3, 4, 5), None, "2023-01-02 03:04:05"),
            (datetime.date(2023, 1, 2), "DATE", "2023-01-02"),
            (datetime.date(2023, 1, 2), None, "2023-01-02 00:00:00"),
            ([1, 2, 3], None, "[1, 2, 3]"),
            ({"a": 1}, None, '{"a": 1}'),
            ((1, 2), None, "[1, 2]"),
        ],
    )
    def test_convert_type_basic(self, value, schema_type, expected):
        assert self._make_operator().convert_type(value, schema_type) == expected

    def test_convert_type_object_id(self):
        oid = ObjectId("507f1f77bcf86cd799439011")

        assert self._make_operator().convert_type(oid, None) == "507f1f77bcf86cd799439011"

    def test_convert_type_decimal128(self):
        assert self._make_operator().convert_type(Decimal128("3.14"), None) == 3.14

    def test_convert_type_bytes_default_base64(self):
        assert self._make_operator().convert_type(b"hello", None) == "aGVsbG8="

    def test_convert_type_bytes_as_integer(self):
        assert self._make_operator().convert_type(b"\x00\x01", "INTEGER") == 1

    @mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", autospec=True)
    def test_query_with_find_filter(self, mock_hook_class):
        collection = self._wire_collection(mock_hook_class)
        collection.find.return_value = iter([{"_id": ObjectId(), "name": "alice"}])

        op = self._make_operator(
            mongo_conn_id=MONGO_CONN_ID,
            mongo_query={"name": "alice"},
            mongo_projection={"name": 1},
        )
        result = op.query()

        collection.find.assert_called_once_with({"name": "alice"}, projection={"name": 1})
        collection.aggregate.assert_not_called()
        assert isinstance(result, _MongoCursorAdapter)

    @mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", autospec=True)
    def test_query_with_aggregation_pipeline(self, mock_hook_class):
        collection = self._wire_collection(mock_hook_class)
        collection.aggregate.return_value = iter([{"_id": "alice", "count": 1}])

        pipeline = [{"$match": {"active": True}}, {"$group": {"_id": "$name", "count": {"$sum": 1}}}]
        op = self._make_operator(
            mongo_conn_id=MONGO_CONN_ID,
            mongo_query=pipeline,
            allow_disk_use=False,
        )
        result = op.query()

        collection.aggregate.assert_called_once_with(pipeline, allowDiskUse=False)
        collection.find.assert_not_called()
        assert isinstance(result, _MongoCursorAdapter)

    @mock.patch("airflow.providers.google.cloud.transfers.mongo_to_gcs.MongoHook", autospec=True)
    @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook", autospec=True)
    def test_execute_uploads_to_gcs(self, gcs_hook_mock_class, mongo_hook_mock_class):
        collection = self._wire_collection(mongo_hook_mock_class)
        collection.find.return_value = iter(
            [
                {"name": "alice", "age": 30},
                {"name": "bob", "age": 25},
            ]
        )

        calls = []

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
            calls.append(obj)
            assert bucket == BUCKET
            assert obj == JSON_FILENAME.format(0)
            assert mime_type == "application/json"
            with open(tmp_filename, "rb") as fh:
                written = fh.read().splitlines()
            assert written == [
                b'{"age": 30, "name": "alice"}',
                b'{"age": 25, "name": "bob"}',
            ]

        gcs_hook_mock_class.return_value.upload.side_effect = _assert_upload

        self._make_operator(mongo_conn_id=MONGO_CONN_ID).execute(None)

        assert calls, "Expected GCS upload to be called"
        mongo_hook_mock_class.assert_called_once_with(mongo_conn_id=MONGO_CONN_ID)
diff --git a/uv.lock b/uv.lock
index 8a0780cf8a5..35ea5e56476 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5489,6 +5489,9 @@ microsoft-azure = [
microsoft-mssql = [
{ name = "apache-airflow-providers-microsoft-mssql" },
]
+mongo = [
+ { name = "apache-airflow-providers-mongo" },
+]
mysql = [
{ name = "apache-airflow-providers-mysql" },
]
@@ -5535,6 +5538,7 @@ dev = [
{ name = "apache-airflow-providers-http" },
{ name = "apache-airflow-providers-microsoft-azure" },
{ name = "apache-airflow-providers-microsoft-mssql" },
+ { name = "apache-airflow-providers-mongo" },
{ name = "apache-airflow-providers-mysql" },
{ name = "apache-airflow-providers-openlineage" },
{ name = "apache-airflow-providers-oracle" },
@@ -5565,6 +5569,7 @@ requires-dist = [
{ name = "apache-airflow-providers-http", marker = "extra == 'http'",
editable = "providers/http" },
{ name = "apache-airflow-providers-microsoft-azure", marker = "extra ==
'microsoft-azure'", editable = "providers/microsoft/azure" },
{ name = "apache-airflow-providers-microsoft-mssql", marker = "extra ==
'microsoft-mssql'", editable = "providers/microsoft/mssql" },
+ { name = "apache-airflow-providers-mongo", marker = "extra == 'mongo'",
editable = "providers/mongo" },
{ name = "apache-airflow-providers-mysql", marker = "extra == 'mysql'",
editable = "providers/mysql" },
{ name = "apache-airflow-providers-openlineage", marker = "extra ==
'openlineage'", editable = "providers/openlineage" },
{ name = "apache-airflow-providers-oracle", marker = "extra == 'oracle'",
editable = "providers/oracle" },
@@ -5649,7 +5654,7 @@ requires-dist = [
{ name = "tenacity", specifier = ">=8.3.0" },
{ name = "types-protobuf", specifier = ">=5.27.0,!=5.29.1.20250402" },
]
-provides-extras = ["cncf-kubernetes", "fab", "leveldb", "oracle", "facebook",
"amazon", "apache-cassandra", "microsoft-azure", "microsoft-mssql", "mysql",
"openlineage", "postgres", "presto", "salesforce", "sftp", "ssh", "trino",
"http", "standard", "common-messaging"]
+provides-extras = ["cncf-kubernetes", "fab", "leveldb", "oracle", "facebook",
"amazon", "apache-cassandra", "microsoft-azure", "microsoft-mssql", "mongo",
"mysql", "openlineage", "postgres", "presto", "salesforce", "sftp", "ssh",
"trino", "http", "standard", "common-messaging"]
[package.metadata.requires-dev]
dev = [
@@ -5667,6 +5672,7 @@ dev = [
{ name = "apache-airflow-providers-http", editable = "providers/http" },
{ name = "apache-airflow-providers-microsoft-azure", editable =
"providers/microsoft/azure" },
{ name = "apache-airflow-providers-microsoft-mssql", editable =
"providers/microsoft/mssql" },
+ { name = "apache-airflow-providers-mongo", editable = "providers/mongo" },
{ name = "apache-airflow-providers-mysql", editable = "providers/mysql" },
{ name = "apache-airflow-providers-openlineage", editable =
"providers/openlineage" },
{ name = "apache-airflow-providers-oracle", editable = "providers/oracle"
},