This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 51584b8c371 Introduce gcp translation(V3), translate document
providers (#44971)
51584b8c371 is described below
commit 51584b8c371263bf34725afedd3c2f1b35468a8e
Author: olegkachur-e <[email protected]>
AuthorDate: Wed Dec 18 11:01:04 2024 +0100
Introduce gcp translation(V3), translate document providers (#44971)
- Add TranslateDocumentOperator and TranslateDocumentBatchOperator
operators.
Co-authored-by: Oleg Kachur <[email protected]>
---
.../operators/cloud/translate.rst | 42 +++
.../providers/google/cloud/hooks/translate.py | 201 +++++++++++++
.../providers/google/cloud/links/translate.py | 35 +++
.../providers/google/cloud/operators/translate.py | 332 ++++++++++++++++++++-
.../src/airflow/providers/google/provider.yaml | 1 +
.../tests/google/cloud/operators/test_translate.py | 172 ++++++++++-
.../cloud/translate/example_translate_document.py | 131 ++++++++
7 files changed, 910 insertions(+), 4 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/translate.rst
b/docs/apache-airflow-providers-google/operators/cloud/translate.rst
index 5bda3d9085a..4b1cee34617 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/translate.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/translate.rst
@@ -247,6 +247,48 @@ Basic usage of the operator:
:end-before: [END howto_operator_translate_automl_delete_model]
+.. _howto/operator:TranslateDocumentOperator:
+
+TranslateDocumentOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^
+Translate Document using Cloud Translate API (Advanced V3).
+
+For parameter definition, take a look at
+:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentOperator`
+
+Using the operator
+""""""""""""""""""
+
+Basic usage of the operator:
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_translate_document]
+    :end-before: [END howto_operator_translate_document]
+
+
+.. _howto/operator:TranslateDocumentBatchOperator:
+
+TranslateDocumentBatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Translate Documents using Cloud Translate API (Advanced V3), by given input configs.
+
+For parameter definition, take a look at
+:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentBatchOperator`
+
+Using the operator
+""""""""""""""""""
+
+Basic usage of the operator:
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_translate_document_batch]
+    :end-before: [END howto_operator_translate_document_batch]
+
+
More information
""""""""""""""""""
See:
diff --git a/providers/src/airflow/providers/google/cloud/hooks/translate.py
b/providers/src/airflow/providers/google/cloud/hooks/translate.py
index 43e0c15774b..cf9a748d1a2 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/translate.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/translate.py
@@ -39,9 +39,14 @@ if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.cloud.translate_v3.services.translation_service import pagers
from google.cloud.translate_v3.types import (
+ BatchDocumentInputConfig,
+ BatchDocumentOutputConfig,
DatasetInputConfig,
+ DocumentInputConfig,
+ DocumentOutputConfig,
InputConfig,
OutputConfig,
+ TranslateDocumentResponse,
TranslateTextGlossaryConfig,
TransliterationConfig,
automl_translation,
@@ -714,3 +719,199 @@ class TranslateHook(GoogleBaseHook):
metadata=metadata,
)
return result
+
+    def translate_document(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str | None = None,
+        target_language_code: str,
+        location: str | None = None,
+        document_input_config: DocumentInputConfig | dict,
+        document_output_config: DocumentOutputConfig | dict | None,
+        customized_attribution: str | None = None,
+        is_translate_native_pdf_only: bool = False,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        model: str | None = None,
+        glossary_config: TranslateTextGlossaryConfig | None = None,
+        labels: MutableMapping[str, str] | None = None,
+        timeout: float | _MethodDefault = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        retry: Retry | _MethodDefault | None = DEFAULT,
+    ) -> TranslateDocumentResponse:
+        """
+        Translate the document provided.
+
+        :param project_id: Optional. The ID of the Google Cloud project that the service belongs to.
+            If not specified, the hook's ``project_id`` is used.
+        :param source_language_code: Optional. The ISO-639 language code of the
+            input document text if known. If the source language isn't specified,
+            the API attempts to identify the source language automatically and returns
+            the source language within the response.
+        :param target_language_code: Required. The ISO-639 language code to use
+            for translation of the input document text.
+        :param location: Optional. Project or location to make a call. Must refer to
+            a caller's project.
+            If not specified, 'global' is used.
+            Non-global location is required for requests using AutoML models or custom glossaries.
+            Models and glossaries must be within the same region (have the same location-id).
+        :param document_input_config: A document translation request input config.
+        :param document_output_config: Optional. A document translation request output config.
+            If not provided the translated file will only be returned through a byte-stream
+            and its output mime type will be the same as the input file's mime type.
+        :param customized_attribution: Optional. This flag is to support user customized
+            attribution. If not provided, the default is ``Machine Translated by Google``.
+            Customized attribution should follow rules in
+            https://cloud.google.com/translate/attribution#attribution_and_logos
+        :param is_translate_native_pdf_only: Optional. Param for external
+            customers. If true, the page limit of online native PDF
+            translation is 300 and only native PDF pages will be
+            translated.
+        :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the
+            shadow text on background image for native PDF translation.
+            Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``,
+            ``pdf_native_only`` are False.
+        :param enable_rotation_correction: Optional. If true, enable auto rotation
+            correction in DVS.
+        :param model: Optional. The ``model`` type requested for this translation.
+            If not provided, the default Google model (NMT) will be used.
+            The format depends on model type:
+
+            - AutoML Translation models:
+              ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+            - General (built-in) models:
+              ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``
+
+        :param glossary_config: Optional. Glossary to be applied. The glossary must be
+            within the same region (have the same location-id) as the
+            model.
+        :param labels: Optional. The labels with user-defined
+            metadata for the request (a mapping of label key to label value).
+            See https://cloud.google.com/translate/docs/advanced/labels for more information.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+
+        :return: Translate document result from the API response.
+        """
+        client = self.get_client()
+        location_id = "global" if not location else location
+        # An explicitly passed project wins; otherwise use the hook's own project.
+        parent = f"projects/{project_id or self.project_id}/locations/{location_id}"
+        return client.translate_document(
+            request={
+                "parent": parent,
+                "source_language_code": source_language_code,
+                "target_language_code": target_language_code,
+                "document_input_config": document_input_config,
+                "document_output_config": document_output_config,
+                "customized_attribution": customized_attribution,
+                "is_translate_native_pdf_only": is_translate_native_pdf_only,
+                "enable_shadow_removal_native_pdf": enable_shadow_removal_native_pdf,
+                "enable_rotation_correction": enable_rotation_correction,
+                "model": model,
+                "glossary_config": glossary_config,
+                "labels": labels,
+            },
+            timeout=timeout,
+            retry=retry,
+            metadata=metadata,
+        )
+
+    def batch_translate_document(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str,
+        target_language_codes: MutableSequence[str] | None = None,
+        location: str | None = None,
+        input_configs: MutableSequence[BatchDocumentInputConfig | dict],
+        output_config: BatchDocumentOutputConfig | dict,
+        customized_attribution: str | None = None,
+        format_conversions: MutableMapping[str, str] | None = None,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        models: MutableMapping[str, str] | None = None,
+        glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = None,
+        timeout: float | _MethodDefault = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        retry: Retry | _MethodDefault | None = DEFAULT,
+    ) -> Operation:
+        """
+        Translate documents batch by configs provided.
+
+        :param project_id: Optional. The ID of the Google Cloud project that the service belongs to.
+            If not specified, the hook's ``project_id`` is used.
+        :param source_language_code: Required. The ISO-639 language code of the
+            input documents.
+        :param target_language_codes: Required. The ISO-639 language code to use
+            for translation of the input document. Specify up to 10 language codes here.
+        :param location: Optional. Project or location to make a call. Must refer to
+            a caller's project. If not specified, 'global' is used.
+            Non-global location is required for requests using AutoML models or custom glossaries.
+            Models and glossaries must be within the same region (have the same location-id).
+        :param input_configs: Input configurations. The total number of files matched should be <=
+            100. The total content size to translate should be <= 100M Unicode codepoints.
+            The files must use UTF-8 encoding.
+        :param output_config: Output configuration. If 2 input configs match to the same file (that
+            is, same input path), no output for duplicate inputs will be generated.
+        :param format_conversions: Optional. The file format conversion map that is applied to
+            all input files. The map key is the original mime_type.
+            The map value is the target mime_type of translated documents.
+            Supported file format conversion includes:
+
+            - ``application/pdf`` to
+              ``application/vnd.openxmlformats-officedocument.wordprocessingml.document``
+
+            If nothing specified, output files will be in the same format as the original file.
+        :param customized_attribution: Optional. This flag is to support user customized
+            attribution. If not provided, the default is ``Machine Translated by Google``.
+            Customized attribution should follow rules in
+            https://cloud.google.com/translate/attribution#attribution_and_logos
+        :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the
+            shadow text on background image for native PDF translation.
+            Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``,
+            ``pdf_native_only`` are False.
+        :param enable_rotation_correction: Optional. If true, enable auto rotation
+            correction in DVS.
+        :param models: Optional. The models to use for translation. Map's key is
+            target language code. Map's value is the model name. Value
+            can be a built-in general model, or an AutoML Translation model.
+            The value format depends on model type:
+
+            - AutoML Translation models:
+              ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+            - General (built-in) models:
+              ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``
+
+            If the map is empty or a specific model is not requested for
+            a language pair, then default google model (NMT) is used.
+        :param glossaries: Glossaries to be applied. It's keyed by target language code.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+
+        :return: Batch translate document result from the API response.
+        """
+        client = self.get_client()
+        location_id = "global" if not location else location
+        # An explicitly passed project wins; otherwise use the hook's own project.
+        parent = f"projects/{project_id or self.project_id}/locations/{location_id}"
+        return client.batch_translate_document(
+            request={
+                "parent": parent,
+                "source_language_code": source_language_code,
+                "target_language_codes": target_language_codes,
+                "input_configs": input_configs,
+                "output_config": output_config,
+                "format_conversions": format_conversions,
+                "customized_attribution": customized_attribution,
+                "enable_shadow_removal_native_pdf": enable_shadow_removal_native_pdf,
+                "enable_rotation_correction": enable_rotation_correction,
+                "models": models,
+                "glossaries": glossaries,
+            },
+            timeout=timeout,
+            retry=retry,
+            metadata=metadata,
+        )
diff --git a/providers/src/airflow/providers/google/cloud/links/translate.py
b/providers/src/airflow/providers/google/cloud/links/translate.py
index 55db2650838..ecf595e9a59 100644
--- a/providers/src/airflow/providers/google/cloud/links/translate.py
+++ b/providers/src/airflow/providers/google/cloud/links/translate.py
@@ -333,3 +333,38 @@ class TranslationModelsListLink(BaseGoogleLink):
"project_id": project_id,
},
)
+
+
+class TranslateResultByOutputConfigLink(BaseGoogleLink):
+    """
+    Helper class for constructing Translation results Link.
+
+    Provides link to gcs destination output translation results, by provided output_config
+    with gcs destination specified.
+    """
+
+    name = "Translate Results By Output Config"
+    key = "translate_results_by_output_config"
+    format_str = TRANSLATION_TRANSLATE_TEXT_BATCH
+
+    @staticmethod
+    def extract_output_uri_prefix(output_config):
+        """
+        Return the GCS output location from ``output_config`` without the ``gs://`` scheme.
+
+        :param output_config: Output config holding ``gcs_destination.output_uri_prefix``.
+            May be a plain dict or a config object exposing the same fields as attributes
+            (the operators accept both forms).
+        :return: Everything after the last ``gs://`` in the configured prefix.
+        """
+        # Dicts are subscripted; config objects are read via attributes.
+        if isinstance(output_config, dict):
+            gcs_destination = output_config["gcs_destination"]
+        else:
+            gcs_destination = output_config.gcs_destination
+        if isinstance(gcs_destination, dict):
+            output_uri_prefix = gcs_destination["output_uri_prefix"]
+        else:
+            output_uri_prefix = gcs_destination.output_uri_prefix
+        return output_uri_prefix.rpartition("gs://")[-1]
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+        project_id: str,
+        output_config: dict,
+    ):
+        """
+        Push the link parameters (project and output location) to XCom.
+
+        :param context: Task execution context, forwarded to ``xcom_push``.
+        :param task_instance: Object whose ``xcom_push`` is used to store the values.
+        :param project_id: Google Cloud project ID stored with the link.
+        :param output_config: Output config with a GCS destination.
+        """
+        task_instance.xcom_push(
+            context,
+            key=TranslateResultByOutputConfigLink.key,
+            value={
+                "project_id": project_id,
+                "output_uri_prefix": TranslateResultByOutputConfigLink.extract_output_uri_prefix(
+                    output_config
+                ),
+            },
+        )
diff --git
a/providers/src/airflow/providers/google/cloud/operators/translate.py
b/providers/src/airflow/providers/google/cloud/operators/translate.py
index 4c04e9a7bc5..e57b9e46fcc 100644
--- a/providers/src/airflow/providers/google/cloud/operators/translate.py
+++ b/providers/src/airflow/providers/google/cloud/operators/translate.py
@@ -20,7 +20,7 @@
from __future__ import annotations
from collections.abc import MutableMapping, MutableSequence, Sequence
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -28,6 +28,7 @@ from google.api_core.gapic_v1.method import DEFAULT,
_MethodDefault
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.translate import CloudTranslateHook,
TranslateHook
from airflow.providers.google.cloud.links.translate import (
+ TranslateResultByOutputConfigLink,
TranslateTextBatchLink,
TranslationDatasetsListLink,
TranslationModelLink,
@@ -40,7 +41,11 @@ from airflow.providers.google.common.hooks.base_google
import PROVIDE_PROJECT_ID
if TYPE_CHECKING:
from google.api_core.retry import Retry
from google.cloud.translate_v3.types import (
+ BatchDocumentInputConfig,
+ BatchDocumentOutputConfig,
DatasetInputConfig,
+ DocumentInputConfig,
+ DocumentOutputConfig,
InputConfig,
OutputConfig,
TranslateTextGlossaryConfig,
@@ -978,3 +983,328 @@ class
TranslateDeleteModelOperator(GoogleCloudBaseOperator):
)
hook.wait_for_operation_done(operation=operation, timeout=self.timeout)
self.log.info("Model deletion complete!")
+
+
+class TranslateDocumentOperator(GoogleCloudBaseOperator):
+    """
+    Translate document provided.
+
+    Wraps the Google cloud Translate Text (Advanced) functionality.
+    Supports wide range of input/output file types, please visit the
+    https://cloud.google.com/translate/docs/advanced/translate-documents for more details.
+
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:TranslateDocumentOperator`.
+
+    :param project_id: Optional. The ID of the Google Cloud project that the
+        service belongs to. If not specified the hook project_id will be used.
+    :param source_language_code: Optional. The ISO-639 language code of the
+        input document text if known. If the source language isn't specified,
+        the API attempts to identify the source language automatically and returns
+        the source language within the response.
+    :param target_language_code: Required. The ISO-639 language code to use
+        for translation of the input document text.
+    :param location: Optional. Project or location to make a call. Must refer to a caller's project.
+        If not specified, 'global' is used.
+        Non-global location is required for requests using AutoML models or custom glossaries.
+        Models and glossaries must be within the same region (have the same location-id).
+    :param document_input_config: A document translation request input config.
+    :param document_output_config: Optional. A document translation request output config.
+        If not provided the translated file will only be returned through a byte-stream
+        and its output mime type will be the same as the input file's mime type.
+    :param customized_attribution: Optional. This flag is to support user customized
+        attribution. If not provided, the default is ``Machine Translated by Google``.
+        Customized attribution should follow rules in
+        https://cloud.google.com/translate/attribution#attribution_and_logos
+    :param is_translate_native_pdf_only: Optional. Param for external customers.
+        If true, the page limit of online native PDF translation is 300 and only native PDF pages
+        will be translated.
+    :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the
+        shadow text on background image for native PDF translation.
+        Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``,
+        ``pdf_native_only`` are False.
+    :param enable_rotation_correction: Optional. If true, enable auto rotation
+        correction in DVS.
+    :param model: Optional. The ``model`` type requested for this translation.
+        If not provided, the default Google model (NMT) will be used.
+        The format depends on model type:
+
+        - AutoML Translation models:
+          ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+        - General (built-in) models:
+          ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``
+
+    :param glossary_config: Optional. Glossary to be applied.
+    :param labels: Optional. The labels with user-defined metadata for the request
+        (a mapping of label key to label value).
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    operator_extra_links = (TranslateResultByOutputConfigLink(),)
+
+    template_fields: Sequence[str] = (
+        "source_language_code",
+        "target_language_code",
+        "document_input_config",
+        "document_output_config",
+        "model",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str | None = None,
+        target_language_code: str,
+        document_input_config: DocumentInputConfig | dict,
+        document_output_config: DocumentOutputConfig | dict | None,
+        customized_attribution: str | None = None,
+        is_translate_native_pdf_only: bool = False,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        model: str | None = None,
+        glossary_config: TranslateTextGlossaryConfig | None = None,
+        labels: MutableMapping[str, str] | None = None,
+        timeout: float | _MethodDefault = DEFAULT,
+        retry: Retry | _MethodDefault | None = DEFAULT,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.source_language_code = source_language_code
+        self.target_language_code = target_language_code
+        self.document_input_config = document_input_config
+        self.document_output_config = document_output_config
+        self.customized_attribution = customized_attribution
+        self.is_translate_native_pdf_only = is_translate_native_pdf_only
+        self.enable_shadow_removal_native_pdf = enable_shadow_removal_native_pdf
+        self.enable_rotation_correction = enable_rotation_correction
+        self.location = location
+        self.labels = labels
+        self.model = model
+        self.glossary_config = glossary_config
+        self.metadata = metadata
+        self.timeout = timeout
+        self.retry = retry
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        """
+        Translate the configured document and return the API response as a dict.
+
+        When ``document_output_config`` is set, the output location is also persisted
+        for the extra link.
+
+        :raises AirflowException: If the Translate API call fails.
+        """
+        hook = TranslateHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            self.log.info("Starting the document translation")
+            doc_translation_result = hook.translate_document(
+                # Forward the operator's project so the request and the persisted link
+                # refer to the same project; the hook falls back to its own when unset.
+                project_id=self.project_id,
+                source_language_code=self.source_language_code,
+                target_language_code=self.target_language_code,
+                document_input_config=self.document_input_config,
+                document_output_config=self.document_output_config,
+                customized_attribution=self.customized_attribution,
+                is_translate_native_pdf_only=self.is_translate_native_pdf_only,
+                enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf,
+                enable_rotation_correction=self.enable_rotation_correction,
+                location=self.location,
+                labels=self.labels,
+                model=self.model,
+                glossary_config=self.glossary_config,
+                timeout=self.timeout,
+                retry=self.retry,
+                metadata=self.metadata,
+            )
+            self.log.info("Document translation completed")
+        except GoogleAPICallError as e:
+            self.log.error("An error occurred executing translate_document method: \n%s", e)
+            raise AirflowException(e) from e
+        if self.document_output_config:
+            TranslateResultByOutputConfigLink.persist(
+                context=context,
+                task_instance=self,
+                project_id=self.project_id or hook.project_id,
+                output_config=self.document_output_config,
+            )
+        return cast(dict, type(doc_translation_result).to_dict(doc_translation_result))
+
+
+class TranslateDocumentBatchOperator(GoogleCloudBaseOperator):
+    """
+    Translate documents provided via input and output configurations.
+
+    Up to 10 target languages per operation supported.
+    Wraps the Google cloud Translate Text (Advanced) functionality.
+    See https://cloud.google.com/translate/docs/advanced/batch-translation.
+
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:TranslateDocumentBatchOperator`.
+
+    :param project_id: Optional. The ID of the Google Cloud project that the service belongs to.
+        If not specified the hook project_id will be used.
+    :param source_language_code: Required. The ISO-639 language code of the
+        input documents.
+    :param target_language_codes: Required. The ISO-639 language code to use
+        for translation of the input document. Specify up to 10 language codes here.
+    :param location: Optional. Project or location to make a call. Must refer to
+        a caller's project. If not specified, 'global' is used.
+        Non-global location is required for requests using AutoML models or custom glossaries.
+        Models and glossaries must be within the same region (have the same location-id).
+    :param input_configs: Input configurations. The total number of files matched should be <=
+        100. The total content size to translate should be <= 100M Unicode codepoints.
+        The files must use UTF-8 encoding.
+    :param output_config: Output configuration. If 2 input configs match to the same file (that
+        is, same input path), no output for duplicate inputs will be generated.
+    :param format_conversions: Optional. The file format conversion map that is applied to
+        all input files. The map key is the original mime_type.
+        The map value is the target mime_type of translated documents.
+        Supported file format conversion includes:
+
+        - ``application/pdf`` to
+          ``application/vnd.openxmlformats-officedocument.wordprocessingml.document``
+
+        If nothing specified, output files will be in the same format as the original file.
+    :param customized_attribution: Optional. This flag is to support user customized
+        attribution. If not provided, the default is ``Machine Translated by Google``.
+        Customized attribution should follow rules in
+        https://cloud.google.com/translate/attribution#attribution_and_logos
+    :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the
+        shadow text on background image for native PDF translation.
+        Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``,
+        ``pdf_native_only`` are False.
+    :param enable_rotation_correction: Optional. If true, enable auto rotation
+        correction in DVS.
+    :param models: Optional. The models to use for translation. Map's key is
+        target language code. Map's value is the model name. Value
+        can be a built-in general model, or an AutoML Translation model.
+        The value format depends on model type:
+
+        - AutoML Translation models:
+          ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}``
+
+        - General (built-in) models:
+          ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``
+
+        If the map is empty or a specific model is not requested for
+        a language pair, then default google model (NMT) is used.
+    :param glossaries: Glossaries to be applied. It's keyed by target language code.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    operator_extra_links = (TranslateResultByOutputConfigLink(),)
+
+    template_fields: Sequence[str] = (
+        "input_configs",
+        "output_config",
+        "target_language_codes",
+        "source_language_code",
+        "models",
+        "glossaries",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str = PROVIDE_PROJECT_ID,
+        source_language_code: str,
+        target_language_codes: MutableSequence[str] | None = None,
+        location: str | None = None,
+        input_configs: MutableSequence[BatchDocumentInputConfig | dict],
+        output_config: BatchDocumentOutputConfig | dict,
+        customized_attribution: str | None = None,
+        format_conversions: MutableMapping[str, str] | None = None,
+        enable_shadow_removal_native_pdf: bool = False,
+        enable_rotation_correction: bool = False,
+        models: MutableMapping[str, str] | None = None,
+        glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        timeout: float | _MethodDefault = DEFAULT,
+        retry: Retry | _MethodDefault | None = DEFAULT,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.target_language_codes = target_language_codes
+        self.source_language_code = source_language_code
+        self.input_configs = input_configs
+        self.output_config = output_config
+        self.customized_attribution = customized_attribution
+        self.format_conversions = format_conversions
+        self.enable_shadow_removal_native_pdf = enable_shadow_removal_native_pdf
+        self.enable_rotation_correction = enable_rotation_correction
+        self.models = models
+        self.glossaries = glossaries
+        self.metadata = metadata
+        self.timeout = timeout
+        self.retry = retry
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        """
+        Start the batch document translation, wait for it, and return the result as a dict.
+
+        The output location is persisted for the extra link as soon as the operation has
+        been started, before waiting for the result.
+
+        :raises AirflowException: If starting the batch translation fails.
+        """
+        hook = TranslateHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            batch_document_translate_operation = hook.batch_translate_document(
+                project_id=self.project_id,
+                location=self.location,
+                target_language_codes=self.target_language_codes,
+                source_language_code=self.source_language_code,
+                input_configs=self.input_configs,
+                output_config=self.output_config,
+                customized_attribution=self.customized_attribution,
+                format_conversions=self.format_conversions,
+                enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf,
+                enable_rotation_correction=self.enable_rotation_correction,
+                models=self.models,
+                glossaries=self.glossaries,
+                metadata=self.metadata,
+                timeout=self.timeout,
+                retry=self.retry,
+            )
+        except GoogleAPICallError as e:
+            self.log.error("An error occurred executing batch_translate_document method: \n%s", e)
+            raise AirflowException(e) from e
+        self.log.info("Batch document translation job started.")
+        TranslateResultByOutputConfigLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=self.project_id or hook.project_id,
+            output_config=self.output_config,
+        )
+        result = hook.wait_for_operation_result(batch_document_translate_operation)
+        self.log.info("Batch document translation job finished")
+        return cast(dict, type(result).to_dict(result))
diff --git a/providers/src/airflow/providers/google/provider.yaml
b/providers/src/airflow/providers/google/provider.yaml
index 442e1cecccc..cb3bb5082e7 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -1303,6 +1303,7 @@ extra-links:
- airflow.providers.google.cloud.links.translate.TranslationDatasetsListLink
- airflow.providers.google.cloud.links.translate.TranslationModelLink
- airflow.providers.google.cloud.links.translate.TranslationModelsListLink
+  - airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink
secrets-backends:
diff --git a/providers/tests/google/cloud/operators/test_translate.py
b/providers/tests/google/cloud/operators/test_translate.py
index d1b6a9fa009..2e4217ee53a 100644
--- a/providers/tests/google/cloud/operators/test_translate.py
+++ b/providers/tests/google/cloud/operators/test_translate.py
@@ -20,7 +20,11 @@ from __future__ import annotations
from unittest import mock
from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.translate_v3.types import automl_translation
+from google.cloud.translate_v3.types import (
+ BatchTranslateDocumentResponse,
+ TranslateDocumentResponse,
+ automl_translation,
+)
from airflow.providers.google.cloud.hooks.translate import TranslateHook
from airflow.providers.google.cloud.operators.translate import (
@@ -30,20 +34,21 @@ from airflow.providers.google.cloud.operators.translate import (
TranslateDatasetsListOperator,
TranslateDeleteDatasetOperator,
TranslateDeleteModelOperator,
+ TranslateDocumentBatchOperator,
+ TranslateDocumentOperator,
TranslateImportDataOperator,
TranslateModelsListOperator,
TranslateTextBatchOperator,
TranslateTextOperator,
)
-from providers.tests.system.google.cloud.tasks.example_tasks import LOCATION
-
GCP_CONN_ID = "google_cloud_default"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
PROJECT_ID = "test-project-id"
DATASET_ID = "sample_ds_id"
MODEL_ID = "sample_model_id"
TIMEOUT_VALUE = 30
+LOCATION = "location_id"
class TestCloudTranslate:
@@ -542,3 +547,164 @@ class TestTranslateDeleteModel:
metadata=(),
)
wait_for_done.assert_called_once_with(operation=m_delete_method_result, timeout=TIMEOUT_VALUE)
+
+
+class TestTranslateDocumentBatchOperator:
+ @mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist")
+ @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook")
+ def test_minimal_green_path(self, mock_hook, mock_link_persist):
+ input_config_item_1 = {
+ "gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_1.txt"},
+ }
+ input_config_item_2 = {
+ "gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_2.txt"},
+ }
+ SRC_LANG_CODE = "src_lang_code"
+ TARGET_LANG_CODES = ["target_lang_code1", "target_lang_code2"]
+ TIMEOUT = 30
+ INPUT_CONFIGS = [input_config_item_1, input_config_item_2]
+ OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://source_bucket_uri/output/"}}
+ BATCH_DOC_TRANSLATION_RESULT = {
+ "submit_time": "2024-12-01T00:01:16Z",
+ "end_time": "2024-12-01T00:10:01Z",
+ "failed_characters": "0",
+ "failed_pages": "0",
+ "total_billable_characters": "0",
+ "total_billable_pages": "6",
+ "total_characters": "4240",
+ "total_pages": "6",
+ "translated_characters": "4240",
+ "translated_pages": "6",
+ }
+ sample_operation = mock.MagicMock()
+ sample_operation.result.return_value = BatchTranslateDocumentResponse(BATCH_DOC_TRANSLATION_RESULT)
+
+ mock_hook.return_value.batch_translate_document.return_value = sample_operation
+ mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result()
+
+ op = TranslateDocumentBatchOperator(
+ task_id="task_id_test",
+ project_id=PROJECT_ID,
+ source_language_code=SRC_LANG_CODE,
+ target_language_codes=TARGET_LANG_CODES,
+ location=LOCATION,
+ models=None,
+ glossaries=None,
+ input_configs=INPUT_CONFIGS,
+ output_config=OUTPUT_CONFIG,
+ customized_attribution=None,
+ format_conversions=None,
+ enable_shadow_removal_native_pdf=False,
+ enable_rotation_correction=False,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ metadata=(),
+ timeout=TIMEOUT,
+ retry=None,
+ )
+ context = {"ti": mock.MagicMock()}
+ result = op.execute(context=context)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.batch_translate_document.assert_called_once_with(
+ project_id=PROJECT_ID,
+ source_language_code=SRC_LANG_CODE,
+ target_language_codes=TARGET_LANG_CODES,
+ location=LOCATION,
+ input_configs=INPUT_CONFIGS,
+ output_config=OUTPUT_CONFIG,
+ customized_attribution=None,
+ format_conversions=None,
+ enable_shadow_removal_native_pdf=False,
+ enable_rotation_correction=False,
+ timeout=TIMEOUT,
+ models=None,
+ glossaries=None,
+ retry=None,
+ metadata=(),
+ )
+
+ assert result == BATCH_DOC_TRANSLATION_RESULT
+ mock_link_persist.assert_called_once_with(
+ context=context,
+ task_instance=op,
+ project_id=PROJECT_ID,
+ output_config=OUTPUT_CONFIG,
+ )
+
+
+class TestTranslateDocumentOperator:
+ @mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist")
+ @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook")
+ def test_minimal_green_path(self, mock_hook, mock_link_persist):
+ SRC_LANG_CODE = "src_lang_code"
+ TARGET_LANG_CODE = "target_lang_code1"
+ TIMEOUT = 30
+ INPUT_CONFIG = {"gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_1.txt"}}
+ OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://source_bucket_uri/output/"}}
+ DOC_TRANSLATION_RESULT = {
+ "document_translation": {
+ "byte_stream_outputs": ["c29tZV9kYXRh"],
+ "detected_language_code": "",
+ "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ },
+ "model": f"projects/{PROJECT_ID}/locations/us-central1/models/general/nmt",
+ }
+
+ mock_hook.return_value.translate_document.return_value = TranslateDocumentResponse(
+ DOC_TRANSLATION_RESULT
+ )
+
+ op = TranslateDocumentOperator(
+ task_id="task_id_test",
+ project_id=PROJECT_ID,
+ source_language_code=SRC_LANG_CODE,
+ target_language_code=TARGET_LANG_CODE,
+ location=LOCATION,
+ model=None,
+ glossary_config=None,
+ labels=None,
+ document_input_config=INPUT_CONFIG,
+ document_output_config=OUTPUT_CONFIG,
+ customized_attribution=None,
+ is_translate_native_pdf_only=False,
+ enable_shadow_removal_native_pdf=False,
+ enable_rotation_correction=False,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ timeout=TIMEOUT,
+ retry=None,
+ )
+ context = {"ti": mock.MagicMock()}
+ result = op.execute(context=context)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.translate_document.assert_called_once_with(
+ source_language_code=SRC_LANG_CODE,
+ target_language_code=TARGET_LANG_CODE,
+ location=LOCATION,
+ model=None,
+ glossary_config=None,
+ labels=None,
+ document_input_config=INPUT_CONFIG,
+ document_output_config=OUTPUT_CONFIG,
+ customized_attribution=None,
+ is_translate_native_pdf_only=False,
+ enable_shadow_removal_native_pdf=False,
+ enable_rotation_correction=False,
+ timeout=TIMEOUT,
+ retry=None,
+ metadata=(),
+ )
+
+ assert result == DOC_TRANSLATION_RESULT
+ mock_link_persist.assert_called_once_with(
+ context=context,
+ task_instance=op,
+ project_id=PROJECT_ID,
+ output_config=OUTPUT_CONFIG,
+ )
diff --git a/providers/tests/system/google/cloud/translate/example_translate_document.py b/providers/tests/system/google/cloud/translate/example_translate_document.py
new file mode 100644
index 00000000000..8c47a4d4ecd
--- /dev/null
+++ b/providers/tests/system/google/cloud/translate/example_translate_document.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that translates text in Google Cloud Translate using V3 API version
+service in the Google Cloud.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.operators.translate import (
+ TranslateDocumentBatchOperator,
+ TranslateDocumentOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "gcp_translate_document"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+REGION = "us-central1"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+DATA_OUTPUT_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
+
+DOC_TRANSLATE_INPUT = {
+ "gcs_source": {
+ "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/document_translate/translate_me_sample.xlsx"
+ },
+}
+GCS_OUTPUT_DST = {
+ "gcs_destination": {"output_uri_prefix": f"gs://{DATA_OUTPUT_BUCKET_NAME}/doc_translate_output/"}
+}
+BATCH_DOC_INPUT_ITEM_1 = {
+ "gcs_source": {
+ "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_doc_sample_1.docx"
+ }
+}
+BATCH_DOC_INPUT_ITEM_2 = {
+ "gcs_source": {
+ "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_sample_2.pdf"
+ }
+}
+BATCH_OUTPUT_CONFIG = {
+ "gcs_destination": {"output_uri_prefix": f"gs://{DATA_OUTPUT_BUCKET_NAME}/batch_translate_docs_output/"}
+}
+
+
+with DAG(
+ DAG_ID,
+ schedule="@once", # Override to match your needs
+ start_date=datetime(2024, 1, 1),
+ catchup=False,
+ tags=["example", "document_translate", "document_translate_batch", "translate_V3"],
+) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=DATA_OUTPUT_BUCKET_NAME,
+ storage_class="REGIONAL",
+ location=REGION,
+ )
+ # [START howto_operator_translate_document]
+ translate_document = TranslateDocumentOperator(
+ task_id="translate_document_op",
+ project_id=PROJECT_ID,
+ location=REGION,
+ source_language_code="en",
+ target_language_code="uk",
+ document_input_config=DOC_TRANSLATE_INPUT,
+ document_output_config=GCS_OUTPUT_DST,
+ )
+ # [END howto_operator_translate_document]
+
+ # [START howto_operator_translate_document_batch]
+ translate_document_batch = TranslateDocumentBatchOperator(
+ task_id="batch_translate_document_op",
+ project_id=PROJECT_ID,
+ location=REGION,
+ source_language_code="en",
+ target_language_codes=["uk", "fr"],
+ input_configs=[BATCH_DOC_INPUT_ITEM_1, BATCH_DOC_INPUT_ITEM_2],
+ output_config=BATCH_OUTPUT_CONFIG,
+ )
+ # [END howto_operator_translate_document_batch]
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=DATA_OUTPUT_BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> [translate_document, translate_document_batch]
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ from tests_common.test_utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)