This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 51584b8c371 Introduce gcp translation(V3), translate document 
providers (#44971)
51584b8c371 is described below

commit 51584b8c371263bf34725afedd3c2f1b35468a8e
Author: olegkachur-e <[email protected]>
AuthorDate: Wed Dec 18 11:01:04 2024 +0100

    Introduce gcp translation(V3), translate document providers (#44971)
    
    - Add TranslateDocumentOperator and TranslateDocumentBatchOperator
    operators.
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../operators/cloud/translate.rst                  |  42 +++
 .../providers/google/cloud/hooks/translate.py      | 201 +++++++++++++
 .../providers/google/cloud/links/translate.py      |  35 +++
 .../providers/google/cloud/operators/translate.py  | 332 ++++++++++++++++++++-
 .../src/airflow/providers/google/provider.yaml     |   1 +
 .../tests/google/cloud/operators/test_translate.py | 172 ++++++++++-
 .../cloud/translate/example_translate_document.py  | 131 ++++++++
 7 files changed, 910 insertions(+), 4 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/translate.rst 
b/docs/apache-airflow-providers-google/operators/cloud/translate.rst
index 5bda3d9085a..4b1cee34617 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/translate.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/translate.rst
@@ -247,6 +247,48 @@ Basic usage of the operator:
     :end-before: [END howto_operator_translate_automl_delete_model]
 
 
+.. _howto/operator:TranslateDocumentOperator:
+
+TranslateDocumentOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+Translate Document using Cloud Translate API (Advanced V3).
+
+For parameter definition, take a look at
+:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentOperator`
+
+Using the operator
+""""""""""""""""""
+
+Basic usage of the operator:
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_translate_document]
+    :end-before: [END howto_operator_translate_document]
+
+
+.. _howto/operator:TranslateDocumentBatchOperator:
+
+TranslateDocumentBatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Translate documents using the Cloud Translate API (Advanced V3), as specified by the given input configs.
+
+For parameter definition, take a look at
+:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentBatchOperator`
+
+Using the operator
+""""""""""""""""""
+
+Basic usage of the operator:
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_translate_document_batch]
+    :end-before: [END howto_operator_translate_document_batch]
+
+
 More information
 """"""""""""""""""
 See:
diff --git a/providers/src/airflow/providers/google/cloud/hooks/translate.py 
b/providers/src/airflow/providers/google/cloud/hooks/translate.py
index 43e0c15774b..cf9a748d1a2 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/translate.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/translate.py
@@ -39,9 +39,14 @@ if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.cloud.translate_v3.services.translation_service import pagers
     from google.cloud.translate_v3.types import (
+        BatchDocumentInputConfig,
+        BatchDocumentOutputConfig,
         DatasetInputConfig,
+        DocumentInputConfig,
+        DocumentOutputConfig,
         InputConfig,
         OutputConfig,
+        TranslateDocumentResponse,
         TranslateTextGlossaryConfig,
         TransliterationConfig,
         automl_translation,
@@ -714,3 +719,199 @@ class TranslateHook(GoogleBaseHook):
             metadata=metadata,
         )
         return result
+
+    def translate_document(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str | None = None,
+        target_language_code: str,
+        location: str | None = None,
+        document_input_config: DocumentInputConfig | dict,
+        document_output_config: DocumentOutputConfig | dict | None,
+        customized_attribution: str | None = None,
+        is_translate_native_pdf_only: bool = False,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        model: str | None = None,
+        glossary_config: TranslateTextGlossaryConfig | None = None,
+        labels: str | None = None,
+        timeout: float | _MethodDefault = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        retry: Retry | _MethodDefault | None = DEFAULT,
+    ) -> TranslateDocumentResponse:
+        """
+        Translate the document provided.
+
+        :param project_id: Optional. The ID of the Google Cloud project that the service belongs to.
+            If not specified the hook project_id will be used.
+        :param source_language_code: Optional. The ISO-639 language code of the
+            input document text if known. If the source language isn't 
specified,
+            the API attempts to identify the source language automatically and 
returns
+            the source language within the response.
+        :param target_language_code: Required. The ISO-639 language code to use
+            for translation of the input document text.
+        :param location: Optional. Project or location to make a call. Must 
refer to
+            a caller's project.
+            If not specified, 'global' is used.
+            Non-global location is required for requests using AutoML models 
or custom glossaries.
+            Models and glossaries must be within the same region (have the 
same location-id).
+        :param document_input_config: A document translation request input 
config.
+        :param document_output_config: Optional. A document translation 
request output config.
+            If not provided the translated file will only be returned through 
a byte-stream
+            and its output mime type will be the same as the input file's mime 
type.
+        :param customized_attribution: Optional. This flag is to support user 
customized
+            attribution. If not provided, the default is ``Machine Translated 
by Google``.
+            Customized attribution should follow rules in
+            
https://cloud.google.com/translate/attribution#attribution_and_logos
+        :param is_translate_native_pdf_only: Optional. Param for external
+            customers. If true, the page limit of online native PDF
+            translation is 300 and only native PDF pages will be
+            translated.
+        :param enable_shadow_removal_native_pdf: Optional. If true, use the 
text removal server to remove the
+            shadow text on background image for native PDF translation.
+            Shadow removal feature can only be enabled when both 
``is_translate_native_pdf_only``,
+            ``pdf_native_only`` are False.
+        :param enable_rotation_correction: Optional. If true, enable auto 
rotation
+            correction in DVS.
+        :param model: Optional. The ``model`` type requested for this 
translation.
+            If not provided, the default Google model (NMT) will be used.
+            The format depends on model type:
+
+            -  AutoML Translation models:
+               
``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+            -  General (built-in) models:
+               
``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``,
+
+            If not provided, the default Google model (NMT) will be used
+            for translation.
+        :param glossary_config: Optional. Glossary to be applied. The glossary 
must be
+            within the same region (have the same location-id) as the
+            model.
+        :param labels: Optional. The labels with user-defined
+            metadata for the request.
+            See https://cloud.google.com/translate/docs/advanced/labels for 
more information.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request 
as metadata.
+
+        :return: Translate document result from the API response.
+        """
+        client = self.get_client()
+        location_id = "global" if not location else location
+        parent = f"projects/{project_id or 
self.project_id}/locations/{location_id}"
+        return client.translate_document(
+            request={
+                "parent": parent,
+                "source_language_code": source_language_code,
+                "target_language_code": target_language_code,
+                "document_input_config": document_input_config,
+                "document_output_config": document_output_config,
+                "customized_attribution": customized_attribution,
+                "is_translate_native_pdf_only": is_translate_native_pdf_only,
+                "enable_shadow_removal_native_pdf": 
enable_shadow_removal_native_pdf,
+                "enable_rotation_correction": enable_rotation_correction,
+                "model": model,
+                "glossary_config": glossary_config,
+                "labels": labels,
+            },
+            timeout=timeout,
+            retry=retry,
+            metadata=metadata,
+        )
+
+    def batch_translate_document(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str,
+        target_language_codes: MutableSequence[str] | None = None,
+        location: str | None = None,
+        input_configs: MutableSequence[BatchDocumentInputConfig | dict],
+        output_config: BatchDocumentOutputConfig | dict,
+        customized_attribution: str | None = None,
+        format_conversions: MutableMapping[str, str] | None = None,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        models: MutableMapping[str, str] | None = None,
+        glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = 
None,
+        timeout: float | _MethodDefault = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        retry: Retry | _MethodDefault | None = DEFAULT,
+    ) -> Operation:
+        """
+        Translate documents batch by configs provided.
+
+        :param project_id: Optional. The ID of the Google Cloud project that the service belongs to.
+            If not specified the hook project_id will be used.
+        :param source_language_code: Required. The ISO-639 language code of the
+            input documents. This argument has no default in the method signature,
+            so the caller must always supply it.
+        :param target_language_codes: Required. The ISO-639 language code to 
use
+            for translation of the input document. Specify up to 10 language 
codes here.
+        :param location: Optional. Project or location to make a call. Must 
refer to
+            a caller's project. If not specified, 'global' is used.
+            Non-global location is required for requests using AutoML models 
or custom glossaries.
+            Models and glossaries must be within the same region (have the 
same location-id).
+        :param input_configs: Input configurations. The total number of files 
matched should be <=
+            100. The total content size to translate should be <= 100M Unicode 
codepoints.
+            The files must use UTF-8 encoding.
+        :param output_config: Output configuration. If 2 input configs match 
to the same file (that
+            is, same input path), no output for duplicate inputs will be 
generated.
+        :param format_conversions: Optional. The file format conversion map 
that is applied to
+            all input files. The map key is the original mime_type.
+            The map value is the target mime_type of translated documents.
+            Supported file format conversion includes:
+
+            -  ``application/pdf`` to
+               
``application/vnd.openxmlformats-officedocument.wordprocessingml.document``
+
+            If nothing specified, output files will be in the same format as 
the original file.
+        :param customized_attribution: Optional. This flag is to support user 
customized
+            attribution. If not provided, the default is ``Machine Translated 
by Google``.
+            Customized attribution should follow rules in
+            
https://cloud.google.com/translate/attribution#attribution_and_logos
+        :param enable_shadow_removal_native_pdf: Optional. If true, use the 
text removal server to remove the
+            shadow text on background image for native PDF translation.
+            Shadow removal feature can only be enabled when both 
``is_translate_native_pdf_only``,
+            ``pdf_native_only`` are False.
+        :param enable_rotation_correction: Optional. If true, enable auto 
rotation
+            correction in DVS.
+        :param models: Optional. The models to use for translation. Map's key 
is
+            target language code. Map's value is the model name. Value
+            can be a built-in general model, or an AutoML Translation model.
+            The value format depends on model type:
+
+            -  AutoML Translation models:
+               
``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+            -  General (built-in) models:
+               
``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``,
+
+            If the map is empty or a specific model is not requested for
+            a language pair, then default google model (NMT) is used.
+        :param glossaries: Glossaries to be applied. It's keyed by target 
language code.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request 
as metadata.
+
+        :return: Batch translate document result from the API response.
+        """
+        client = self.get_client()
+        location_id = "global" if not location else location
+        parent = f"projects/{project_id or 
self.project_id}/locations/{location_id}"
+        return client.batch_translate_document(
+            request={
+                "parent": parent,
+                "source_language_code": source_language_code,
+                "target_language_codes": target_language_codes,
+                "input_configs": input_configs,
+                "output_config": output_config,
+                "format_conversions": format_conversions,
+                "customized_attribution": customized_attribution,
+                "enable_shadow_removal_native_pdf": 
enable_shadow_removal_native_pdf,
+                "enable_rotation_correction": enable_rotation_correction,
+                "models": models,
+                "glossaries": glossaries,
+            },
+            timeout=timeout,
+            retry=retry,
+            metadata=metadata,
+        )
diff --git a/providers/src/airflow/providers/google/cloud/links/translate.py 
b/providers/src/airflow/providers/google/cloud/links/translate.py
index 55db2650838..ecf595e9a59 100644
--- a/providers/src/airflow/providers/google/cloud/links/translate.py
+++ b/providers/src/airflow/providers/google/cloud/links/translate.py
@@ -333,3 +333,38 @@ class TranslationModelsListLink(BaseGoogleLink):
                 "project_id": project_id,
             },
         )
+
+
+class TranslateResultByOutputConfigLink(BaseGoogleLink):
+    """
+    Helper class for constructing Translation results Link.
+
+    Provides link to gcs destination output translation results, by provided 
output_config
+    with gcs destination specified.
+    """
+
+    name = "Translate Results By Output Config"
+    key = "translate_results_by_output_config"
+    format_str = TRANSLATION_TRANSLATE_TEXT_BATCH
+
+    @staticmethod
+    def extract_output_uri_prefix(output_config):
+        return 
output_config["gcs_destination"]["output_uri_prefix"].rpartition("gs://")[-1]
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+        project_id: str,
+        output_config: dict,
+    ):
+        task_instance.xcom_push(
+            context,
+            key=TranslateResultByOutputConfigLink.key,
+            value={
+                "project_id": project_id,
+                "output_uri_prefix": 
TranslateResultByOutputConfigLink.extract_output_uri_prefix(
+                    output_config
+                ),
+            },
+        )
diff --git 
a/providers/src/airflow/providers/google/cloud/operators/translate.py 
b/providers/src/airflow/providers/google/cloud/operators/translate.py
index 4c04e9a7bc5..e57b9e46fcc 100644
--- a/providers/src/airflow/providers/google/cloud/operators/translate.py
+++ b/providers/src/airflow/providers/google/cloud/operators/translate.py
@@ -20,7 +20,7 @@
 from __future__ import annotations
 
 from collections.abc import MutableMapping, MutableSequence, Sequence
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
 
 from google.api_core.exceptions import GoogleAPICallError
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -28,6 +28,7 @@ from google.api_core.gapic_v1.method import DEFAULT, 
_MethodDefault
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.translate import CloudTranslateHook, 
TranslateHook
 from airflow.providers.google.cloud.links.translate import (
+    TranslateResultByOutputConfigLink,
     TranslateTextBatchLink,
     TranslationDatasetsListLink,
     TranslationModelLink,
@@ -40,7 +41,11 @@ from airflow.providers.google.common.hooks.base_google 
import PROVIDE_PROJECT_ID
 if TYPE_CHECKING:
     from google.api_core.retry import Retry
     from google.cloud.translate_v3.types import (
+        BatchDocumentInputConfig,
+        BatchDocumentOutputConfig,
         DatasetInputConfig,
+        DocumentInputConfig,
+        DocumentOutputConfig,
         InputConfig,
         OutputConfig,
         TranslateTextGlossaryConfig,
@@ -978,3 +983,328 @@ class 
TranslateDeleteModelOperator(GoogleCloudBaseOperator):
         )
         hook.wait_for_operation_done(operation=operation, timeout=self.timeout)
         self.log.info("Model deletion complete!")
+
+
+class TranslateDocumentOperator(GoogleCloudBaseOperator):
+    """
+    Translate document provided.
+
+    Wraps the Google Cloud Translation (Advanced) document translation functionality.
+    Supports wide range of input/output file types, please visit the
+    https://cloud.google.com/translate/docs/advanced/translate-documents for 
more details.
+
+    For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:TranslateDocumentOperator`.
+
+    :param project_id: Optional. The ID of the Google Cloud project that the
+        service belongs to. If not specified the hook project_id will be used.
+    :param source_language_code: Optional. The ISO-639 language code of the
+        input document text if known. If the source language isn't specified,
+        the API attempts to identify the source language automatically and 
returns
+        the source language within the response.
+    :param target_language_code: Required. The ISO-639 language code to use
+        for translation of the input document text.
+    :param location: Optional. Project or location to make a call. Must refer 
to a caller's project.
+        If not specified, 'global' is used.
+        Non-global location is required for requests using AutoML models or 
custom glossaries.
+        Models and glossaries must be within the same region (have the same 
location-id).
+    :param document_input_config: A document translation request input config.
+    :param document_output_config: Optional. A document translation request 
output config.
+        If not provided the translated file will only be returned through a 
byte-stream
+        and its output mime type will be the same as the input file's mime 
type.
+    :param customized_attribution: Optional. This flag is to support user 
customized
+        attribution. If not provided, the default is ``Machine Translated by 
Google``.
+        Customized attribution should follow rules in
+        https://cloud.google.com/translate/attribution#attribution_and_logos
+    :param is_translate_native_pdf_only: Optional. Param for external 
customers.
+        If true, the page limit of online native PDF translation is 300 and 
only native PDF pages
+        will be translated.
+    :param enable_shadow_removal_native_pdf: Optional. If true, use the text 
removal server to remove the
+        shadow text on background image for native PDF translation.
+        Shadow removal feature can only be enabled when both 
``is_translate_native_pdf_only``,
+        ``pdf_native_only`` are False.
+    :param enable_rotation_correction: Optional. If true, enable auto rotation
+        correction in DVS.
+    :param model: Optional. The ``model`` type requested for this translation.
+        If not provided, the default Google model (NMT) will be used.
+        The format depends on model type:
+
+        -  AutoML Translation models:
+           
``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+        -  General (built-in) models:
+           
``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``
+
+        If not provided, the default Google model (NMT) will be used
+        for translation.
+    :param glossary_config: Optional. Glossary to be applied.
+    :param labels: Optional. The labels with user-defined metadata for the request.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata:  Strings which should be sent along with the request as 
metadata.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    operator_extra_links = (TranslateResultByOutputConfigLink(),)
+
+    template_fields: Sequence[str] = (
+        "source_language_code",
+        "target_language_code",
+        "document_input_config",
+        "document_output_config",
+        "model",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str | None = None,
+        target_language_code: str,
+        document_input_config: DocumentInputConfig | dict,
+        document_output_config: DocumentOutputConfig | dict | None,
+        customized_attribution: str | None = None,
+        is_translate_native_pdf_only: bool = False,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        model: str | None = None,
+        glossary_config: TranslateTextGlossaryConfig | None = None,
+        labels: str | None = None,
+        timeout: float | _MethodDefault = DEFAULT,
+        retry: Retry | _MethodDefault | None = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.source_language_code = source_language_code
+        self.target_language_code = target_language_code
+        self.document_input_config = document_input_config
+        self.document_output_config = document_output_config
+        self.customized_attribution = customized_attribution
+        self.is_translate_native_pdf_only = is_translate_native_pdf_only
+        self.enable_shadow_removal_native_pdf = 
enable_shadow_removal_native_pdf
+        self.enable_rotation_correction = enable_rotation_correction
+        self.location = location
+        self.labels = labels
+        self.model = model
+        self.glossary_config = glossary_config
+        self.metadate = metadata
+        self.timeout = timeout
+        self.retry = retry
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        hook = TranslateHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            self.log.info("Starting the document translation")
+            doc_translation_result = hook.translate_document(
+                source_language_code=self.source_language_code,
+                target_language_code=self.target_language_code,
+                document_input_config=self.document_input_config,
+                document_output_config=self.document_output_config,
+                customized_attribution=self.customized_attribution,
+                is_translate_native_pdf_only=self.is_translate_native_pdf_only,
+                
enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf,
+                enable_rotation_correction=self.enable_rotation_correction,
+                location=self.location,
+                labels=self.labels,
+                model=self.model,
+                glossary_config=self.glossary_config,
+                timeout=self.timeout,
+                retry=self.retry,
+                metadata=self.metadate,
+            )
+            self.log.info("Document translation completed")
+        except GoogleAPICallError as e:
+            self.log.error("An error occurred executing translate_document 
method: \n%s", e)
+            raise AirflowException(e)
+        if self.document_output_config:
+            TranslateResultByOutputConfigLink.persist(
+                context=context,
+                task_instance=self,
+                project_id=self.project_id or hook.project_id,
+                output_config=self.document_output_config,
+            )
+        return cast(dict, 
type(doc_translation_result).to_dict(doc_translation_result))
+
+
+class TranslateDocumentBatchOperator(GoogleCloudBaseOperator):
+    """
+    Translate documents provided via input and output configurations.
+
+    Up to 10 target languages per operation supported.
+    Wraps the Google Cloud Translation (Advanced) batch document translation functionality.
+    See https://cloud.google.com/translate/docs/advanced/batch-translation.
+
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:TranslateDocumentBatchOperator`.
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
service belongs to.
+    :param source_language_code: Required. The ISO-639 language code of the
+        input documents. This argument has no default in the operator signature,
+        so it must always be supplied.
+    :param target_language_codes: Required. The ISO-639 language code to use
+        for translation of the input document. Specify up to 10 language codes 
here.
+    :param location: Optional. Project or location to make a call. Must refer 
to
+        a caller's project. If not specified, 'global' is used.
+        Non-global location is required for requests using AutoML models or 
custom glossaries.
+        Models and glossaries must be within the same region (have the same 
location-id).
+    :param input_configs: Input configurations. The total number of files 
matched should be <=
+        100. The total content size to translate should be <= 100M Unicode 
codepoints.
+        The files must use UTF-8 encoding.
+    :param output_config: Output configuration. If 2 input configs match to 
the same file (that
+        is, same input path), no output for duplicate inputs will be generated.
+    :param format_conversions: Optional. The file format conversion map that 
is applied to
+        all input files. The map key is the original mime_type.
+        The map value is the target mime_type of translated documents.
+        Supported file format conversion includes:
+
+        -  ``application/pdf`` to
+           
``application/vnd.openxmlformats-officedocument.wordprocessingml.document``
+
+        If nothing specified, output files will be in the same format as the 
original file.
+    :param customized_attribution: Optional. This flag is to support user 
customized
+        attribution. If not provided, the default is ``Machine Translated by 
Google``.
+        Customized attribution should follow rules in
+        https://cloud.google.com/translate/attribution#attribution_and_logos
+    :param enable_shadow_removal_native_pdf: Optional. If true, use the text 
removal server to remove the
+        shadow text on background image for native PDF translation.
+        Shadow removal feature can only be enabled when both 
``is_translate_native_pdf_only``,
+        ``pdf_native_only`` are False.
+    :param enable_rotation_correction: Optional. If true, enable auto rotation
+        correction in DVS.
+    :param models: Optional. The models to use for translation. Map's key is
+        target language code. Map's value is the model name. Value
+        can be a built-in general model, or an AutoML Translation model.
+        The value format depends on model type:
+
+        -  AutoML Translation models:
+           
``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+
+        -  General (built-in) models:
+           
``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``,
+
+        If the map is empty or a specific model is not requested for
+        a language pair, then default google model (NMT) is used.
+    :param glossaries: Glossaries to be applied. It's keyed by target language 
code.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata:  Strings which should be sent along with the request as 
metadata.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    operator_extra_links = (TranslateResultByOutputConfigLink(),)
+
+    template_fields: Sequence[str] = (
+        "input_configs",
+        "output_config",
+        "target_language_codes",
+        "source_language_code",
+        "models",
+        "glossaries",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str,
+        target_language_codes: MutableSequence[str] | None = None,
+        location: str | None = None,
+        input_configs: MutableSequence[BatchDocumentInputConfig | dict],
+        output_config: BatchDocumentOutputConfig | dict,
+        customized_attribution: str | None = None,
+        format_conversions: MutableMapping[str, str] | None = None,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        models: MutableMapping[str, str] | None = None,
+        glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = 
None,
+        metadata: Sequence[tuple[str, str]] = (),
+        timeout: float | _MethodDefault = DEFAULT,
+        retry: Retry | _MethodDefault | None = DEFAULT,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.target_language_codes = target_language_codes
+        self.source_language_code = source_language_code
+        self.input_configs = input_configs
+        self.output_config = output_config
+        self.customized_attribution = customized_attribution
+        self.format_conversions = format_conversions
+        self.enable_shadow_removal_native_pdf = 
enable_shadow_removal_native_pdf
+        self.enable_rotation_correction = enable_rotation_correction
+        self.models = models
+        self.glossaries = glossaries
+        self.metadata = metadata
+        self.timeout = timeout
+        self.retry = retry
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        hook = TranslateHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            batch_document_translate_operation = hook.batch_translate_document(
+                project_id=self.project_id,
+                location=self.location,
+                target_language_codes=self.target_language_codes,
+                source_language_code=self.source_language_code,
+                input_configs=self.input_configs,
+                output_config=self.output_config,
+                customized_attribution=self.customized_attribution,
+                format_conversions=self.format_conversions,
+                
enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf,
+                enable_rotation_correction=self.enable_rotation_correction,
+                models=self.models,
+                glossaries=self.glossaries,
+                metadata=self.metadata,
+                timeout=self.timeout,
+                retry=self.retry,
+            )
+        except GoogleAPICallError as e:
+            self.log.error("An error occurred executing 
batch_translate_document method: \n%s", e)
+            raise AirflowException(e)
+        self.log.info("Batch document translation job started.")
+        TranslateResultByOutputConfigLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=self.project_id or hook.project_id,
+            output_config=self.output_config,
+        )
+        result = 
hook.wait_for_operation_result(batch_document_translate_operation)
+        self.log.info("Batch document translation job finished")
+        return cast(dict, type(result).to_dict(result))
diff --git a/providers/src/airflow/providers/google/provider.yaml 
b/providers/src/airflow/providers/google/provider.yaml
index 442e1cecccc..cb3bb5082e7 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -1303,6 +1303,7 @@ extra-links:
   - airflow.providers.google.cloud.links.translate.TranslationDatasetsListLink
   - airflow.providers.google.cloud.links.translate.TranslationModelLink
   - airflow.providers.google.cloud.links.translate.TranslationModelsListLink
+  - 
airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink
 
 
 secrets-backends:
diff --git a/providers/tests/google/cloud/operators/test_translate.py 
b/providers/tests/google/cloud/operators/test_translate.py
index d1b6a9fa009..2e4217ee53a 100644
--- a/providers/tests/google/cloud/operators/test_translate.py
+++ b/providers/tests/google/cloud/operators/test_translate.py
@@ -20,7 +20,11 @@ from __future__ import annotations
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.translate_v3.types import automl_translation
+from google.cloud.translate_v3.types import (
+    BatchTranslateDocumentResponse,
+    TranslateDocumentResponse,
+    automl_translation,
+)
 
 from airflow.providers.google.cloud.hooks.translate import TranslateHook
 from airflow.providers.google.cloud.operators.translate import (
@@ -30,20 +34,21 @@ from airflow.providers.google.cloud.operators.translate 
import (
     TranslateDatasetsListOperator,
     TranslateDeleteDatasetOperator,
     TranslateDeleteModelOperator,
+    TranslateDocumentBatchOperator,
+    TranslateDocumentOperator,
     TranslateImportDataOperator,
     TranslateModelsListOperator,
     TranslateTextBatchOperator,
     TranslateTextOperator,
 )
 
-from providers.tests.system.google.cloud.tasks.example_tasks import LOCATION
-
 GCP_CONN_ID = "google_cloud_default"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
 PROJECT_ID = "test-project-id"
 DATASET_ID = "sample_ds_id"
 MODEL_ID = "sample_model_id"
 TIMEOUT_VALUE = 30
+LOCATION = "location_id"
 
 
 class TestCloudTranslate:
@@ -542,3 +547,164 @@ class TestTranslateDeleteModel:
             metadata=(),
         )
         
wait_for_done.assert_called_once_with(operation=m_delete_method_result, 
timeout=TIMEOUT_VALUE)
+
+
+class TestTranslateDocumentBatchOperator:
+    
@mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist")
+    
@mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook")
+    def test_minimal_green_path(self, mock_hook, mock_link_persist):
+        input_config_item_1 = {
+            "gcs_source": {"input_uri": 
"gs://source_bucket_uri/sample_data_src_lang_1.txt"},
+        }
+        input_config_item_2 = {
+            "gcs_source": {"input_uri": 
"gs://source_bucket_uri/sample_data_src_lang_2.txt"},
+        }
+        SRC_LANG_CODE = "src_lang_code"
+        TARGET_LANG_CODES = ["target_lang_code1", "target_lang_code2"]
+        TIMEOUT = 30
+        INPUT_CONFIGS = [input_config_item_1, input_config_item_2]
+        OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": 
"gs://source_bucket_uri/output/"}}
+        BATCH_DOC_TRANSLATION_RESULT = {
+            "submit_time": "2024-12-01T00:01:16Z",
+            "end_time": "2024-12-01T00:10:01Z",
+            "failed_characters": "0",
+            "failed_pages": "0",
+            "total_billable_characters": "0",
+            "total_billable_pages": "6",
+            "total_characters": "4240",
+            "total_pages": "6",
+            "translated_characters": "4240",
+            "translated_pages": "6",
+        }
+        sample_operation = mock.MagicMock()
+        sample_operation.result.return_value = 
BatchTranslateDocumentResponse(BATCH_DOC_TRANSLATION_RESULT)
+
+        mock_hook.return_value.batch_translate_document.return_value = 
sample_operation
+        mock_hook.return_value.wait_for_operation_result.side_effect = lambda 
operation: operation.result()
+
+        op = TranslateDocumentBatchOperator(
+            task_id="task_id_test",
+            project_id=PROJECT_ID,
+            source_language_code=SRC_LANG_CODE,
+            target_language_codes=TARGET_LANG_CODES,
+            location=LOCATION,
+            models=None,
+            glossaries=None,
+            input_configs=INPUT_CONFIGS,
+            output_config=OUTPUT_CONFIG,
+            customized_attribution=None,
+            format_conversions=None,
+            enable_shadow_removal_native_pdf=False,
+            enable_rotation_correction=False,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            metadata=(),
+            timeout=TIMEOUT,
+            retry=None,
+        )
+        context = {"ti": mock.MagicMock()}
+        result = op.execute(context=context)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        
mock_hook.return_value.batch_translate_document.assert_called_once_with(
+            project_id=PROJECT_ID,
+            source_language_code=SRC_LANG_CODE,
+            target_language_codes=TARGET_LANG_CODES,
+            location=LOCATION,
+            input_configs=INPUT_CONFIGS,
+            output_config=OUTPUT_CONFIG,
+            customized_attribution=None,
+            format_conversions=None,
+            enable_shadow_removal_native_pdf=False,
+            enable_rotation_correction=False,
+            timeout=TIMEOUT,
+            models=None,
+            glossaries=None,
+            retry=None,
+            metadata=(),
+        )
+
+        assert result == BATCH_DOC_TRANSLATION_RESULT
+        mock_link_persist.assert_called_once_with(
+            context=context,
+            task_instance=op,
+            project_id=PROJECT_ID,
+            output_config=OUTPUT_CONFIG,
+        )
+
+
+class TestTranslateDocumentOperator:
+    
@mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist")
+    
@mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook")
+    def test_minimal_green_path(self, mock_hook, mock_link_persist):
+        SRC_LANG_CODE = "src_lang_code"
+        TARGET_LANG_CODE = "target_lang_code1"
+        TIMEOUT = 30
+        INPUT_CONFIG = {"gcs_source": {"input_uri": 
"gs://source_bucket_uri/sample_data_src_lang_1.txt"}}
+        OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": 
"gs://source_bucket_uri/output/"}}
+        DOC_TRANSLATION_RESULT = {
+            "document_translation": {
+                "byte_stream_outputs": ["c29tZV9kYXRh"],
+                "detected_language_code": "",
+                "mime_type": 
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+            },
+            "model": 
f"projects/{PROJECT_ID}/locations/us-central1/models/general/nmt",
+        }
+
+        mock_hook.return_value.translate_document.return_value = 
TranslateDocumentResponse(
+            DOC_TRANSLATION_RESULT
+        )
+
+        op = TranslateDocumentOperator(
+            task_id="task_id_test",
+            project_id=PROJECT_ID,
+            source_language_code=SRC_LANG_CODE,
+            target_language_code=TARGET_LANG_CODE,
+            location=LOCATION,
+            model=None,
+            glossary_config=None,
+            labels=None,
+            document_input_config=INPUT_CONFIG,
+            document_output_config=OUTPUT_CONFIG,
+            customized_attribution=None,
+            is_translate_native_pdf_only=False,
+            enable_shadow_removal_native_pdf=False,
+            enable_rotation_correction=False,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            timeout=TIMEOUT,
+            retry=None,
+        )
+        context = {"ti": mock.MagicMock()}
+        result = op.execute(context=context)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.translate_document.assert_called_once_with(
+            source_language_code=SRC_LANG_CODE,
+            target_language_code=TARGET_LANG_CODE,
+            location=LOCATION,
+            model=None,
+            glossary_config=None,
+            labels=None,
+            document_input_config=INPUT_CONFIG,
+            document_output_config=OUTPUT_CONFIG,
+            customized_attribution=None,
+            is_translate_native_pdf_only=False,
+            enable_shadow_removal_native_pdf=False,
+            enable_rotation_correction=False,
+            timeout=TIMEOUT,
+            retry=None,
+            metadata=(),
+        )
+
+        assert result == DOC_TRANSLATION_RESULT
+        mock_link_persist.assert_called_once_with(
+            context=context,
+            task_instance=op,
+            project_id=PROJECT_ID,
+            output_config=OUTPUT_CONFIG,
+        )
diff --git 
a/providers/tests/system/google/cloud/translate/example_translate_document.py 
b/providers/tests/system/google/cloud/translate/example_translate_document.py
new file mode 100644
index 00000000000..8c47a4d4ecd
--- /dev/null
+++ 
b/providers/tests/system/google/cloud/translate/example_translate_document.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that translates documents (single and batch) using the
+Google Cloud Translate API (Advanced V3).
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.operators.translate import (
+    TranslateDocumentBatchOperator,
+    TranslateDocumentOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "gcp_translate_document"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+REGION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+DATA_OUTPUT_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+
+DOC_TRANSLATE_INPUT = {
+    "gcs_source": {
+        "input_uri": 
f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/document_translate/translate_me_sample.xlsx"
+    },
+}
+GCS_OUTPUT_DST = {
+    "gcs_destination": {"output_uri_prefix": 
f"gs://{DATA_OUTPUT_BUCKET_NAME}/doc_translate_output/"}
+}
+BATCH_DOC_INPUT_ITEM_1 = {
+    "gcs_source": {
+        "input_uri": 
f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_doc_sample_1.docx"
+    }
+}
+BATCH_DOC_INPUT_ITEM_2 = {
+    "gcs_source": {
+        "input_uri": 
f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_sample_2.pdf"
+    }
+}
+BATCH_OUTPUT_CONFIG = {
+    "gcs_destination": {"output_uri_prefix": 
f"gs://{DATA_OUTPUT_BUCKET_NAME}/batch_translate_docs_output/"}
+}
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",  # Override to match your needs
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["example", "document_translate", "document_translate_batch", 
"translate_V3"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=DATA_OUTPUT_BUCKET_NAME,
+        storage_class="REGIONAL",
+        location=REGION,
+    )
+    # [START howto_operator_translate_document]
+    translate_document = TranslateDocumentOperator(
+        task_id="translate_document_op",
+        project_id=PROJECT_ID,
+        location=REGION,
+        source_language_code="en",
+        target_language_code="uk",
+        document_input_config=DOC_TRANSLATE_INPUT,
+        document_output_config=GCS_OUTPUT_DST,
+    )
+    # [END howto_operator_translate_document]
+
+    # [START howto_operator_translate_document_batch]
+    translate_document_batch = TranslateDocumentBatchOperator(
+        task_id="batch_translate_document_op",
+        project_id=PROJECT_ID,
+        location=REGION,
+        source_language_code="en",
+        target_language_codes=["uk", "fr"],
+        input_configs=[BATCH_DOC_INPUT_ITEM_1, BATCH_DOC_INPUT_ITEM_2],
+        output_config=BATCH_OUTPUT_CONFIG,
+    )
+    # [END howto_operator_translate_document_batch]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=DATA_OUTPUT_BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> [translate_document, translate_document_batch]
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to